Re-implement txt functionality

Dylan 2024-06-22 20:10:31 +01:00
parent ee5c93fe6d
commit 165ae08300
8 changed files with 124 additions and 20 deletions

5
api.md

@ -46,5 +46,6 @@ The following fields are returned:
}
```
Additionally, you can add the `include_txt` parameter. Setting it to 'true' will include an additional media URL, a link to a .txt containing basic tweet information.
`include_txt` can also be set to `ifnomedia`, which will only add a link to the .txt if there is no media URLs in the tweet.
Additionally, you can add the `include_rtf` or `include_txt` parameters. Setting either to `true` will include an additional media URL: a link to a .txt or .rtf file containing basic tweet information.
Instead of `true`, they can also be set to `ifnomedia`, which will only add the link if there are no media URLs in the tweet.
These features exist to support archiving text-only tweets in certain software.
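
As an illustration, a minimal sketch of calling the API with `include_txt` enabled (the user and tweet ID are placeholders; `mediaURLs` is the response field the extra link appears in):

```python
# Minimal sketch: request a tweet through the API with include_txt=true and
# pick out the generated .txt link. The user and tweet ID are placeholders.
import requests

resp = requests.get(
    "https://api.vxtwitter.com/someuser/status/1234567890123456789",
    params={"include_txt": "true"},
    headers={"User-Agent": "example"},
)
data = resp.json()
print([u for u in data["mediaURLs"] if u.endswith(".txt")])
```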

requirements.txt

@ -7,4 +7,5 @@ Flask-Cors==4.0.0
yt-dlp==2022.7.18
Werkzeug==2.3.7
numerize==0.12
oauthlib==3.2.2
oauthlib==3.2.2
PyRTF3==0.47.5

38
test_api.py Normal file

@ -0,0 +1,38 @@
from vx_testdata import *
import twitfix,twExtract
from flask.testing import FlaskClient
client = FlaskClient(twitfix.app)
def test_api_include_txt():
    resp = client.get(testTextTweet.replace("https://twitter.com","https://api.vxtwitter.com")+"?include_txt=true",headers={"User-Agent":"test"})
    jData = resp.get_json()
    assert resp.status_code==200
    assert any(".txt" in i for i in jData["mediaURLs"])
def test_api_include_rtf():
    resp = client.get(testTextTweet.replace("https://twitter.com","https://api.vxtwitter.com")+"?include_rtf=true",headers={"User-Agent":"test"})
    jData = resp.get_json()
    assert resp.status_code==200
    assert any(".rtf" in i for i in jData["mediaURLs"])
def test_api_include_txt_nomedia():
    resp = client.get(testTextTweet.replace("https://twitter.com","https://api.vxtwitter.com")+"?include_txt=ifnomedia",headers={"User-Agent":"test"})
    jData = resp.get_json()
    assert resp.status_code==200
    assert any(".txt" in i for i in jData["mediaURLs"])
    resp = client.get(testMediaTweet.replace("https://twitter.com","https://api.vxtwitter.com")+"?include_txt=ifnomedia",headers={"User-Agent":"test"})
    jData = resp.get_json()
    assert resp.status_code==200
    assert not any(".txt" in i for i in jData["mediaURLs"])
def test_api_include_rtf_nomedia():
    resp = client.get(testTextTweet.replace("https://twitter.com","https://api.vxtwitter.com")+"?include_rtf=ifnomedia",headers={"User-Agent":"test"})
    jData = resp.get_json()
    assert resp.status_code==200
    assert any(".rtf" in i for i in jData["mediaURLs"])
    resp = client.get(testMediaTweet.replace("https://twitter.com","https://api.vxtwitter.com")+"?include_rtf=ifnomedia",headers={"User-Agent":"test"})
    jData = resp.get_json()
    assert resp.status_code==200
    assert not any(".rtf" in i for i in jData["mediaURLs"])

View File

@ -173,4 +173,14 @@ def test_embed_stripLastUrl():
def test_embed_no_username():
    resp = client.get(testMediaTweet.replace("/pdxdylan",""),headers={"User-Agent":"test"})
    assert resp.status_code==200
    assert resp.status_code==200
def test_embed_txt():
    resp = client.get(testTextTweet.replace("https://twitter.com","")+".txt",headers={"User-Agent":"test"})
    assert resp.status_code==200
    assert testTextTweet_compare["text"] in str(resp.data)
def test_embed_rtf():
    resp = client.get(testTextTweet.replace("https://twitter.com","")+".rtf",headers={"User-Agent":"test"})
    assert resp.status_code==200
    assert testTextTweet_compare["text"] in str(resp.data)

View File

@ -3,10 +3,13 @@ import vx_testdata
tokens = ""
tokensList = tokens.split(",")
errorTokens = []
for token in tokensList:
    try:
        twExtract.extractStatusV2(vx_testdata.testNSFWTweet,workaroundTokens=[token])
    except Exception as e:
        print(str(e)+" "+token)
        pass
        errorTokens.append(token)
        pass
print("Error tokens: "+str(errorTokens))

twitfix.py

@ -4,7 +4,7 @@ from flask_cors import CORS
import re
import os
import combineImg
from io import BytesIO
from io import BytesIO, StringIO
import urllib
import msgs
import twExtract as twExtract
@ -15,6 +15,10 @@ import vxlogging as log
from utils import getTweetIdFromUrl, pathregex
from vxApi import getApiResponse
from urllib.parse import urlparse
from PyRTF.Elements import Document
from PyRTF.document.section import Section
from PyRTF.document.paragraph import Paragraph
from utils import BytesIOWrapper
app = Flask(__name__)
CORS(app)
user_agent=""
@ -108,11 +112,11 @@ def oembedend():
    provName = request.args.get("provider",None)
    return oEmbedGen(desc, user, link, ttype,providerName=provName)
def getTweetData(twitter_url):
def getTweetData(twitter_url,include_txt="false",include_rtf="false"):
    cachedVNF = getVnfFromLinkCache(twitter_url)
    if cachedVNF is not None:
        return cachedVNF
    try:
        rawTweetData = twExtract.extractStatusV2Anon(twitter_url)
    except:
@ -127,7 +131,7 @@ def getTweetData(twitter_url):
    if rawTweetData is None:
        return None
    tweetData = getApiResponse(rawTweetData)
    tweetData = getApiResponse(rawTweetData,include_txt,include_rtf)
    if tweetData is None:
        return None
    addVnfToLinkCache(twitter_url,tweetData)
@ -154,8 +158,18 @@ def twitfix(sub_path):
    if match is None:
        abort(404)
    twitter_url = f'https://twitter.com/i/status/{getTweetIdFromUrl(sub_path)}'
    isApiRequest=request.url.startswith("https://api.vx") or request.url.startswith("http://api.vx")
    tweetData = getTweetData(twitter_url)
    include_txt="false"
    include_rtf="false"
    if isApiRequest:
        if "include_txt" in request.args:
            include_txt = request.args.get("include_txt")
        if "include_rtf" in request.args:
            include_rtf = request.args.get("include_rtf")
    tweetData = getTweetData(twitter_url,include_txt,include_rtf)
    if tweetData is None:
        log.error("Tweet Data Get failed for "+twitter_url)
        return message(msgs.failedToScan)
@ -175,6 +189,19 @@ def twitfix(sub_path):
    # remove the .mp4 from the end of the URL
    if requestUrlWithoutQuery.endswith(".mp4") or requestUrlWithoutQuery.endswith(".png"):
        sub_path = sub_path[:-4]
    elif requestUrlWithoutQuery.endswith(".txt"):
        return Response(tweetData['text'], mimetype='text/plain')
    elif requestUrlWithoutQuery.endswith(".rtf"):
        doc = Document()
        section = Section()
        doc.Sections.append(section)
        p = Paragraph()
        p.append(tweetData['text'])
        section.append(p)
        rtf = StringIO()
        doc.write(rtf)
        rtf.seek(0)
        return send_file(BytesIOWrapper(rtf), mimetype='application/rtf', as_attachment=True, download_name=f'{tweetData["user_screen_name"]}_{tweetData["tweetID"]}.rtf')
    embedIndex = -1
    # if url ends with /1, /2, /3, or /4, we'll use that as the index
@ -182,7 +209,7 @@ def twitfix(sub_path):
        embedIndex = int(sub_path[-1])-1
        sub_path = sub_path[:-2]
    if request.url.startswith("https://api.vx") or request.url.startswith("http://api.vx"): # Directly return the API response if the request is from the API
    if isApiRequest: # Directly return the API response if the request is from the API
        return tweetData
    elif directEmbed: # direct embed
        # direct embeds should always prioritize the main tweet, so don't check for qrt
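
As a usage note for the new `.txt`/`.rtf` branches above, a minimal sketch of fetching both renditions directly; the host, user, and tweet ID are placeholders, and the content types follow from the `Response` and `send_file` calls in the diff:

```python
# Minimal sketch: fetch the plain-text and RTF renditions of a tweet.
# Host, user and tweet ID are placeholders for a real deployment.
import requests

base = "https://vxtwitter.com/someuser/status/1234567890123456789"
txt = requests.get(base + ".txt", headers={"User-Agent": "example"})
rtf = requests.get(base + ".rtf", headers={"User-Agent": "example"})
print(txt.headers.get("Content-Type"))  # text/plain (possibly with a charset)
print(rtf.headers.get("Content-Type"))  # application/rtf
```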

utils.py

@ -1,4 +1,5 @@
import re
import io
pathregex = re.compile("\\w{1,15}\\/(status|statuses)\\/(\\d{2,20})")
endTCOregex = re.compile("(^.*?) +https:\/\/t.co\/.*?$")
@ -16,4 +17,27 @@ def stripEndTCO(text):
    if match is not None:
        return match.group(1)
    else:
        return text
        return text
# https://stackoverflow.com/a/55977438
class BytesIOWrapper(io.BufferedReader):
    """Wrap a buffered bytes stream over TextIOBase string stream."""
    def __init__(self, text_io_buffer, encoding=None, errors=None, **kwargs):
        super(BytesIOWrapper, self).__init__(text_io_buffer, **kwargs)
        self.encoding = encoding or text_io_buffer.encoding or 'utf-8'
        self.errors = errors or text_io_buffer.errors or 'strict'
    def _encoding_call(self, method_name, *args, **kwargs):
        raw_method = getattr(self.raw, method_name)
        val = raw_method(*args, **kwargs)
        return val.encode(self.encoding, errors=self.errors)
    def read(self, size=-1):
        return self._encoding_call('read', size)
    def read1(self, size=-1):
        return self._encoding_call('read1', size)
    def peek(self, size=-1):
        return self._encoding_call('peek', size)
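
For clarity, a minimal sketch of the wrapper in isolation, assuming it is imported from this module (as twitfix.py does above): it re-exposes a text stream as a readable bytes stream, which is what lets Flask's `send_file` serve the `StringIO` that PyRTF writes into.

```python
# Minimal sketch: BytesIOWrapper turns a text stream into a bytes stream.
from io import StringIO
from utils import BytesIOWrapper

text_stream = StringIO("hello from a text stream")
wrapped = BytesIOWrapper(text_stream)
print(wrapped.read())  # b'hello from a text stream'
```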

vxApi.py

@ -3,7 +3,7 @@ from datetime import datetime
from configHandler import config
from utils import stripEndTCO
def getApiResponse(tweet,include_txt=False,include_zip=False):
def getApiResponse(tweet,include_txt=False,include_rtf=False):
    tweetL = tweet["legacy"]
    if "user_result" in tweet["core"]:
        userL = tweet["core"]["user_result"]["result"]["legacy"]
@ -66,16 +66,16 @@ def getApiResponse(tweet,include_txt=False,include_zip=False):
        hashtags.append(i["text"])
    #include_txt = request.args.get("include_txt", "false")
    #include_zip = request.args.get("include_zip", "false") # for certain types of archival software (i.e Hydrus)
    #include_rtf = request.args.get("include_rtf", "false") # for certain types of archival software (i.e Hydrus)
    if include_txt == "true" or (include_txt == "ifnomedia" and len(media)==0):
    if include_txt == True or include_txt == "true" or (include_txt == "ifnomedia" and len(media)==0):
        txturl = config['config']['url']+"/"+userL["screen_name"]+"/status/"+tweet["rest_id"]+".txt"
        media.append(txturl)
        media_extended.append({"url":txturl,"type":"txt"})
    if include_zip == "true" or (include_zip == "ifnomedia" and len(media)==0):
        zipurl = config['config']['url']+"/"+userL["screen_name"]+"/status/"+tweet["rest_id"]+".zip"
        media.append(zipurl)
        media_extended.append({"url":zipurl,"type":"zip"})
    if include_rtf == True or include_rtf == "true" or (include_rtf == "ifnomedia" and len(media)==0):
        rtfurl = config['config']['url']+"/"+userL["screen_name"]+"/status/"+tweet["rest_id"]+".rtf"
        media.append(rtfurl)
        media_extended.append({"url":rtfurl,"type":"rtf"})
    qrtURL = None
    if 'quoted_status_id_str' in tweetL:
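
To make the effect of the block above concrete, a sketch of the two entries appended when `include_txt` is satisfied (hypothetical user and tweet ID; `include_rtf` behaves the same way with a `.rtf` URL):

```python
# Hypothetical example of the entries appended for include_txt; in a real
# response the base URL comes from config['config']['url'].
txturl = "https://vxtwitter.com/someuser/status/1234567890123456789.txt"
media = [txturl]                                   # surfaces in mediaURLs
media_extended = [{"url": txturl, "type": "txt"}]  # surfaces in media_extended
print(media, media_extended)
```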