From ffcc96c13e72b692205a6fefcafd681412973724 Mon Sep 17 00:00:00 2001 From: Dylan Date: Wed, 17 Apr 2024 21:40:26 +0100 Subject: [PATCH] Begin work on code refactor. Issues this will most likely solve when finished: #201, #195, #115, #20, --- msgs.py | 4 +- templates/image.html | 25 +- templates/rawvideo.html | 18 +- templates/redirect_script.html | 28 -- templates/text.html | 23 +- templates/tweetCommon.html | 5 + templates/video.html | 7 +- test_vx.py | 26 +- twitfix.py | 676 ++++----------------------------- vxApi.py | 23 +- 10 files changed, 143 insertions(+), 692 deletions(-) delete mode 100644 templates/redirect_script.html create mode 100644 templates/tweetCommon.html diff --git a/msgs.py b/msgs.py index c22d718..5cec6ff 100644 --- a/msgs.py +++ b/msgs.py @@ -10,8 +10,8 @@ videoDescLimit=220 tweetDescLimit=340 def genLikesDisplay(vnf): - if vnf['rts'] > 0: - return ("\n\nšŸ’– " + numerize.numerize(vnf['likes']) + " šŸ” " + numerize.numerize(vnf['rts'])) + if vnf['retweets'] > 0: + return ("\n\nšŸ’– " + numerize.numerize(vnf['likes']) + " šŸ” " + numerize.numerize(vnf['retweets'])) else: return ("\n\nšŸ’– " + numerize.numerize(vnf['likes'])) diff --git a/templates/image.html b/templates/image.html index 09faff7..86b0fea 100644 --- a/templates/image.html +++ b/templates/image.html @@ -1,25 +1,10 @@ {% extends 'base.html' %} - {% block head %} - - - - + +{% include 'tweetCommon.html' %} + {% if pic[1] %} @@ -34,9 +19,9 @@ {% endif %} - + - + {% endblock %} {% block body %} Redirecting you to the tweet in a moment. Or click here. {% endblock %} \ No newline at end of file diff --git a/templates/rawvideo.html b/templates/rawvideo.html index f50cbc3..907b39b 100644 --- a/templates/rawvideo.html +++ b/templates/rawvideo.html @@ -1,16 +1,16 @@ {% extends 'base.html' %} {% block head %} - + - - - - - + + + + + - - + + - {% endblock %} {% block body %} Redirecting you to the video in a moment. Or click here. {% endblock %} \ No newline at end of file + {% endblock %} {% block body %} Redirecting you to the video in a moment. Or click here. {% endblock %} \ No newline at end of file diff --git a/templates/redirect_script.html b/templates/redirect_script.html deleted file mode 100644 index e136a64..0000000 --- a/templates/redirect_script.html +++ /dev/null @@ -1,28 +0,0 @@ - \ No newline at end of file diff --git a/templates/text.html b/templates/text.html index c2e49d7..9edf2fb 100644 --- a/templates/text.html +++ b/templates/text.html @@ -1,30 +1,9 @@ {% extends 'base.html' %} - {% block head %} - - - - +{% include 'tweetCommon.html' %} diff --git a/templates/tweetCommon.html b/templates/tweetCommon.html new file mode 100644 index 0000000..c928956 --- /dev/null +++ b/templates/tweetCommon.html @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/templates/video.html b/templates/video.html index 2fdb4c5..cb63be4 100644 --- a/templates/video.html +++ b/templates/video.html @@ -1,10 +1,8 @@ {% extends 'base.html' %} {% block head %} - - - + - +{% include 'tweetCommon.html' %} @@ -17,7 +15,6 @@ - diff --git a/test_vx.py b/test_vx.py index 4ab95e6..81dc1cd 100644 --- a/test_vx.py +++ b/test_vx.py @@ -48,11 +48,11 @@ def compareDict(original,compare): compareDict(original[key],compare[key]) ## Specific API tests ## -def test_syndicationAPI(): +def test_twextract_syndicationAPI(): tweet = twExtract.extractStatus_syndication(testMediaTweet,workaroundTokens=tokens) assert tweet["full_text"]==testMediaTweet_compare['description'] -def test_extractStatusV2Anon(): +def test_twextract_extractStatusV2Anon(): tweet = twExtract.extractStatusV2AnonLegacy(testTextTweet,None) assert tweet["full_text"]==testTextTweet_compare['description'] tweet = twExtract.extractStatusV2AnonLegacy(testVideoTweet,None) @@ -63,40 +63,40 @@ def test_extractStatusV2Anon(): assert tweet["full_text"][:94]==testMultiMediaTweet_compare['description'][:94] -def test_v2API(): +def test_twextract_v2API(): tweet = twExtract.extractStatusV2Legacy(testMediaTweet,workaroundTokens=tokens) assert tweet["full_text"]==testMediaTweet_compare['description'] ## Tweet retrieve tests ## -def test_textTweetExtract(): +def test_twextract_textTweetExtract(): tweet = twExtract.extractStatus(testTextTweet,workaroundTokens=tokens) assert tweet["full_text"]==testTextTweet_compare['description'] assert tweet["user"]["screen_name"]=="jack" assert 'extended_entities' not in tweet -def test_extractV2(): # remove this when v2 is default +def test_twextract_extractV2(): # remove this when v2 is default tweet = twExtract.extractStatusV2(testTextTweet,workaroundTokens=tokens) -def test_UserExtract(): +def test_twextract_UserExtract(): user = twExtract.extractUser(testUser,workaroundTokens=tokens) assert user["screen_name"]=="jack" assert user["id"]==12 assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006" -def test_UserExtractID(): +def test_twextract_UserExtractID(): user = twExtract.extractUser(testUserID,workaroundTokens=tokens) assert user["screen_name"]=="jack" assert user["id"]==12 assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006" -def test_UserExtractWeirdURLs(): +def test_twextract_UserExtractWeirdURLs(): for url in testUserWeirdURLs: user = twExtract.extractUser(url,workaroundTokens=tokens) assert user["screen_name"]=="jack" assert user["id"]==12 assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006" -def test_videoTweetExtract(): +def test_twextract_videoTweetExtract(): tweet = twExtract.extractStatus(testVideoTweet,workaroundTokens=tokens) assert tweet["full_text"]==testVideoTweet_compare['description'] assert 'extended_entities' in tweet @@ -106,7 +106,7 @@ def test_videoTweetExtract(): assert video["type"]=="video" -def test_mediaTweetExtract(): +def test_twextract_mediaTweetExtract(): tweet = twExtract.extractStatus(testMediaTweet,workaroundTokens=tokens) assert tweet["full_text"]==testMediaTweet_compare['description'] assert 'extended_entities' in tweet @@ -116,7 +116,7 @@ def test_mediaTweetExtract(): assert video["type"]=="photo" -def test_multimediaTweetExtract(): +def test_twextract_multimediaTweetExtract(): tweet = twExtract.extractStatus(testMultiMediaTweet,workaroundTokens=tokens) assert tweet["full_text"][:94]==testMultiMediaTweet_compare['description'][:94] assert 'extended_entities' in tweet @@ -128,12 +128,12 @@ def test_multimediaTweetExtract(): assert video["media_url_https"]==testMultiMediaTweet_compare["images"][1] assert video["type"]=="photo" -def test_pollTweetExtract(): +def test_twextract_pollTweetExtract(): tweet = twExtract.extractStatus("https://twitter.com/norm/status/651169346518056960",workaroundTokens=tokens) assert 'card' in tweet compareDict(testPoll_comparePoll,tweet['card']) -def test_NSFW_TweetExtract(): +def test_twextract_NSFW_TweetExtract(): tweet = twExtract.extractStatus(testNSFWTweet,workaroundTokens=tokens) # For now just test that there's no error ## VNF conversion test ## diff --git a/twitfix.py b/twitfix.py index 94e3785..ad9d7c1 100644 --- a/twitfix.py +++ b/twitfix.py @@ -3,10 +3,7 @@ from flask import Flask, render_template, request, redirect, abort, Response, se from flask_cors import CORS import re import os -import urllib.parse -import urllib.request import combineImg -from datetime import date,datetime, timedelta from io import BytesIO, StringIO import msgs import twExtract as twExtract @@ -14,7 +11,6 @@ from configHandler import config from cache import addVnfToLinkCache,getVnfFromLinkCache from yt_dlp.utils import ExtractorError import vxlogging as log -import zipfile from vxApi import getApiResponse from urllib.parse import urlparse @@ -54,18 +50,34 @@ def getTweetIdFromUrl(url): else: return None +def renderImageTweetEmbed(tweetData,image,appnameSuffix=""): + qrt = None + pollData = None + embedDesc = msgs.formatEmbedDesc("Image",tweetData['text'],qrt,pollData,msgs.genLikesDisplay(tweetData)) + return render_template("image.html", + tweet=tweetData, + pic=[image], + host=config['config']['url'], + desc=embedDesc, + tweetLink=f'https://twitter.com/{tweetData["user_screen_name"]}/status/{tweetData["tweetID"]}', + appname=config['config']['appname']+appnameSuffix, + ) + +def renderVideoTweetEmbed(tweetData,video,appnameSuffix=""): + # TODO: render video tweet embed template + return "Video tweet embed" + +def renderTextTweetEmbed(tweetData,appnameSuffix=""): + # TODO: render text tweet embed template + return "Text tweet embed" + @app.route('/robots.txt') def robots(): return "User-agent: *\nDisallow: /" @app.route('/') # If the useragent is discord, return the embed, if not, redirect to configured repo directly def default(): - global user_agent - user_agent = request.headers.get('user-agent') - if isValidUserAgent(user_agent): - return message("TwitFix is an attempt to fix twitter video embeds in discord! created by Robin Universe :)\n\nšŸ’–\n\nClick me to be redirected to the repo!") - else: - return redirect(config['config']['repo'], 301) + return redirect(config['config']['repo'], 301) @app.route('/oembed.json') #oEmbed endpoint def oembedend(): @@ -76,157 +88,75 @@ def oembedend(): provName = request.args.get("provider",None) return oEmbedGen(desc, user, link, ttype,providerName=provName) +def getTweetData(twitter_url): + try: + rawTweetData = twExtract.extractStatusV2Anon(twitter_url) + except: + rawTweetData = None + if rawTweetData is None: + rawTweetData = twExtract.extractStatusV2(twitter_url,workaroundTokens=config['config']['workaroundTokens'].split(',')) + if 'error' in rawTweetData: + return None + + if rawTweetData is None: + return None + tweetData = getApiResponse(rawTweetData) + return tweetData + @app.route('/') # Default endpoint used by everything def twitfix(sub_path): - global user_agent - user_agent = request.headers.get('user-agent') match = pathregex.search(sub_path) + if match is None: + abort(404) + twitter_url = f'https://twitter.com/i/status/{getTweetIdFromUrl(sub_path)}' - if request.url.endswith(".mp4") or request.url.endswith("%2Emp4"): - twitter_url = "https://twitter.com/" + sub_path - - if "?" not in request.url: - clean = twitter_url[:-4] + tweetData = getTweetData(twitter_url) + if tweetData is None: + abort(404) + qrt = None + if 'qrtURL' in tweetData: + qrt = getTweetData(tweetData['qrtURL']) + + ###return tweetData + + embedIndex = -1 + # if url ends with /1, /2, /3, or /4, we'll use that as the index + if sub_path[-2:] in ["/1","/2","/3","/4"]: + embedIndex = int(sub_path[-1])-1 + sub_path = sub_path[:-2] + if request.url.startswith("https://api.vx"): # Directly return the API response if the request is from the API + return tweetData + elif request.url.startswith("https://d.vx"): # direct embed + # determine what type of media we're dealing with + if not tweetData['hasMedia'] and qrt is None: + return renderTextTweetEmbed(tweetData) + elif tweetData['allSameType'] and tweetData['media_extended'][0]['type'] == "image" and embedIndex == -1 and tweetData['combinedMediaUrl'] != None: + return redirect(tweetData['combinedMediaUrl'], 302) else: - clean = twitter_url - - vnf,e = vnfFromCacheOrDL(clean) - if vnf is None: - if e is not None: - return message(msgs.failedToScan+msgs.failedToScanExtra+e) - return message(msgs.failedToScan) - return make_cached_vnf_response(vnf,getTemplate("rawvideo.html",vnf,"",[],clean,"","","","")) - elif request.url.endswith(".txt") or request.url.endswith("%2Etxt"): - twitter_url = "https://twitter.com/" + sub_path - - if "?" not in request.url: - clean = twitter_url[:-4] + # this means we have mixed media or video, and we're only going to embed one + if embedIndex == -1: # if the user didn't specify an index, we'll just use the first one + embedIndex = 0 + media = tweetData['media_extended'][embedIndex] + if media['type'] == "image": + return redirect(media['url'], 302) + elif media['type'] == "video" or media['type'] == "animated_gif": + return redirect(media['url'], 302) # TODO: might not work + else: # full embed + if not tweetData['hasMedia']: + return renderTextTweetEmbed(tweetData) + elif tweetData['allSameType'] and tweetData['media_extended'][0]['type'] == "image" and embedIndex == -1 and tweetData['combinedMediaUrl'] != None: + return renderImageTweetEmbed(tweetData,tweetData['combinedMediaUrl'],appnameSuffix=" - See original tweet for full quality") else: - clean = twitter_url - - vnf,e = vnfFromCacheOrDL(clean) - if vnf is None: - if e is not None: - return abort(500,"Failed to scan tweet: "+e) - return abort(500,"Failed to scan tweet") - return make_content_type_response(getTemplate("txt.html",vnf,vnf["description"],[],clean,"","","",""),"text/plain") - elif request.url.endswith(".zip") or request.url.endswith("%2Ezip"): # for certain types of archival software (i.e Hydrus) - twitter_url = "https://twitter.com/" + sub_path - - if "?" not in request.url: - clean = twitter_url[:-4] - else: - clean = twitter_url - - vnf,e = vnfFromCacheOrDL(clean) - if vnf is None: - if e is not None: - return abort(500,"Failed to scan tweet: "+e) - return abort(500,"Failed to scan tweet") - with app.app_context(): - txtData = getTemplate("txt.html",vnf,vnf["description"],"",clean,"","","","") - txtIo = BytesIO() - txtIo.write(txtData.encode("utf-8")) - txtIo.seek(0) - zipIo = BytesIO() - with zipfile.ZipFile(zipIo, 'w', zipfile.ZIP_DEFLATED) as zipf: - zipf.writestr("tweetInfo.txt", txtIo.read()) - # todo: add images to zip - zipIo.seek(0) - return make_content_type_response(zipIo,"application/zip") - elif request.url.startswith("https://d.vx"): # Matches d.fx? Try to give the user a direct link - if isValidUserAgent(user_agent): - twitter_url = config['config']['url'] + "/"+sub_path - log.debug( "d.vx link shown to discord user-agent!") - if request.url.endswith(".mp4") and "?" not in request.url: - if "?" not in request.url: - clean = twitter_url[:-4] - else: - clean = twitter_url - else: - clean = twitter_url - return redirect(clean+".mp4", 301) - else: - log.debug("Redirect to MP4 using d.fxtwitter.com") - return dir(sub_path) - elif request.url.endswith("/1") or request.url.endswith("/2") or request.url.endswith("/3") or request.url.endswith("/4") or request.url.endswith("%2F1") or request.url.endswith("%2F2") or request.url.endswith("%2F3") or request.url.endswith("%2F4"): - twitter_url = "https://twitter.com/" + sub_path - - if "?" not in request.url: - clean = twitter_url[:-2] - else: - clean = twitter_url + # this means we have mixed media or video, and we're only going to embed one + if embedIndex == -1: # if the user didn't specify an index, we'll just use the first one + embedIndex = 0 + media = tweetData['media_extended'][embedIndex] + if media['type'] == "image": + return renderImageTweetEmbed(tweetData,media['url'] , appnameSuffix=f' - Media {embedIndex+1}/{len(tweetData["media_extended"])}') + elif media['type'] == "video" or media['type'] == "animated_gif": + return renderVideoTweetEmbed(tweetData,media['url']) - image = ( int(request.url[-1]) - 1 ) - return embed_video(clean, image) - elif request.url.startswith("https://api.vx"): - twitter_url = "https://twitter.com/" + sub_path - try: - try: - tweet = twExtract.extractStatusV2Anon(twitter_url) - except: - tweet = None - if tweet is None: - tweet = twExtract.extractStatusV2(twitter_url,workaroundTokens=config['config']['workaroundTokens'].split(',')) - if tweet is None: - log.error("API Get failed: " + twitter_url + " (Tweet null)") - abort(500, '{"message": "Failed to extract tweet (Twitter API error)"}') - if 'error' in tweet: - response = make_response(jsonify(tweet), 500) - response.headers['Content-Type'] = 'application/json' - response.cache_control.max_age = 3600 - response.cache_control.public = True - return response - log.success("API Get success") - return getApiResponse(tweet) - except Exception as e: - log.error("API Get failed: " + twitter_url + " " + log.get_exception_traceback_str(e)) - abort(500, '{"message": "Failed to extract tweet (Processing error)"}') - - if match is not None: - twitter_url = sub_path - - if match.start() == 0: - twitter_url = "https://twitter.com/" + sub_path - else: - # URL normalization messes up the URL, so we have to fix it - if sub_path.startswith("https:/") and not sub_path.startswith("https://"): - twitter_url = sub_path.replace("https:/", "https://", 1) - elif sub_path.startswith("http:/") and not sub_path.startswith("http://"): - twitter_url = sub_path.replace("http:/", "http://", 1) - - if isValidUserAgent(user_agent): - res = embedCombined(twitter_url) - return res - - else: - log.debug("Redirect to " + twitter_url) - return redirect(twitter_url, 301) - else: - return message("This doesn't appear to be a twitter URL") - - -@app.route('/dir/') # Try to return a direct link to the MP4 on twitters servers -def dir(sub_path): - global user_agent - user_agent = request.headers.get('user-agent') - url = sub_path - match = pathregex.search(url) - if match is not None: - twitter_url = url - - if match.start() == 0: - twitter_url = "https://twitter.com/" + url - - if isValidUserAgent(user_agent): - res = embed_video(twitter_url) - return res - - else: - log.debug("Redirect to direct MP4 URL") - return direct_video(twitter_url) - else: - return redirect(url, 301) + return tweetData @app.route('/favicon.ico') def favicon(): # pragma: no cover @@ -261,444 +191,6 @@ def rendercombined(): imgIo.seek(0) return send_file(imgIo, mimetype='image/jpeg',max_age=86400) -def upgradeVNF(vnf): - # Makes sure any VNF object passed through this has proper fields if they're added in later versions - if 'verified' not in vnf: - vnf['verified']=False - if 'size' not in vnf: - if vnf['type'] == 'Video': - vnf['size']={'width':720,'height':480} - else: - vnf['size']={} - if 'qrtURL' not in vnf: - if vnf['qrt'] == {}: - vnf['qrtURL'] = None - else: # - vnf['qrtURL'] = f"https://twitter.com/{vnf['qrt']['screen_name']}/status/{vnf['qrt']['id']}" - if 'isGif' not in vnf: - vnf['isGif'] = False - return vnf - -def getDefaultTTL(): # TTL for deleting items from the database - return datetime.today().replace(microsecond=0) + timedelta(days=1) - -def secondsUntilTTL(ttl): - untilTTL = ttl - datetime.today().replace(microsecond=0) - return untilTTL.total_seconds() - -def make_content_type_response(response, content_type): - resp = make_response(response) - resp.headers['Content-Type'] = content_type - return resp - -def make_cached_vnf_response(vnf,response): - return response - try: - if 'ttl' not in vnf or vnf['ttl'] == None or secondsUntilTTL(vnf['ttl']) <= 0: - return response - resp = make_response(response) - resp.cache_control.max_age = secondsUntilTTL(vnf['ttl']) - resp.cache_control.public = True - return resp - except Exception as e: - log.error("Error making cached response: " + str(e)) - return response - -def vnfFromCacheOrDL(video_link): - cached_vnf = getVnfFromLinkCache(video_link) - if cached_vnf == None: - try: - vnf = link_to_vnf(video_link) - addVnfToLinkCache(video_link, vnf) - log.success("VNF Get success") - return vnf,None - except (ExtractorError, twExtract.TwExtractError) as exErr: - if 'HTTP Error 404' in exErr.msg or 'No status found with that ID' in exErr.msg: - exErr.msg=msgs.tweetNotFound - elif 'suspended' in exErr.msg: - exErr.msg=msgs.tweetSuspended - else: - exErr.msg=msgs.unknownError - - log.error("VNF Get failed: " + video_link + " " + log.get_exception_traceback_str(exErr)) - return None,exErr.msg - except Exception as e: - log.error("VNF Get failed: " + video_link + " " + log.get_exception_traceback_str(e)) - return None,None - else: - return upgradeVNF(cached_vnf),None - -def direct_video(video_link): # Just get a redirect to a MP4 link from any tweet link - vnf,e = vnfFromCacheOrDL(video_link) - if vnf != None: - return redirect(vnf['url'], 301) - else: - if e is not None: - return message(msgs.failedToScan+msgs.failedToScanExtra+e) - return message(msgs.failedToScan) - -def direct_video_link(video_link): # Just get a redirect to a MP4 link from any tweet link - vnf,e = vnfFromCacheOrDL(video_link) - if vnf != None: - return vnf['url'] - else: - if e is not None: - return message(msgs.failedToScan+msgs.failedToScanExtra+e) - return message(msgs.failedToScan) - -def embed_video(video_link, image=0): # Return Embed from any tweet link - vnf,e = vnfFromCacheOrDL(video_link) - - if vnf != None: - return embed(video_link, vnf, image) - else: - if e is not None: - return message(msgs.failedToScan+msgs.failedToScanExtra+e) - return message(msgs.failedToScan) - -def tweetInfo(url, tweet="", desc="", thumb="", uploader="", screen_name="", pfp="", tweetType="", images="", hits=0, likes=0, rts=0, time="", qrtURL="", nsfw=False,ttl=None,verified=False,size={},poll=None,isGif=False): # Return a dict of video info with default values - if (ttl==None): - ttl = getDefaultTTL() - vnf = { - "tweet" : tweet, - "url" : url, - "description" : desc, - "thumbnail" : thumb, - "uploader" : uploader, - "screen_name" : screen_name, - "pfp" : pfp, - "type" : tweetType, - "images" : images, - "hits" : hits, - "likes" : likes, - "rts" : rts, - "time" : time, - "qrtURL" : qrtURL, - "nsfw" : nsfw, - "ttl" : ttl, - "verified" : verified, - "size" : size, - "poll" : poll, - "isGif" : isGif, - "tweetId" : int(getTweetIdFromUrl(tweet)) - } - if (poll is None): - del vnf['poll'] - return vnf - -def link_to_vnf_from_tweet_data(tweet,video_link): - imgs = ["","","","", ""] - log.debug("Tweet Type: " + tweetType(tweet)) - isGif=False - # Check to see if tweet has a video, if not, make the url passed to the VNF the first t.co link in the tweet - if tweetType(tweet) == "Video": - media=tweet['extended_entities']['media'][0] - if media['video_info']['variants']: - best_bitrate = -1 - thumb = media['media_url'] - if 'original_info' in media: - size=media["original_info"] - elif 'sizes' in media and ('large' in media["sizes"] or 'medium' in media["sizes"] or 'small' in media["sizes"] or 'thumb' in media["sizes"]): - possibleSizes=['large','medium','small','thumb'] - for p in possibleSizes: - if p in media["sizes"]: - size={'width':media["sizes"][p]['w'],'height':media["sizes"][p]['h']} - break - else: - size={'width':720,'height':480} - for video in media['video_info']['variants']: - if video['content_type'] == "video/mp4" and '/hevc/' not in video["url"] and video['bitrate'] > best_bitrate: - url = video['url'] - best_bitrate = video['bitrate'] - elif tweetType(tweet) == "Text": - url = "" - thumb = "" - size = {} - else: - imgs = ["","","","", ""] - i = 0 - for media in tweet['extended_entities']['media']: - imgs[i] = media['media_url_https'] - i = i + 1 - - imgs[4] = str(i) - url = "" - images= imgs - thumb = tweet['extended_entities']['media'][0]['media_url_https'] - size = {} - - if 'extended_entities' in tweet and 'media' in tweet['extended_entities'] and tweet['extended_entities']['media'][0]['type'] == 'animated_gif': - isGif=True - - qrtURL = None - if 'quoted_status_permalink' in tweet: - qrtURL = tweet['quoted_status_permalink']['expanded'] - elif 'quoted_status_id_str' in tweet: - qrtURL = "https://twitter.com/i/status/" + tweet['quoted_status_id_str'] - - text = tweet['full_text'] - - if 'possibly_sensitive' in tweet: - nsfw = tweet['possibly_sensitive'] - else: - nsfw = False - - if 'entities' in tweet and 'urls' in tweet['entities']: - for eurl in tweet['entities']['urls']: - if "/status/" in eurl["expanded_url"] and eurl["expanded_url"].startswith("https://twitter.com/"): - text = text.replace(eurl["url"], "") - else: - text = text.replace(eurl["url"],eurl["expanded_url"]) - ttl = None #default - - try: - if 'card' in tweet and tweet['card']['name'].startswith('poll'): - poll=getPollObject(tweet['card']) - if tweet['card']['binding_values']['counts_are_final']['boolean_value'] == False: - ttl = datetime.today().replace(microsecond=0) + timedelta(minutes=1) - else: - poll=None - except: - poll=None - - vnf = tweetInfo( - url, - video_link, - text, thumb, - tweet['user']['name'], - tweet['user']['screen_name'], - tweet['user']['profile_image_url'], - tweetType(tweet), - likes=tweet['favorite_count'], - rts=tweet['retweet_count'], - time=tweet['created_at'], - qrtURL=qrtURL, - images=imgs, - nsfw=nsfw, - verified=tweet['user']['verified'], - size=size, - poll=poll, - ttl=ttl, - isGif=isGif - ) - - return vnf - - -def link_to_vnf_from_unofficial_api(video_link): - tweet=None - log.info("Attempting to download tweet info: "+video_link) - tweet = twExtract.extractStatus(video_link,workaroundTokens=config['config']['workaroundTokens'].split(',')) - log.success("Unofficial API Success") - - if "extended_entities" not in tweet: - # check if any entities with urls ending in /video/XXX or /photo/XXX exist - if "entities" in tweet and "urls" in tweet["entities"]: - for url in tweet["entities"]["urls"]: - if "/video/" in url["expanded_url"] or "/photo/" in url["expanded_url"]: - log.info("Extra tweet info found in entities: "+video_link+" -> "+url["expanded_url"]) - subTweet = twExtract.extractStatus(url["expanded_url"],workaroundTokens=config['config']['workaroundTokens'].split(',')) - if "extended_entities" in subTweet: - tweet["extended_entities"] = subTweet["extended_entities"] - break - - return link_to_vnf_from_tweet_data(tweet,video_link) - -def link_to_vnf(video_link): # Return a VideoInfo object or die trying - return link_to_vnf_from_unofficial_api(video_link) - -def message(text): - return render_template( - 'default.html', - message = text, - color = config['config']['color'], - appname = config['config']['appname'], - repo = config['config']['repo'], - url = config['config']['url'] ) - -def getTemplate(template,vnf,desc,images,video_link,color,urlDesc,urlUser,urlLink,appNameSuffix="",embedVNF=None): - if (embedVNF is None): - embedVNF = vnf - if ('width' in embedVNF['size'] and 'height' in embedVNF['size']): - embedVNF['size']['width'] = min(embedVNF['size']['width'],2000) - embedVNF['size']['height'] = min(embedVNF['size']['height'],2000) - return render_template( - template, - likes = vnf['likes'], - rts = vnf['rts'], - time = vnf['time'], - screenName = vnf['screen_name'], - vidlink = embedVNF['url'], - userLink = f"https://twitter.com/{vnf['screen_name']}", - pfp = vnf['pfp'], - vidurl = embedVNF['url'], - desc = desc, - pic = images, - user = vnf['uploader'], - video_link = vnf, - color = color, - appname = config['config']['appname'] + appNameSuffix, - repo = config['config']['repo'], - url = config['config']['url'], - urlDesc = urlDesc, - urlUser = urlUser, - urlLink = urlLink, - urlUserLink= urllib.parse.quote(f"https://twitter.com/{vnf['screen_name']}"), - tweetLink = vnf['tweet'], - videoSize = embedVNF['size'] ) - -def embed(video_link, vnf, image): - log.info("Embedding " + vnf['type'] + ": " + video_link) - - desc = re.sub(r' https:\/\/t\.co\/\S+(?=\s|$)', '', vnf['description']) - urlUser = urllib.parse.quote(vnf['uploader']) - urlDesc = urllib.parse.quote(desc) - urlLink = urllib.parse.quote(video_link) - likeDisplay = msgs.genLikesDisplay(vnf) - - if 'poll' in vnf: - pollDisplay= msgs.genPollDisplay(vnf['poll']) - else: - pollDisplay="" - - qrt=None - if vnf['qrtURL'] is not None: - qrt,e=vnfFromCacheOrDL(vnf['qrtURL']) - if qrt is not None: - desc=msgs.formatEmbedDesc(vnf['type'],desc,qrt,pollDisplay,likeDisplay) - else: - desc=msgs.formatEmbedDesc(vnf['type'],desc,None,pollDisplay,likeDisplay) - embedVNF=None - appNamePost = "" - if vnf['type'] == "Text": # Change the template based on tweet type - template = 'text.html' - if qrt is not None and qrt['type'] != "Text": - embedVNF=qrt - if qrt['type'] == "Image": - if embedVNF['images'][4]!="1": - appNamePost = " - Image " + str(image+1) + " of " + str(vnf['images'][4]) - image = embedVNF['images'][image] - template = 'image.html' - elif qrt['type'] == "Video" or qrt['type'] == "": - urlDesc = urllib.parse.quote(desc) - template = 'video.html' - - if vnf['type'] == "Image": - if vnf['images'][4]!="1": - appNamePost = " - Image " + str(image+1) + "/" + str(vnf['images'][4]) - image = vnf['images'][image] - template = 'image.html' - - if vnf['type'] == "Video": - if vnf['isGif'] == True and config['config']['gifConvertAPI'] != "" and config['config']['gifConvertAPI'] != "none": - vnf['url'] = f"{config['config']['gifConvertAPI']}/convert.mp4?url={vnf['url']}" - appNamePost = " - GIF" - urlDesc = urllib.parse.quote(desc) - template = 'video.html' - - if vnf['type'] == "": - urlDesc = urllib.parse.quote(desc) - template = 'video.html' - - color = "#7FFFD4" # Green - - if vnf['nsfw'] == True: - color = "#800020" # Red - - return make_cached_vnf_response(vnf,getTemplate(template,vnf,desc,[image],video_link,color,urlDesc,urlUser,urlLink,appNamePost,embedVNF)) - - -def embedCombined(video_link): - vnf,e = vnfFromCacheOrDL(video_link) - - if vnf != None: - return make_cached_vnf_response(vnf,embedCombinedVnf(video_link, vnf)) - else: - if e is not None: - return message(msgs.failedToScan+msgs.failedToScanExtra+e) - return message(msgs.failedToScan) - -def embedCombinedVnf(video_link,vnf): - qrt=None - if vnf['qrtURL'] is not None: - qrt,e=vnfFromCacheOrDL(vnf['qrtURL']) - - if vnf['type'] != "Image" and vnf['type'] != "Video" and qrt is not None and qrt['type'] == "Image": - if qrt['images'][4]!="1": - vnf['images'] = qrt['images'] - vnf['type'] = "Image" - - if vnf['type'] != "Image" or vnf['images'][4] == "1": - return embed(video_link, vnf, 0) - desc = re.sub(r' http.*t\.co\S+', '', vnf['description']) - urlUser = urllib.parse.quote(vnf['uploader']) - urlDesc = urllib.parse.quote(desc) - urlLink = urllib.parse.quote(video_link) - likeDisplay = msgs.genLikesDisplay(vnf) - - if 'poll' in vnf: - pollDisplay= msgs.genPollDisplay(vnf['poll']) - else: - pollDisplay="" - - - if qrt is not None: - desc=msgs.formatEmbedDesc(vnf['type'],desc,qrt,pollDisplay,likeDisplay) - - suffix="" - #if 'Discord' in user_agent: - # images = [] - # for i in range(0,int(vnf['images'][4])): - # images.append(vnf['images'][i]) - #else: - host = config['config']['url'] - image = f"{host}/rendercombined.jpg?imgs=" - for i in range(0,int(vnf['images'][4])): - image = image + vnf['images'][i] + "," - image = image[:-1] # Remove last comma - images=[image] - suffix=" - View original tweet for full quality" - - color = "#7FFFD4" # Green - - if vnf['nsfw'] == True: - color = "#800020" # Red - return make_cached_vnf_response(vnf,getTemplate('image.html',vnf,desc,images,video_link,color,urlDesc,urlUser,urlLink,appNameSuffix=suffix)) - - -def getPollObject(card): - poll={"total_votes":0,"choices":[]} - choiceCount=0 - if (card["name"]=="poll2choice_text_only"): - choiceCount=2 - elif (card["name"]=="poll3choice_text_only"): - choiceCount=3 - elif (card["name"]=="poll4choice_text_only"): - choiceCount=4 - - for i in range(0,choiceCount): - choice = {"text":card["binding_values"][f"choice{i+1}_label"]["string_value"],"votes":int(card["binding_values"][f"choice{i+1}_count"]["string_value"])} - poll["total_votes"]+=choice["votes"] - poll["choices"].append(choice) - # update each choice with a percentage - for choice in poll["choices"]: - choice["percent"] = round((choice["votes"]/poll["total_votes"])*100,1) - - return poll - - -def tweetType(tweet): # Are we dealing with a Video, Image, or Text tweet? - if 'extended_entities' in tweet: - if 'video_info' in tweet['extended_entities']['media'][0]: - out = "Video" - else: - out = "Image" - else: - out = "Text" - - return out - - def oEmbedGen(description, user, video_link, ttype,providerName=None): if providerName == None: providerName = config['config']['appname'] diff --git a/vxApi.py b/vxApi.py index 9e1bc97..893cbf3 100644 --- a/vxApi.py +++ b/vxApi.py @@ -90,6 +90,24 @@ def getApiResponse(tweet,include_txt=False,include_zip=False): else: twText = twText.replace(eurl["url"],eurl["expanded_url"]) + # check if all extended media are the same type + sameMedia = False + if len(media_extended) > 1: + sameMedia = True + for i in media_extended: + if i["type"] != media_extended[0]["type"]: + sameMedia = False + break + + combinedMediaUrl = None + if sameMedia and media_extended[0]["type"] == "image": + host=config['config']['url'] + combinedMediaUrl = f'{host}/rendercombined.jpg?imgs=' + for i in media: + combinedMediaUrl += i + "," + combinedMediaUrl = combinedMediaUrl[:-1] + + apiObject = { "text": twText, "likes": tweetL["favorite_count"], @@ -107,7 +125,10 @@ def getApiResponse(tweet,include_txt=False,include_zip=False): "possibly_sensitive": tweetL["possibly_sensitive"], "hashtags": hashtags, "qrtURL": qrtURL, - "communityNote": communityNote + "communityNote": communityNote, + "allSameType": sameMedia, + "hasMedia": len(media) > 0, + "combinedMediaUrl": combinedMediaUrl } try: apiObject["date_epoch"] = int(datetime.strptime(tweetL["created_at"], "%a %b %d %H:%M:%S %z %Y").timestamp())