diff --git a/msgs.py b/msgs.py
index c22d718..5cec6ff 100644
--- a/msgs.py
+++ b/msgs.py
@@ -10,8 +10,8 @@ videoDescLimit=220
tweetDescLimit=340
def genLikesDisplay(vnf):
- if vnf['rts'] > 0:
- return ("\n\nš " + numerize.numerize(vnf['likes']) + " š " + numerize.numerize(vnf['rts']))
+ if vnf['retweets'] > 0:
+ return ("\n\nš " + numerize.numerize(vnf['likes']) + " š " + numerize.numerize(vnf['retweets']))
else:
return ("\n\nš " + numerize.numerize(vnf['likes']))
diff --git a/templates/image.html b/templates/image.html
index 09faff7..86b0fea 100644
--- a/templates/image.html
+++ b/templates/image.html
@@ -1,25 +1,10 @@
{% extends 'base.html' %}
-
{% block head %}
-
-
-
-
+
+{% include 'tweetCommon.html' %}
+
{% if pic[1] %}
@@ -34,9 +19,9 @@
{% endif %}
-
+
-
+
{% endblock %} {% block body %} Redirecting you to the tweet in a moment. Or click here. {% endblock %}
\ No newline at end of file
diff --git a/templates/rawvideo.html b/templates/rawvideo.html
index f50cbc3..907b39b 100644
--- a/templates/rawvideo.html
+++ b/templates/rawvideo.html
@@ -1,16 +1,16 @@
{% extends 'base.html' %} {% block head %}
-
+
-
-
-
-
-
+
+
+
+
+
-
-
+
+
- {% endblock %} {% block body %} Redirecting you to the video in a moment. Or click here. {% endblock %}
\ No newline at end of file
+ {% endblock %} {% block body %} Redirecting you to the video in a moment. Or click here. {% endblock %}
\ No newline at end of file
diff --git a/templates/redirect_script.html b/templates/redirect_script.html
deleted file mode 100644
index e136a64..0000000
--- a/templates/redirect_script.html
+++ /dev/null
@@ -1,28 +0,0 @@
-
\ No newline at end of file
diff --git a/templates/text.html b/templates/text.html
index c2e49d7..9edf2fb 100644
--- a/templates/text.html
+++ b/templates/text.html
@@ -1,30 +1,9 @@
{% extends 'base.html' %}
-
{% block head %}
-
-
-
-
+{% include 'tweetCommon.html' %}
diff --git a/templates/tweetCommon.html b/templates/tweetCommon.html
new file mode 100644
index 0000000..c928956
--- /dev/null
+++ b/templates/tweetCommon.html
@@ -0,0 +1,5 @@
+
+
+
+
+
\ No newline at end of file
diff --git a/templates/video.html b/templates/video.html
index 2fdb4c5..cb63be4 100644
--- a/templates/video.html
+++ b/templates/video.html
@@ -1,10 +1,8 @@
{% extends 'base.html' %} {% block head %}
-
-
-
+
-
+{% include 'tweetCommon.html' %}
@@ -17,7 +15,6 @@
-
diff --git a/test_vx.py b/test_vx.py
index 4ab95e6..81dc1cd 100644
--- a/test_vx.py
+++ b/test_vx.py
@@ -48,11 +48,11 @@ def compareDict(original,compare):
compareDict(original[key],compare[key])
## Specific API tests ##
-def test_syndicationAPI():
+def test_twextract_syndicationAPI():
tweet = twExtract.extractStatus_syndication(testMediaTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testMediaTweet_compare['description']
-def test_extractStatusV2Anon():
+def test_twextract_extractStatusV2Anon():
tweet = twExtract.extractStatusV2AnonLegacy(testTextTweet,None)
assert tweet["full_text"]==testTextTweet_compare['description']
tweet = twExtract.extractStatusV2AnonLegacy(testVideoTweet,None)
@@ -63,40 +63,40 @@ def test_extractStatusV2Anon():
assert tweet["full_text"][:94]==testMultiMediaTweet_compare['description'][:94]
-def test_v2API():
+def test_twextract_v2API():
tweet = twExtract.extractStatusV2Legacy(testMediaTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testMediaTweet_compare['description']
## Tweet retrieve tests ##
-def test_textTweetExtract():
+def test_twextract_textTweetExtract():
tweet = twExtract.extractStatus(testTextTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testTextTweet_compare['description']
assert tweet["user"]["screen_name"]=="jack"
assert 'extended_entities' not in tweet
-def test_extractV2(): # remove this when v2 is default
+def test_twextract_extractV2(): # remove this when v2 is default
tweet = twExtract.extractStatusV2(testTextTweet,workaroundTokens=tokens)
-def test_UserExtract():
+def test_twextract_UserExtract():
user = twExtract.extractUser(testUser,workaroundTokens=tokens)
assert user["screen_name"]=="jack"
assert user["id"]==12
assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006"
-def test_UserExtractID():
+def test_twextract_UserExtractID():
user = twExtract.extractUser(testUserID,workaroundTokens=tokens)
assert user["screen_name"]=="jack"
assert user["id"]==12
assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006"
-def test_UserExtractWeirdURLs():
+def test_twextract_UserExtractWeirdURLs():
for url in testUserWeirdURLs:
user = twExtract.extractUser(url,workaroundTokens=tokens)
assert user["screen_name"]=="jack"
assert user["id"]==12
assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006"
-def test_videoTweetExtract():
+def test_twextract_videoTweetExtract():
tweet = twExtract.extractStatus(testVideoTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testVideoTweet_compare['description']
assert 'extended_entities' in tweet
@@ -106,7 +106,7 @@ def test_videoTweetExtract():
assert video["type"]=="video"
-def test_mediaTweetExtract():
+def test_twextract_mediaTweetExtract():
tweet = twExtract.extractStatus(testMediaTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testMediaTweet_compare['description']
assert 'extended_entities' in tweet
@@ -116,7 +116,7 @@ def test_mediaTweetExtract():
assert video["type"]=="photo"
-def test_multimediaTweetExtract():
+def test_twextract_multimediaTweetExtract():
tweet = twExtract.extractStatus(testMultiMediaTweet,workaroundTokens=tokens)
assert tweet["full_text"][:94]==testMultiMediaTweet_compare['description'][:94]
assert 'extended_entities' in tweet
@@ -128,12 +128,12 @@ def test_multimediaTweetExtract():
assert video["media_url_https"]==testMultiMediaTweet_compare["images"][1]
assert video["type"]=="photo"
-def test_pollTweetExtract():
+def test_twextract_pollTweetExtract():
tweet = twExtract.extractStatus("https://twitter.com/norm/status/651169346518056960",workaroundTokens=tokens)
assert 'card' in tweet
compareDict(testPoll_comparePoll,tweet['card'])
-def test_NSFW_TweetExtract():
+def test_twextract_NSFW_TweetExtract():
tweet = twExtract.extractStatus(testNSFWTweet,workaroundTokens=tokens) # For now just test that there's no error
## VNF conversion test ##
diff --git a/twitfix.py b/twitfix.py
index 94e3785..ad9d7c1 100644
--- a/twitfix.py
+++ b/twitfix.py
@@ -3,10 +3,7 @@ from flask import Flask, render_template, request, redirect, abort, Response, se
from flask_cors import CORS
import re
import os
-import urllib.parse
-import urllib.request
import combineImg
-from datetime import date,datetime, timedelta
from io import BytesIO, StringIO
import msgs
import twExtract as twExtract
@@ -14,7 +11,6 @@ from configHandler import config
from cache import addVnfToLinkCache,getVnfFromLinkCache
from yt_dlp.utils import ExtractorError
import vxlogging as log
-import zipfile
from vxApi import getApiResponse
from urllib.parse import urlparse
@@ -54,18 +50,34 @@ def getTweetIdFromUrl(url):
else:
return None
+def renderImageTweetEmbed(tweetData,image,appnameSuffix=""):
+ qrt = None
+ pollData = None
+ embedDesc = msgs.formatEmbedDesc("Image",tweetData['text'],qrt,pollData,msgs.genLikesDisplay(tweetData))
+ return render_template("image.html",
+ tweet=tweetData,
+ pic=[image],
+ host=config['config']['url'],
+ desc=embedDesc,
+ tweetLink=f'https://twitter.com/{tweetData["user_screen_name"]}/status/{tweetData["tweetID"]}',
+ appname=config['config']['appname']+appnameSuffix,
+ )
+
+def renderVideoTweetEmbed(tweetData,video,appnameSuffix=""):
+ # TODO: render video tweet embed template
+ return "Video tweet embed"
+
+def renderTextTweetEmbed(tweetData,appnameSuffix=""):
+ # TODO: render text tweet embed template
+ return "Text tweet embed"
+
@app.route('/robots.txt')
def robots():
return "User-agent: *\nDisallow: /"
@app.route('/') # If the useragent is discord, return the embed, if not, redirect to configured repo directly
def default():
- global user_agent
- user_agent = request.headers.get('user-agent')
- if isValidUserAgent(user_agent):
- return message("TwitFix is an attempt to fix twitter video embeds in discord! created by Robin Universe :)\n\nš\n\nClick me to be redirected to the repo!")
- else:
- return redirect(config['config']['repo'], 301)
+ return redirect(config['config']['repo'], 301)
@app.route('/oembed.json') #oEmbed endpoint
def oembedend():
@@ -76,157 +88,75 @@ def oembedend():
provName = request.args.get("provider",None)
return oEmbedGen(desc, user, link, ttype,providerName=provName)
+def getTweetData(twitter_url):
+ try:
+ rawTweetData = twExtract.extractStatusV2Anon(twitter_url)
+ except:
+ rawTweetData = None
+ if rawTweetData is None:
+ rawTweetData = twExtract.extractStatusV2(twitter_url,workaroundTokens=config['config']['workaroundTokens'].split(','))
+ if 'error' in rawTweetData:
+ return None
+
+ if rawTweetData is None:
+ return None
+ tweetData = getApiResponse(rawTweetData)
+ return tweetData
+
@app.route('/') # Default endpoint used by everything
def twitfix(sub_path):
- global user_agent
- user_agent = request.headers.get('user-agent')
match = pathregex.search(sub_path)
+ if match is None:
+ abort(404)
+ twitter_url = f'https://twitter.com/i/status/{getTweetIdFromUrl(sub_path)}'
- if request.url.endswith(".mp4") or request.url.endswith("%2Emp4"):
- twitter_url = "https://twitter.com/" + sub_path
-
- if "?" not in request.url:
- clean = twitter_url[:-4]
+ tweetData = getTweetData(twitter_url)
+ if tweetData is None:
+ abort(404)
+ qrt = None
+ if 'qrtURL' in tweetData:
+ qrt = getTweetData(tweetData['qrtURL'])
+
+ ###return tweetData
+
+ embedIndex = -1
+ # if url ends with /1, /2, /3, or /4, we'll use that as the index
+ if sub_path[-2:] in ["/1","/2","/3","/4"]:
+ embedIndex = int(sub_path[-1])-1
+ sub_path = sub_path[:-2]
+ if request.url.startswith("https://api.vx"): # Directly return the API response if the request is from the API
+ return tweetData
+ elif request.url.startswith("https://d.vx"): # direct embed
+ # determine what type of media we're dealing with
+ if not tweetData['hasMedia'] and qrt is None:
+ return renderTextTweetEmbed(tweetData)
+ elif tweetData['allSameType'] and tweetData['media_extended'][0]['type'] == "image" and embedIndex == -1 and tweetData['combinedMediaUrl'] != None:
+ return redirect(tweetData['combinedMediaUrl'], 302)
else:
- clean = twitter_url
-
- vnf,e = vnfFromCacheOrDL(clean)
- if vnf is None:
- if e is not None:
- return message(msgs.failedToScan+msgs.failedToScanExtra+e)
- return message(msgs.failedToScan)
- return make_cached_vnf_response(vnf,getTemplate("rawvideo.html",vnf,"",[],clean,"","","",""))
- elif request.url.endswith(".txt") or request.url.endswith("%2Etxt"):
- twitter_url = "https://twitter.com/" + sub_path
-
- if "?" not in request.url:
- clean = twitter_url[:-4]
+ # this means we have mixed media or video, and we're only going to embed one
+ if embedIndex == -1: # if the user didn't specify an index, we'll just use the first one
+ embedIndex = 0
+ media = tweetData['media_extended'][embedIndex]
+ if media['type'] == "image":
+ return redirect(media['url'], 302)
+ elif media['type'] == "video" or media['type'] == "animated_gif":
+ return redirect(media['url'], 302) # TODO: might not work
+ else: # full embed
+ if not tweetData['hasMedia']:
+ return renderTextTweetEmbed(tweetData)
+ elif tweetData['allSameType'] and tweetData['media_extended'][0]['type'] == "image" and embedIndex == -1 and tweetData['combinedMediaUrl'] != None:
+ return renderImageTweetEmbed(tweetData,tweetData['combinedMediaUrl'],appnameSuffix=" - See original tweet for full quality")
else:
- clean = twitter_url
-
- vnf,e = vnfFromCacheOrDL(clean)
- if vnf is None:
- if e is not None:
- return abort(500,"Failed to scan tweet: "+e)
- return abort(500,"Failed to scan tweet")
- return make_content_type_response(getTemplate("txt.html",vnf,vnf["description"],[],clean,"","","",""),"text/plain")
- elif request.url.endswith(".zip") or request.url.endswith("%2Ezip"): # for certain types of archival software (i.e Hydrus)
- twitter_url = "https://twitter.com/" + sub_path
-
- if "?" not in request.url:
- clean = twitter_url[:-4]
- else:
- clean = twitter_url
-
- vnf,e = vnfFromCacheOrDL(clean)
- if vnf is None:
- if e is not None:
- return abort(500,"Failed to scan tweet: "+e)
- return abort(500,"Failed to scan tweet")
- with app.app_context():
- txtData = getTemplate("txt.html",vnf,vnf["description"],"",clean,"","","","")
- txtIo = BytesIO()
- txtIo.write(txtData.encode("utf-8"))
- txtIo.seek(0)
- zipIo = BytesIO()
- with zipfile.ZipFile(zipIo, 'w', zipfile.ZIP_DEFLATED) as zipf:
- zipf.writestr("tweetInfo.txt", txtIo.read())
- # todo: add images to zip
- zipIo.seek(0)
- return make_content_type_response(zipIo,"application/zip")
- elif request.url.startswith("https://d.vx"): # Matches d.fx? Try to give the user a direct link
- if isValidUserAgent(user_agent):
- twitter_url = config['config']['url'] + "/"+sub_path
- log.debug( "d.vx link shown to discord user-agent!")
- if request.url.endswith(".mp4") and "?" not in request.url:
- if "?" not in request.url:
- clean = twitter_url[:-4]
- else:
- clean = twitter_url
- else:
- clean = twitter_url
- return redirect(clean+".mp4", 301)
- else:
- log.debug("Redirect to MP4 using d.fxtwitter.com")
- return dir(sub_path)
- elif request.url.endswith("/1") or request.url.endswith("/2") or request.url.endswith("/3") or request.url.endswith("/4") or request.url.endswith("%2F1") or request.url.endswith("%2F2") or request.url.endswith("%2F3") or request.url.endswith("%2F4"):
- twitter_url = "https://twitter.com/" + sub_path
-
- if "?" not in request.url:
- clean = twitter_url[:-2]
- else:
- clean = twitter_url
+ # this means we have mixed media or video, and we're only going to embed one
+ if embedIndex == -1: # if the user didn't specify an index, we'll just use the first one
+ embedIndex = 0
+ media = tweetData['media_extended'][embedIndex]
+ if media['type'] == "image":
+ return renderImageTweetEmbed(tweetData,media['url'] , appnameSuffix=f' - Media {embedIndex+1}/{len(tweetData["media_extended"])}')
+ elif media['type'] == "video" or media['type'] == "animated_gif":
+ return renderVideoTweetEmbed(tweetData,media['url'])
- image = ( int(request.url[-1]) - 1 )
- return embed_video(clean, image)
- elif request.url.startswith("https://api.vx"):
- twitter_url = "https://twitter.com/" + sub_path
- try:
- try:
- tweet = twExtract.extractStatusV2Anon(twitter_url)
- except:
- tweet = None
- if tweet is None:
- tweet = twExtract.extractStatusV2(twitter_url,workaroundTokens=config['config']['workaroundTokens'].split(','))
- if tweet is None:
- log.error("API Get failed: " + twitter_url + " (Tweet null)")
- abort(500, '{"message": "Failed to extract tweet (Twitter API error)"}')
- if 'error' in tweet:
- response = make_response(jsonify(tweet), 500)
- response.headers['Content-Type'] = 'application/json'
- response.cache_control.max_age = 3600
- response.cache_control.public = True
- return response
- log.success("API Get success")
- return getApiResponse(tweet)
- except Exception as e:
- log.error("API Get failed: " + twitter_url + " " + log.get_exception_traceback_str(e))
- abort(500, '{"message": "Failed to extract tweet (Processing error)"}')
-
- if match is not None:
- twitter_url = sub_path
-
- if match.start() == 0:
- twitter_url = "https://twitter.com/" + sub_path
- else:
- # URL normalization messes up the URL, so we have to fix it
- if sub_path.startswith("https:/") and not sub_path.startswith("https://"):
- twitter_url = sub_path.replace("https:/", "https://", 1)
- elif sub_path.startswith("http:/") and not sub_path.startswith("http://"):
- twitter_url = sub_path.replace("http:/", "http://", 1)
-
- if isValidUserAgent(user_agent):
- res = embedCombined(twitter_url)
- return res
-
- else:
- log.debug("Redirect to " + twitter_url)
- return redirect(twitter_url, 301)
- else:
- return message("This doesn't appear to be a twitter URL")
-
-
-@app.route('/dir/') # Try to return a direct link to the MP4 on twitters servers
-def dir(sub_path):
- global user_agent
- user_agent = request.headers.get('user-agent')
- url = sub_path
- match = pathregex.search(url)
- if match is not None:
- twitter_url = url
-
- if match.start() == 0:
- twitter_url = "https://twitter.com/" + url
-
- if isValidUserAgent(user_agent):
- res = embed_video(twitter_url)
- return res
-
- else:
- log.debug("Redirect to direct MP4 URL")
- return direct_video(twitter_url)
- else:
- return redirect(url, 301)
+ return tweetData
@app.route('/favicon.ico')
def favicon(): # pragma: no cover
@@ -261,444 +191,6 @@ def rendercombined():
imgIo.seek(0)
return send_file(imgIo, mimetype='image/jpeg',max_age=86400)
-def upgradeVNF(vnf):
- # Makes sure any VNF object passed through this has proper fields if they're added in later versions
- if 'verified' not in vnf:
- vnf['verified']=False
- if 'size' not in vnf:
- if vnf['type'] == 'Video':
- vnf['size']={'width':720,'height':480}
- else:
- vnf['size']={}
- if 'qrtURL' not in vnf:
- if vnf['qrt'] == {}:
- vnf['qrtURL'] = None
- else: #
- vnf['qrtURL'] = f"https://twitter.com/{vnf['qrt']['screen_name']}/status/{vnf['qrt']['id']}"
- if 'isGif' not in vnf:
- vnf['isGif'] = False
- return vnf
-
-def getDefaultTTL(): # TTL for deleting items from the database
- return datetime.today().replace(microsecond=0) + timedelta(days=1)
-
-def secondsUntilTTL(ttl):
- untilTTL = ttl - datetime.today().replace(microsecond=0)
- return untilTTL.total_seconds()
-
-def make_content_type_response(response, content_type):
- resp = make_response(response)
- resp.headers['Content-Type'] = content_type
- return resp
-
-def make_cached_vnf_response(vnf,response):
- return response
- try:
- if 'ttl' not in vnf or vnf['ttl'] == None or secondsUntilTTL(vnf['ttl']) <= 0:
- return response
- resp = make_response(response)
- resp.cache_control.max_age = secondsUntilTTL(vnf['ttl'])
- resp.cache_control.public = True
- return resp
- except Exception as e:
- log.error("Error making cached response: " + str(e))
- return response
-
-def vnfFromCacheOrDL(video_link):
- cached_vnf = getVnfFromLinkCache(video_link)
- if cached_vnf == None:
- try:
- vnf = link_to_vnf(video_link)
- addVnfToLinkCache(video_link, vnf)
- log.success("VNF Get success")
- return vnf,None
- except (ExtractorError, twExtract.TwExtractError) as exErr:
- if 'HTTP Error 404' in exErr.msg or 'No status found with that ID' in exErr.msg:
- exErr.msg=msgs.tweetNotFound
- elif 'suspended' in exErr.msg:
- exErr.msg=msgs.tweetSuspended
- else:
- exErr.msg=msgs.unknownError
-
- log.error("VNF Get failed: " + video_link + " " + log.get_exception_traceback_str(exErr))
- return None,exErr.msg
- except Exception as e:
- log.error("VNF Get failed: " + video_link + " " + log.get_exception_traceback_str(e))
- return None,None
- else:
- return upgradeVNF(cached_vnf),None
-
-def direct_video(video_link): # Just get a redirect to a MP4 link from any tweet link
- vnf,e = vnfFromCacheOrDL(video_link)
- if vnf != None:
- return redirect(vnf['url'], 301)
- else:
- if e is not None:
- return message(msgs.failedToScan+msgs.failedToScanExtra+e)
- return message(msgs.failedToScan)
-
-def direct_video_link(video_link): # Just get a redirect to a MP4 link from any tweet link
- vnf,e = vnfFromCacheOrDL(video_link)
- if vnf != None:
- return vnf['url']
- else:
- if e is not None:
- return message(msgs.failedToScan+msgs.failedToScanExtra+e)
- return message(msgs.failedToScan)
-
-def embed_video(video_link, image=0): # Return Embed from any tweet link
- vnf,e = vnfFromCacheOrDL(video_link)
-
- if vnf != None:
- return embed(video_link, vnf, image)
- else:
- if e is not None:
- return message(msgs.failedToScan+msgs.failedToScanExtra+e)
- return message(msgs.failedToScan)
-
-def tweetInfo(url, tweet="", desc="", thumb="", uploader="", screen_name="", pfp="", tweetType="", images="", hits=0, likes=0, rts=0, time="", qrtURL="", nsfw=False,ttl=None,verified=False,size={},poll=None,isGif=False): # Return a dict of video info with default values
- if (ttl==None):
- ttl = getDefaultTTL()
- vnf = {
- "tweet" : tweet,
- "url" : url,
- "description" : desc,
- "thumbnail" : thumb,
- "uploader" : uploader,
- "screen_name" : screen_name,
- "pfp" : pfp,
- "type" : tweetType,
- "images" : images,
- "hits" : hits,
- "likes" : likes,
- "rts" : rts,
- "time" : time,
- "qrtURL" : qrtURL,
- "nsfw" : nsfw,
- "ttl" : ttl,
- "verified" : verified,
- "size" : size,
- "poll" : poll,
- "isGif" : isGif,
- "tweetId" : int(getTweetIdFromUrl(tweet))
- }
- if (poll is None):
- del vnf['poll']
- return vnf
-
-def link_to_vnf_from_tweet_data(tweet,video_link):
- imgs = ["","","","", ""]
- log.debug("Tweet Type: " + tweetType(tweet))
- isGif=False
- # Check to see if tweet has a video, if not, make the url passed to the VNF the first t.co link in the tweet
- if tweetType(tweet) == "Video":
- media=tweet['extended_entities']['media'][0]
- if media['video_info']['variants']:
- best_bitrate = -1
- thumb = media['media_url']
- if 'original_info' in media:
- size=media["original_info"]
- elif 'sizes' in media and ('large' in media["sizes"] or 'medium' in media["sizes"] or 'small' in media["sizes"] or 'thumb' in media["sizes"]):
- possibleSizes=['large','medium','small','thumb']
- for p in possibleSizes:
- if p in media["sizes"]:
- size={'width':media["sizes"][p]['w'],'height':media["sizes"][p]['h']}
- break
- else:
- size={'width':720,'height':480}
- for video in media['video_info']['variants']:
- if video['content_type'] == "video/mp4" and '/hevc/' not in video["url"] and video['bitrate'] > best_bitrate:
- url = video['url']
- best_bitrate = video['bitrate']
- elif tweetType(tweet) == "Text":
- url = ""
- thumb = ""
- size = {}
- else:
- imgs = ["","","","", ""]
- i = 0
- for media in tweet['extended_entities']['media']:
- imgs[i] = media['media_url_https']
- i = i + 1
-
- imgs[4] = str(i)
- url = ""
- images= imgs
- thumb = tweet['extended_entities']['media'][0]['media_url_https']
- size = {}
-
- if 'extended_entities' in tweet and 'media' in tweet['extended_entities'] and tweet['extended_entities']['media'][0]['type'] == 'animated_gif':
- isGif=True
-
- qrtURL = None
- if 'quoted_status_permalink' in tweet:
- qrtURL = tweet['quoted_status_permalink']['expanded']
- elif 'quoted_status_id_str' in tweet:
- qrtURL = "https://twitter.com/i/status/" + tweet['quoted_status_id_str']
-
- text = tweet['full_text']
-
- if 'possibly_sensitive' in tweet:
- nsfw = tweet['possibly_sensitive']
- else:
- nsfw = False
-
- if 'entities' in tweet and 'urls' in tweet['entities']:
- for eurl in tweet['entities']['urls']:
- if "/status/" in eurl["expanded_url"] and eurl["expanded_url"].startswith("https://twitter.com/"):
- text = text.replace(eurl["url"], "")
- else:
- text = text.replace(eurl["url"],eurl["expanded_url"])
- ttl = None #default
-
- try:
- if 'card' in tweet and tweet['card']['name'].startswith('poll'):
- poll=getPollObject(tweet['card'])
- if tweet['card']['binding_values']['counts_are_final']['boolean_value'] == False:
- ttl = datetime.today().replace(microsecond=0) + timedelta(minutes=1)
- else:
- poll=None
- except:
- poll=None
-
- vnf = tweetInfo(
- url,
- video_link,
- text, thumb,
- tweet['user']['name'],
- tweet['user']['screen_name'],
- tweet['user']['profile_image_url'],
- tweetType(tweet),
- likes=tweet['favorite_count'],
- rts=tweet['retweet_count'],
- time=tweet['created_at'],
- qrtURL=qrtURL,
- images=imgs,
- nsfw=nsfw,
- verified=tweet['user']['verified'],
- size=size,
- poll=poll,
- ttl=ttl,
- isGif=isGif
- )
-
- return vnf
-
-
-def link_to_vnf_from_unofficial_api(video_link):
- tweet=None
- log.info("Attempting to download tweet info: "+video_link)
- tweet = twExtract.extractStatus(video_link,workaroundTokens=config['config']['workaroundTokens'].split(','))
- log.success("Unofficial API Success")
-
- if "extended_entities" not in tweet:
- # check if any entities with urls ending in /video/XXX or /photo/XXX exist
- if "entities" in tweet and "urls" in tweet["entities"]:
- for url in tweet["entities"]["urls"]:
- if "/video/" in url["expanded_url"] or "/photo/" in url["expanded_url"]:
- log.info("Extra tweet info found in entities: "+video_link+" -> "+url["expanded_url"])
- subTweet = twExtract.extractStatus(url["expanded_url"],workaroundTokens=config['config']['workaroundTokens'].split(','))
- if "extended_entities" in subTweet:
- tweet["extended_entities"] = subTweet["extended_entities"]
- break
-
- return link_to_vnf_from_tweet_data(tweet,video_link)
-
-def link_to_vnf(video_link): # Return a VideoInfo object or die trying
- return link_to_vnf_from_unofficial_api(video_link)
-
-def message(text):
- return render_template(
- 'default.html',
- message = text,
- color = config['config']['color'],
- appname = config['config']['appname'],
- repo = config['config']['repo'],
- url = config['config']['url'] )
-
-def getTemplate(template,vnf,desc,images,video_link,color,urlDesc,urlUser,urlLink,appNameSuffix="",embedVNF=None):
- if (embedVNF is None):
- embedVNF = vnf
- if ('width' in embedVNF['size'] and 'height' in embedVNF['size']):
- embedVNF['size']['width'] = min(embedVNF['size']['width'],2000)
- embedVNF['size']['height'] = min(embedVNF['size']['height'],2000)
- return render_template(
- template,
- likes = vnf['likes'],
- rts = vnf['rts'],
- time = vnf['time'],
- screenName = vnf['screen_name'],
- vidlink = embedVNF['url'],
- userLink = f"https://twitter.com/{vnf['screen_name']}",
- pfp = vnf['pfp'],
- vidurl = embedVNF['url'],
- desc = desc,
- pic = images,
- user = vnf['uploader'],
- video_link = vnf,
- color = color,
- appname = config['config']['appname'] + appNameSuffix,
- repo = config['config']['repo'],
- url = config['config']['url'],
- urlDesc = urlDesc,
- urlUser = urlUser,
- urlLink = urlLink,
- urlUserLink= urllib.parse.quote(f"https://twitter.com/{vnf['screen_name']}"),
- tweetLink = vnf['tweet'],
- videoSize = embedVNF['size'] )
-
-def embed(video_link, vnf, image):
- log.info("Embedding " + vnf['type'] + ": " + video_link)
-
- desc = re.sub(r' https:\/\/t\.co\/\S+(?=\s|$)', '', vnf['description'])
- urlUser = urllib.parse.quote(vnf['uploader'])
- urlDesc = urllib.parse.quote(desc)
- urlLink = urllib.parse.quote(video_link)
- likeDisplay = msgs.genLikesDisplay(vnf)
-
- if 'poll' in vnf:
- pollDisplay= msgs.genPollDisplay(vnf['poll'])
- else:
- pollDisplay=""
-
- qrt=None
- if vnf['qrtURL'] is not None:
- qrt,e=vnfFromCacheOrDL(vnf['qrtURL'])
- if qrt is not None:
- desc=msgs.formatEmbedDesc(vnf['type'],desc,qrt,pollDisplay,likeDisplay)
- else:
- desc=msgs.formatEmbedDesc(vnf['type'],desc,None,pollDisplay,likeDisplay)
- embedVNF=None
- appNamePost = ""
- if vnf['type'] == "Text": # Change the template based on tweet type
- template = 'text.html'
- if qrt is not None and qrt['type'] != "Text":
- embedVNF=qrt
- if qrt['type'] == "Image":
- if embedVNF['images'][4]!="1":
- appNamePost = " - Image " + str(image+1) + " of " + str(vnf['images'][4])
- image = embedVNF['images'][image]
- template = 'image.html'
- elif qrt['type'] == "Video" or qrt['type'] == "":
- urlDesc = urllib.parse.quote(desc)
- template = 'video.html'
-
- if vnf['type'] == "Image":
- if vnf['images'][4]!="1":
- appNamePost = " - Image " + str(image+1) + "/" + str(vnf['images'][4])
- image = vnf['images'][image]
- template = 'image.html'
-
- if vnf['type'] == "Video":
- if vnf['isGif'] == True and config['config']['gifConvertAPI'] != "" and config['config']['gifConvertAPI'] != "none":
- vnf['url'] = f"{config['config']['gifConvertAPI']}/convert.mp4?url={vnf['url']}"
- appNamePost = " - GIF"
- urlDesc = urllib.parse.quote(desc)
- template = 'video.html'
-
- if vnf['type'] == "":
- urlDesc = urllib.parse.quote(desc)
- template = 'video.html'
-
- color = "#7FFFD4" # Green
-
- if vnf['nsfw'] == True:
- color = "#800020" # Red
-
- return make_cached_vnf_response(vnf,getTemplate(template,vnf,desc,[image],video_link,color,urlDesc,urlUser,urlLink,appNamePost,embedVNF))
-
-
-def embedCombined(video_link):
- vnf,e = vnfFromCacheOrDL(video_link)
-
- if vnf != None:
- return make_cached_vnf_response(vnf,embedCombinedVnf(video_link, vnf))
- else:
- if e is not None:
- return message(msgs.failedToScan+msgs.failedToScanExtra+e)
- return message(msgs.failedToScan)
-
-def embedCombinedVnf(video_link,vnf):
- qrt=None
- if vnf['qrtURL'] is not None:
- qrt,e=vnfFromCacheOrDL(vnf['qrtURL'])
-
- if vnf['type'] != "Image" and vnf['type'] != "Video" and qrt is not None and qrt['type'] == "Image":
- if qrt['images'][4]!="1":
- vnf['images'] = qrt['images']
- vnf['type'] = "Image"
-
- if vnf['type'] != "Image" or vnf['images'][4] == "1":
- return embed(video_link, vnf, 0)
- desc = re.sub(r' http.*t\.co\S+', '', vnf['description'])
- urlUser = urllib.parse.quote(vnf['uploader'])
- urlDesc = urllib.parse.quote(desc)
- urlLink = urllib.parse.quote(video_link)
- likeDisplay = msgs.genLikesDisplay(vnf)
-
- if 'poll' in vnf:
- pollDisplay= msgs.genPollDisplay(vnf['poll'])
- else:
- pollDisplay=""
-
-
- if qrt is not None:
- desc=msgs.formatEmbedDesc(vnf['type'],desc,qrt,pollDisplay,likeDisplay)
-
- suffix=""
- #if 'Discord' in user_agent:
- # images = []
- # for i in range(0,int(vnf['images'][4])):
- # images.append(vnf['images'][i])
- #else:
- host = config['config']['url']
- image = f"{host}/rendercombined.jpg?imgs="
- for i in range(0,int(vnf['images'][4])):
- image = image + vnf['images'][i] + ","
- image = image[:-1] # Remove last comma
- images=[image]
- suffix=" - View original tweet for full quality"
-
- color = "#7FFFD4" # Green
-
- if vnf['nsfw'] == True:
- color = "#800020" # Red
- return make_cached_vnf_response(vnf,getTemplate('image.html',vnf,desc,images,video_link,color,urlDesc,urlUser,urlLink,appNameSuffix=suffix))
-
-
-def getPollObject(card):
- poll={"total_votes":0,"choices":[]}
- choiceCount=0
- if (card["name"]=="poll2choice_text_only"):
- choiceCount=2
- elif (card["name"]=="poll3choice_text_only"):
- choiceCount=3
- elif (card["name"]=="poll4choice_text_only"):
- choiceCount=4
-
- for i in range(0,choiceCount):
- choice = {"text":card["binding_values"][f"choice{i+1}_label"]["string_value"],"votes":int(card["binding_values"][f"choice{i+1}_count"]["string_value"])}
- poll["total_votes"]+=choice["votes"]
- poll["choices"].append(choice)
- # update each choice with a percentage
- for choice in poll["choices"]:
- choice["percent"] = round((choice["votes"]/poll["total_votes"])*100,1)
-
- return poll
-
-
-def tweetType(tweet): # Are we dealing with a Video, Image, or Text tweet?
- if 'extended_entities' in tweet:
- if 'video_info' in tweet['extended_entities']['media'][0]:
- out = "Video"
- else:
- out = "Image"
- else:
- out = "Text"
-
- return out
-
-
def oEmbedGen(description, user, video_link, ttype,providerName=None):
if providerName == None:
providerName = config['config']['appname']
diff --git a/vxApi.py b/vxApi.py
index 9e1bc97..893cbf3 100644
--- a/vxApi.py
+++ b/vxApi.py
@@ -90,6 +90,24 @@ def getApiResponse(tweet,include_txt=False,include_zip=False):
else:
twText = twText.replace(eurl["url"],eurl["expanded_url"])
+ # check if all extended media are the same type
+ sameMedia = False
+ if len(media_extended) > 1:
+ sameMedia = True
+ for i in media_extended:
+ if i["type"] != media_extended[0]["type"]:
+ sameMedia = False
+ break
+
+ combinedMediaUrl = None
+ if sameMedia and media_extended[0]["type"] == "image":
+ host=config['config']['url']
+ combinedMediaUrl = f'{host}/rendercombined.jpg?imgs='
+ for i in media:
+ combinedMediaUrl += i + ","
+ combinedMediaUrl = combinedMediaUrl[:-1]
+
+
apiObject = {
"text": twText,
"likes": tweetL["favorite_count"],
@@ -107,7 +125,10 @@ def getApiResponse(tweet,include_txt=False,include_zip=False):
"possibly_sensitive": tweetL["possibly_sensitive"],
"hashtags": hashtags,
"qrtURL": qrtURL,
- "communityNote": communityNote
+ "communityNote": communityNote,
+ "allSameType": sameMedia,
+ "hasMedia": len(media) > 0,
+ "combinedMediaUrl": combinedMediaUrl
}
try:
apiObject["date_epoch"] = int(datetime.strptime(tweetL["created_at"], "%a %b %d %H:%M:%S %z %Y").timestamp())