diff --git a/api.md b/api.md index a2300c1..134b36c 100644 --- a/api.md +++ b/api.md @@ -46,5 +46,6 @@ The following fields are returned: } ``` -Additionally, you can add the `include_txt` parameter. Setting it to 'true' will include an additional media URL, a link to a .txt containing basic tweet information. -`include_txt` can also be set to `ifnomedia`, which will only add a link to the .txt if there is no media URLs in the tweet. \ No newline at end of file +Additionally, you can add the `include_rtf` or `include_txt` parameters. Setting either to 'true' will include an additional media URL, a link to a .txt or .rtf containing basic tweet information. +Instead of "true", they can also be set to `ifnomedia`, which will only add a link if there are no media URLs in the tweet. +These features are added for archival of text tweets in certain software. \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 13d6753..453e1ce 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,5 @@ Flask-Cors==4.0.0 yt-dlp==2022.7.18 Werkzeug==2.3.7 numerize==0.12 -oauthlib==3.2.2 \ No newline at end of file +oauthlib==3.2.2 +PyRTF3==0.47.5 \ No newline at end of file diff --git a/test_api.py b/test_api.py new file mode 100644 index 0000000..c0f24cf --- /dev/null +++ b/test_api.py @@ -0,0 +1,38 @@ +from vx_testdata import * +import twitfix,twExtract +from flask.testing import FlaskClient +client = FlaskClient(twitfix.app) + +def test_api_include_txt(): + resp = client.get(testTextTweet.replace("https://twitter.com","https://api.vxtwitter.com")+"?include_txt=true",headers={"User-Agent":"test"}) + jData = resp.get_json() + assert resp.status_code==200 + assert any(".txt" in i for i in jData["mediaURLs"]) + +def test_api_include_rtf(): + resp = client.get(testTextTweet.replace("https://twitter.com","https://api.vxtwitter.com")+"?include_rtf=true",headers={"User-Agent":"test"}) + jData = resp.get_json() + assert resp.status_code==200 + assert any(".rtf" 
in i for i in jData["mediaURLs"]) + +def test_api_include_txt_nomedia(): + resp = client.get(testTextTweet.replace("https://twitter.com","https://api.vxtwitter.com")+"?include_txt=ifnomedia",headers={"User-Agent":"test"}) + jData = resp.get_json() + assert resp.status_code==200 + assert any(".txt" in i for i in jData["mediaURLs"]) + + resp = client.get(testMediaTweet.replace("https://twitter.com","https://api.vxtwitter.com")+"?include_txt=ifnomedia",headers={"User-Agent":"test"}) + jData = resp.get_json() + assert resp.status_code==200 + assert not any(".txt" in i for i in jData["mediaURLs"]) + +def test_api_include_rtf_nomedia(): + resp = client.get(testTextTweet.replace("https://twitter.com","https://api.vxtwitter.com")+"?include_rtf=ifnomedia",headers={"User-Agent":"test"}) + jData = resp.get_json() + assert resp.status_code==200 + assert any(".rtf" in i for i in jData["mediaURLs"]) + + resp = client.get(testMediaTweet.replace("https://twitter.com","https://api.vxtwitter.com")+"?include_rtf=ifnomedia",headers={"User-Agent":"test"}) + jData = resp.get_json() + assert resp.status_code==200 + assert not any(".rtf" in i for i in jData["mediaURLs"]) \ No newline at end of file diff --git a/test_vx_embeds.py b/test_vx_embeds.py index c0d6596..919b85c 100644 --- a/test_vx_embeds.py +++ b/test_vx_embeds.py @@ -173,4 +173,14 @@ def test_embed_stripLastUrl(): def test_embed_no_username(): resp = client.get(testMediaTweet.replace("/pdxdylan",""),headers={"User-Agent":"test"}) - assert resp.status_code==200 \ No newline at end of file + assert resp.status_code==200 + +def test_embed_txt(): + resp = client.get(testTextTweet.replace("https://twitter.com","")+".txt",headers={"User-Agent":"test"}) + assert resp.status_code==200 + assert testTextTweet_compare["text"] in str(resp.data) + +def test_embed_rtf(): + resp = client.get(testTextTweet.replace("https://twitter.com","")+".rtf",headers={"User-Agent":"test"}) + assert resp.status_code==200 + assert 
testTextTweet_compare["text"] in str(resp.data) \ No newline at end of file diff --git a/tokenTester.py b/tokenTester.py index 19fc1fa..42542a9 100644 --- a/tokenTester.py +++ b/tokenTester.py @@ -3,10 +3,13 @@ import vx_testdata tokens = "" tokensList = tokens.split(",") - +errorTokens = [] for token in tokensList: try: twExtract.extractStatusV2(vx_testdata.testNSFWTweet,workaroundTokens=[token]) except Exception as e: print(str(e)+" "+token) - pass \ No newline at end of file + errorTokens.append(token) + pass + +print("Error tokens: "+str(errorTokens)) \ No newline at end of file diff --git a/twitfix.py b/twitfix.py index 4fca90e..f97370f 100644 --- a/twitfix.py +++ b/twitfix.py @@ -4,7 +4,7 @@ from flask_cors import CORS import re import os import combineImg -from io import BytesIO +from io import BytesIO, StringIO import urllib import msgs import twExtract as twExtract @@ -15,6 +15,10 @@ import vxlogging as log from utils import getTweetIdFromUrl, pathregex from vxApi import getApiResponse from urllib.parse import urlparse +from PyRTF.Elements import Document +from PyRTF.document.section import Section +from PyRTF.document.paragraph import Paragraph +from utils import BytesIOWrapper app = Flask(__name__) CORS(app) user_agent="" @@ -108,11 +112,11 @@ def oembedend(): provName = request.args.get("provider",None) return oEmbedGen(desc, user, link, ttype,providerName=provName) -def getTweetData(twitter_url): +def getTweetData(twitter_url,include_txt="false",include_rtf="false"): cachedVNF = getVnfFromLinkCache(twitter_url) if cachedVNF is not None: return cachedVNF - + try: rawTweetData = twExtract.extractStatusV2Anon(twitter_url) except: @@ -127,7 +131,7 @@ def getTweetData(twitter_url): if rawTweetData is None: return None - tweetData = getApiResponse(rawTweetData) + tweetData = getApiResponse(rawTweetData,include_txt,include_rtf) if tweetData is None: return None addVnfToLinkCache(twitter_url,tweetData) @@ -154,8 +158,18 @@ def twitfix(sub_path): if match is 
None: abort(404) twitter_url = f'https://twitter.com/i/status/{getTweetIdFromUrl(sub_path)}' + isApiRequest=request.url.startswith("https://api.vx") or request.url.startswith("http://api.vx") - tweetData = getTweetData(twitter_url) + include_txt="false" + include_rtf="false" + + if isApiRequest: + if "include_txt" in request.args: + include_txt = request.args.get("include_txt") + if "include_rtf" in request.args: + include_rtf = request.args.get("include_rtf") + + tweetData = getTweetData(twitter_url,include_txt,include_rtf) if tweetData is None: log.error("Tweet Data Get failed for "+twitter_url) return message(msgs.failedToScan) @@ -175,6 +189,19 @@ def twitfix(sub_path): # remove the .mp4 from the end of the URL if requestUrlWithoutQuery.endswith(".mp4") or requestUrlWithoutQuery.endswith(".png"): sub_path = sub_path[:-4] + elif requestUrlWithoutQuery.endswith(".txt"): + return Response(tweetData['text'], mimetype='text/plain') + elif requestUrlWithoutQuery.endswith(".rtf"): + doc = Document() + section = Section() + doc.Sections.append(section) + p = Paragraph() + p.append(tweetData['text']) + section.append(p) + rtf = StringIO() + doc.write(rtf) + rtf.seek(0) + return send_file(BytesIOWrapper(rtf), mimetype='application/rtf', as_attachment=True, download_name=f'{tweetData["user_screen_name"]}_{tweetData["tweetID"]}.rtf') embedIndex = -1 # if url ends with /1, /2, /3, or /4, we'll use that as the index @@ -182,7 +209,7 @@ def twitfix(sub_path): embedIndex = int(sub_path[-1])-1 sub_path = sub_path[:-2] - if request.url.startswith("https://api.vx") or request.url.startswith("http://api.vx"): # Directly return the API response if the request is from the API + if isApiRequest: # Directly return the API response if the request is from the API return tweetData elif directEmbed: # direct embed # direct embeds should always prioritize the main tweet, so don't check for qrt diff --git a/utils.py b/utils.py index d0b89d1..95e582a 100644 --- a/utils.py +++ b/utils.py @@ 
-1,4 +1,5 @@ import re +import io pathregex = re.compile("\\w{1,15}\\/(status|statuses)\\/(\\d{2,20})") endTCOregex = re.compile("(^.*?) +https:\/\/t.co\/.*?$") @@ -16,4 +17,27 @@ def stripEndTCO(text): if match is not None: return match.group(1) else: - return text \ No newline at end of file + return text + +# https://stackoverflow.com/a/55977438 +class BytesIOWrapper(io.BufferedReader): + """Wrap a buffered bytes stream over TextIOBase string stream.""" + + def __init__(self, text_io_buffer, encoding=None, errors=None, **kwargs): + super(BytesIOWrapper, self).__init__(text_io_buffer, **kwargs) + self.encoding = encoding or text_io_buffer.encoding or 'utf-8' + self.errors = errors or text_io_buffer.errors or 'strict' + + def _encoding_call(self, method_name, *args, **kwargs): + raw_method = getattr(self.raw, method_name) + val = raw_method(*args, **kwargs) + return val.encode(self.encoding, errors=self.errors) + + def read(self, size=-1): + return self._encoding_call('read', size) + + def read1(self, size=-1): + return self._encoding_call('read1', size) + + def peek(self, size=-1): + return self._encoding_call('peek', size) \ No newline at end of file diff --git a/vxApi.py b/vxApi.py index 97984bc..b26f055 100644 --- a/vxApi.py +++ b/vxApi.py @@ -3,7 +3,7 @@ from datetime import datetime from configHandler import config from utils import stripEndTCO -def getApiResponse(tweet,include_txt=False,include_zip=False): +def getApiResponse(tweet,include_txt=False,include_rtf=False): tweetL = tweet["legacy"] if "user_result" in tweet["core"]: userL = tweet["core"]["user_result"]["result"]["legacy"] @@ -66,16 +66,16 @@ def getApiResponse(tweet,include_txt=False,include_zip=False): hashtags.append(i["text"]) #include_txt = request.args.get("include_txt", "false") - #include_zip = request.args.get("include_zip", "false") # for certain types of archival software (i.e Hydrus) + #include_rtf = request.args.get("include_rtf", "false") # for certain types of archival software 
(i.e Hydrus) - if include_txt == "true" or (include_txt == "ifnomedia" and len(media)==0): + if include_txt == True or include_txt == "true" or (include_txt == "ifnomedia" and len(media)==0): txturl = config['config']['url']+"/"+userL["screen_name"]+"/status/"+tweet["rest_id"]+".txt" media.append(txturl) media_extended.append({"url":txturl,"type":"txt"}) - if include_zip == "true" or (include_zip == "ifnomedia" and len(media)==0): - zipurl = config['config']['url']+"/"+userL["screen_name"]+"/status/"+tweet["rest_id"]+".zip" - media.append(zipurl) - media_extended.append({"url":zipurl,"type":"zip"}) + if include_rtf == True or include_rtf == "true" or (include_rtf == "ifnomedia" and len(media)==0): + rtfurl = config['config']['url']+"/"+userL["screen_name"]+"/status/"+tweet["rest_id"]+".rtf" + media.append(rtfurl) + media_extended.append({"url":rtfurl,"type":"rtf"}) qrtURL = None if 'quoted_status_id_str' in tweetL: