More work on VX refactor

This commit is contained in:
Dylan 2024-05-02 17:28:44 +01:00
parent ffcc96c13e
commit 88b2461af3
8 changed files with 171 additions and 125 deletions

View File

@ -5,6 +5,7 @@ import json
import os import os
import boto3 import boto3
import vxlogging as log import vxlogging as log
from utils import getTweetIdFromUrl
link_cache_system = config['config']['link_cache'] link_cache_system = config['config']['link_cache']
link_cache = {} link_cache = {}
@ -44,8 +45,7 @@ def serializeUnknown(obj):
return obj.isoformat() return obj.isoformat()
raise TypeError ("Type %s not serializable" % type(obj)) raise TypeError ("Type %s not serializable" % type(obj))
def addVnfToLinkCache(video_link, vnf): def addVnfToTweetIdCache(tweet_id, vnf):
video_link = video_link.lower()
global link_cache global link_cache
try: try:
if link_cache_system == "db": if link_cache_system == "db":
@ -53,20 +53,20 @@ def addVnfToLinkCache(video_link, vnf):
log.debug("Link added to DB cache ") log.debug("Link added to DB cache ")
return True return True
elif link_cache_system == "json": elif link_cache_system == "json":
link_cache[video_link] = vnf link_cache[tweet_id] = vnf
with open("links.json", "w") as outfile: with open("links.json", "w") as outfile:
json.dump(link_cache, outfile, indent=4, sort_keys=True, default=serializeUnknown) json.dump(link_cache, outfile, indent=4, sort_keys=True, default=serializeUnknown)
log.debug("Link added to JSON cache ") log.debug("Link added to JSON cache ")
return True return True
elif link_cache_system == "ram": # FOR TESTS ONLY elif link_cache_system == "ram": # FOR TESTS ONLY
link_cache[video_link] = vnf link_cache[tweet_id] = vnf
log.debug("Link added to RAM cache ") log.debug("Link added to RAM cache ")
elif link_cache_system == "dynamodb": # pragma: no cover elif link_cache_system == "dynamodb": # pragma: no cover
vnf["ttl"] = int(vnf["ttl"].strftime('%s')) vnf["ttl"] = int(vnf["ttl"].strftime('%s'))
table = client.Table(DYNAMO_CACHE_TBL) table = client.Table(DYNAMO_CACHE_TBL)
table.put_item( table.put_item(
Item={ Item={
'tweet': video_link, 'tweet': tweet_id,
'vnf': vnf, 'vnf': vnf,
'ttl':vnf["ttl"] 'ttl':vnf["ttl"]
} }
@ -74,19 +74,21 @@ def addVnfToLinkCache(video_link, vnf):
log.debug("Link added to dynamodb cache ") log.debug("Link added to dynamodb cache ")
return True return True
except Exception as e: except Exception as e:
log.error("Failed to add link to DB cache: "+str(e)+" "+video_link) log.error("Failed to add link to DB cache: "+str(e)+" "+tweet_id)
return False return False
def getVnfFromLinkCache(video_link): def addVnfToLinkCache(twitter_url, vnf):
video_link = video_link.lower() return addVnfToTweetIdCache(getTweetIdFromUrl(twitter_url), vnf)
def getVnfFromTweetIdCache(tweet_id):
global link_cache global link_cache
if link_cache_system == "db": if link_cache_system == "db":
collection = db.linkCache collection = db.linkCache
vnf = collection.find_one({'tweet': video_link}) vnf = collection.find_one({'tweet': tweet_id})
if vnf != None: if vnf != None:
hits = ( vnf['hits'] + 1 ) hits = ( vnf['hits'] + 1 )
log.debug("Link located in DB cache.") log.debug("Link located in DB cache.")
query = { 'tweet': video_link } query = { 'tweet': tweet_id }
change = { "$set" : { "hits" : hits } } change = { "$set" : { "hits" : hits } }
out = db.linkCache.update_one(query, change) out = db.linkCache.update_one(query, change)
return vnf return vnf
@ -94,9 +96,9 @@ def getVnfFromLinkCache(video_link):
log.debug("Link not in DB cache") log.debug("Link not in DB cache")
return None return None
elif link_cache_system == "json": elif link_cache_system == "json":
if video_link in link_cache: if tweet_id in link_cache:
log.debug("Link located in json cache") log.debug("Link located in json cache")
vnf = link_cache[video_link] vnf = link_cache[tweet_id]
return vnf return vnf
else: else:
log.debug("Link not in json cache") log.debug("Link not in json cache")
@ -105,7 +107,7 @@ def getVnfFromLinkCache(video_link):
table = client.Table(DYNAMO_CACHE_TBL) table = client.Table(DYNAMO_CACHE_TBL)
response = table.get_item( response = table.get_item(
Key={ Key={
'tweet': video_link 'tweet': tweet_id
} }
) )
if 'Item' in response: if 'Item' in response:
@ -116,9 +118,9 @@ def getVnfFromLinkCache(video_link):
log.debug("Link not in dynamodb cache") log.debug("Link not in dynamodb cache")
return None return None
elif link_cache_system == "ram": # FOR TESTS ONLY elif link_cache_system == "ram": # FOR TESTS ONLY
if video_link in link_cache: if tweet_id in link_cache:
log.debug("Link located in json cache") log.debug("Link located in json cache")
vnf = link_cache[video_link] vnf = link_cache[tweet_id]
return vnf return vnf
else: else:
log.debug("Link not in cache") log.debug("Link not in cache")
@ -126,6 +128,9 @@ def getVnfFromLinkCache(video_link):
elif link_cache_system == "none": elif link_cache_system == "none":
return None return None
def getVnfFromLinkCache(twitter_url):
return getVnfFromTweetIdCache(getTweetIdFromUrl(twitter_url))
def clearCache(): def clearCache():
global link_cache global link_cache
# only intended for use in tests # only intended for use in tests

View File

@ -17,7 +17,7 @@ def genLikesDisplay(vnf):
def genQrtDisplay(qrt): def genQrtDisplay(qrt):
verifiedCheck = "☑️" if ('verified' in qrt and qrt['verified']) else "" verifiedCheck = "☑️" if ('verified' in qrt and qrt['verified']) else ""
return ("\n\n【QRT of " + qrt['uploader'] + " (@" + qrt['screen_name'] + ")"+ verifiedCheck+":】\n'" + qrt['description'] + "'") return ("\n\n【QRT of " + qrt['user_name'] + " (@" + qrt['user_screen_name'] + ")"+ verifiedCheck+":】\n'" + qrt['text'] + "'")
def genPollDisplay(poll): def genPollDisplay(poll):
pctSplit=10 pctSplit=10
@ -29,7 +29,12 @@ def genPollDisplay(poll):
def formatEmbedDesc(type,body,qrt,pollDisplay,likesDisplay): def formatEmbedDesc(type,body,qrt,pollDisplay,likesDisplay):
# Trim the embed description to 248 characters, prioritizing poll and likes # Trim the embed description to 248 characters, prioritizing poll and likes
limit = videoDescLimit if type=="" or type=="Video" or (qrt!=None and (qrt["type"]=="" or qrt["type"]=="Video")) else tweetDescLimit qrtType=None
#if qrt!=None:
# qrtType=qrt["type"]
qrtType="Text"
limit = videoDescLimit if type=="Text" or type=="Video" or (qrt!=None and (qrtType=="Text" or qrtType=="Video")) else tweetDescLimit
output = "" output = ""
if pollDisplay==None: if pollDisplay==None:

View File

@ -4,10 +4,10 @@
<meta name="twitter:card" content="tweet" /> <meta name="twitter:card" content="tweet" />
{% include 'tweetCommon.html' %} {% include 'tweetCommon.html' %}
<meta name="twitter:image" content="{{ pic[0] }}" /> <meta name="twitter:image" content="{{ tweet['user_profile_image_url'] }}" />
<meta name="twitter:creator" content="@{{ user }}" /> <meta name="twitter:creator" content="@{{ user }}" />
<meta property="og:description" content="{{ desc }}" /> <meta property="og:description" content="{{ desc }}" />
<link rel="alternate" href="{{ url }}/oembed.json?desc={{ urlUser }}&user=Twitter&link={{ tweetLink }}&ttype=link&provider={{ appname }}" type="application/json+oembed" title="{{ user }}"> <link rel="alternate" href="{{ host }}/oembed.json?desc={{ urlUser }}&user=Twitter&link={{ tweetLink }}&ttype=link&provider={{ appname }}" type="application/json+oembed" title="{{ tweet['user_name'] }}">
<meta http-equiv="refresh" content="0; url = {{ tweetLink }}" /> {% endblock %} {% block body %} Redirecting you to the tweet in a moment. <a href="{{ tweetLink }}">Or click here.</a> {% endblock %} <meta http-equiv="refresh" content="0; url = {{ tweetLink }}" /> {% endblock %} {% block body %} Redirecting you to the tweet in a moment. <a href="{{ tweetLink }}">Or click here.</a> {% endblock %}

View File

@ -3,20 +3,20 @@
<meta name="twitter:card" content="player" /> <meta name="twitter:card" content="player" />
{% include 'tweetCommon.html' %} {% include 'tweetCommon.html' %}
<meta name="twitter:image" content="{{ pic }}" /> <meta name="twitter:image" content="{{ media['thumbnail_url'] }}" />
<meta name="twitter:player:width" content="{{ videoSize['width'] }}" /> <meta name="twitter:player:width" content="{{ media['size']['width'] }}" />
<meta name="twitter:player:height" content="{{ videoSize['height'] }}" /> <meta name="twitter:player:height" content="{{ media['size']['height'] }}" />
<meta name="twitter:player:stream" content="{{ vidurl }}" /> <meta name="twitter:player:stream" content="{{ media['url'] }}" />
<meta name="twitter:player:stream:content_type" content="video/mp4" /> <meta name="twitter:player:stream:content_type" content="video/mp4" />
<meta property="og:url" content="{{ vidlink }}" /> <meta property="og:url" content="{{ vidlink }}" />
<meta property="og:video" content="{{ vidurl }}" /> <meta property="og:video" content="{{ media['url'] }}" />
<meta property="og:video:secure_url" content="{{ vidurl }}" /> <meta property="og:video:secure_url" content="{{ media['url'] }}" />
<meta property="og:video:type" content="video/mp4" /> <meta property="og:video:type" content="video/mp4" />
<meta property="og:video:width" content="{{ videoSize['width'] }}" /> <meta property="og:video:width" content="{{ media['size']['width'] }}" />
<meta property="og:video:height" content="{{ videoSize['height'] }}" /> <meta property="og:video:height" content="{{ media['size']['height'] }}" />
<meta property="og:image" content="{{ pic[0] }}" /> <meta property="og:image" content="{{ media['thumbnail_url'] }}" />
<meta property="og:description" content="{{ desc }}" /> <meta property="og:description" content="{{ desc }}" />
<link rel="alternate" href="{{ url }}/oembed.json?desc={{ urlUser }}&user={{ urlDesc }}&link={{ tweetLink }}&ttype=video&provider={{ appname }}" type="application/json+oembed" title="{{ user }}"> <link rel="alternate" href="{{ host }}/oembed.json?desc={{ urlUser }}&user={{ urlDesc }}&link={{ tweetLink }}&ttype=video&provider={{ appname }}" type="application/json+oembed" title="{{ tweet['user_name'] }}">
<meta http-equiv="refresh" content="0; url = {{ tweetLink }}" /> {% endblock %} {% block body %} Redirecting you to the tweet in a moment. <a href="{{ tweetLink }}">Or click here.</a> {% endblock %} <meta http-equiv="refresh" content="0; url = {{ tweetLink }}" /> {% endblock %} {% block body %} Redirecting you to the tweet in a moment. <a href="{{ tweetLink }}">Or click here.</a> {% endblock %}

View File

@ -1,8 +1,7 @@
import os import os
import twitfix,twExtract import twitfix,twExtract,vxApi
import cache import cache
import msgs
from flask.testing import FlaskClient from flask.testing import FlaskClient
client = FlaskClient(twitfix.app) client = FlaskClient(twitfix.app)
@ -16,15 +15,14 @@ testQrtCeptionTweet="https://twitter.com/CatherineShu/status/585253766271672320"
testQrtVideoTweet="https://twitter.com/pdxdylan/status/1674561759422578690" testQrtVideoTweet="https://twitter.com/pdxdylan/status/1674561759422578690"
testNSFWTweet="https://twitter.com/kuyacoy/status/1581185279376838657" testNSFWTweet="https://twitter.com/kuyacoy/status/1581185279376838657"
testTextTweet_compare={'tweet': 'https://twitter.com/jack/status/20', 'url': '', 'description': 'just setting up my twttr', 'thumbnail': '', 'type': 'Text', 'images': ['', '', '', '', ''], 'time': '2006-03-21T20:50:14.000Z', 'qrtURL': None, 'nsfw': False, 'size': {}, 'isGif': False} testTextTweet_compare={'text': 'just setting up my twttr', 'date': 'Tue Mar 21 20:50:14 +0000 2006', 'tweetURL': 'https://twitter.com/jack/status/20', 'tweetID': '20', 'conversationID': '20', 'mediaURLs': [], 'media_extended': [], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': None, 'allSameType': False, 'hasMedia': False, 'combinedMediaUrl': None, 'date_epoch': 1142974214}
testVideoTweet_compare={'tweet': 'https://twitter.com/pdxdylan/status/1540398733669666818', 'url': 'https://video.twimg.com/ext_tw_video/1540396699037929472/pu/vid/762x528/YxbXbT3X7vq4LWfC.mp4?tag=12', 'description': 'TikTok embeds on Discord/Telegram bait you with a fake play button, but to see the actual video you have to go to their website.\nAs a request from a friend, I made it so that if you add "vx" before "tiktok" on any link, it fixes that. https://t.co/QYpiVXUIrW', 'thumbnail': 'https://pbs.twimg.com/ext_tw_video_thumb/1540396699037929472/pu/img/l187Z6B9AHHxUKPV.jpg', 'type': 'Video', 'images': ['', '', '', '', ''], 'time': '2022-06-24T18:17:31.000Z', 'qrtURL': None, 'nsfw': False, 'size': {'height': 528, 'width': 762, 'focus_rects': []}, 'isGif': False} testVideoTweet_compare={'text': 'TikTok embeds on Discord/Telegram bait you with a fake play button, but to see the actual video you have to go to their website.\nAs a request from a friend, I made it so that if you add "vx" before "tiktok" on any link, it fixes that. https://t.co/QYpiVXUIrW', 'date': 'Fri Jun 24 18:17:31 +0000 2022', 'tweetURL': 'https://twitter.com/pdxdylan/status/1540398733669666818', 'tweetID': '1540398733669666818', 'conversationID': '1540398733669666818', 'mediaURLs': ['https://video.twimg.com/ext_tw_video/1540396699037929472/pu/vid/762x528/YxbXbT3X7vq4LWfC.mp4?tag=12'], 'media_extended': [{'url': 'https://video.twimg.com/ext_tw_video/1540396699037929472/pu/vid/762x528/YxbXbT3X7vq4LWfC.mp4?tag=12', 'type': 'video', 'size': {'width': 762, 'height': 528}, 'duration_millis': 13650, 'thumbnail_url': 'https://pbs.twimg.com/ext_tw_video_thumb/1540396699037929472/pu/img/l187Z6B9AHHxUKPV.jpg', 'altText': None}], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': None, 'allSameType': False, 'hasMedia': True, 'combinedMediaUrl': None, 'date_epoch': 1656094651}
testMediaTweet_compare={'tweet': 'https://twitter.com/pdxdylan/status/1534672932106035200', 'url': '', 'description': 'oh. https://t.co/HgLAbiXw2E', 'thumbnail': 'https://pbs.twimg.com/media/FUxAt5LWUAMol0N.png', 'type': 'Image', 'images': ['https://pbs.twimg.com/media/FUxAt5LWUAMol0N.png', '', '', '', '1'], 'time': '2022-06-08T23:05:14.000Z', 'qrtURL': None, 'nsfw': False, 'size': {}, 'isGif': False} testMediaTweet_compare={'text': 'oh. https://t.co/HgLAbiXw2E', 'date': 'Wed Jun 08 23:05:14 +0000 2022', 'tweetURL': 'https://twitter.com/pdxdylan/status/1534672932106035200', 'tweetID': '1534672932106035200', 'conversationID': '1534672673422381057', 'mediaURLs': ['https://pbs.twimg.com/media/FUxAt5LWUAMol0N.png'], 'media_extended': [{'url': 'https://pbs.twimg.com/media/FUxAt5LWUAMol0N.png', 'altText': None, 'type': 'image', 'size': {'width': 927, 'height': 534}, 'thumbnail_url': 'https://pbs.twimg.com/media/FUxAt5LWUAMol0N.png'}], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': None, 'allSameType': False, 'hasMedia': True, 'combinedMediaUrl': None, 'date_epoch': 1654729514}
testMultiMediaTweet_compare={'tweet': 'https://twitter.com/pdxdylan/status/1532006436703715331', 'url': '', 'description': 'Released #Retro64 1.0.9. Besides a lot of internal bug-fixes, this adds quicksand blocks, fixes the rendering for the castle stairs block, and adds a new model, Sonic! \nhttps://github.com/Retro64Mod/Retro64Mod/releases/tag/1.18.2-1.0.9 https://t.co/CWZaw4hzyg', 'thumbnail': 'https://pbs.twimg.com/media/FULF9oxXwAMDI-C.png', 'type': 'Image', 'images': ['https://pbs.twimg.com/media/FULF9oxXwAMDI-C.png', 'https://pbs.twimg.com/media/FULGaHkWYAIBV5U.png', 'https://pbs.twimg.com/media/FULGiZnWQAMBRWl.png', '', '3'], 'time': '2022-06-01T14:29:32.000Z', 'qrtURL': None, 'nsfw': False, 'size': {}, 'isGif': False} testMultiMediaTweet_compare={'text': 'Released #Retro64 1.0.9. Besides a lot of internal bug-fixes, this adds quicksand blocks, fixes the rendering for the castle stairs block, and adds a new model, Sonic! \nhttps://github.com/Retro64Mod/Retro64Mod/releases/tag/1.18.2-1.0.9 https://t.co/CWZaw4hzyg', 'date': 'Wed Jun 01 14:29:32 +0000 2022', 'tweetURL': 'https://twitter.com/pdxdylan/status/1532006436703715331', 'tweetID': '1532006436703715331', 'conversationID': '1532006436703715331', 'mediaURLs': ['https://pbs.twimg.com/media/FULF9oxXwAMDI-C.png', 'https://pbs.twimg.com/media/FULGaHkWYAIBV5U.png', 'https://pbs.twimg.com/media/FULGiZnWQAMBRWl.png'], 'media_extended': [{'url': 'https://pbs.twimg.com/media/FULF9oxXwAMDI-C.png', 'altText': None, 'type': 'image', 'size': {'width': 507, 'height': 507}, 'thumbnail_url': 'https://pbs.twimg.com/media/FULF9oxXwAMDI-C.png'}, {'url': 'https://pbs.twimg.com/media/FULGaHkWYAIBV5U.png', 'altText': None, 'type': 'image', 'size': {'width': 396, 'height': 431}, 'thumbnail_url': 'https://pbs.twimg.com/media/FULGaHkWYAIBV5U.png'}, {'url': 'https://pbs.twimg.com/media/FULGiZnWQAMBRWl.png', 'altText': None, 'type': 'image', 'size': {'width': 399, 'height': 341}, 'thumbnail_url': 'https://pbs.twimg.com/media/FULGiZnWQAMBRWl.png'}], 'possibly_sensitive': False, 'hashtags': ['Retro64'], 'qrtURL': None, 'allSameType': True, 'hasMedia': True, 'combinedMediaUrl': 'https://vxtwitter.com/rendercombined.jpg?imgs=https://pbs.twimg.com/media/FULF9oxXwAMDI-C.png,https://pbs.twimg.com/media/FULGaHkWYAIBV5U.png,https://pbs.twimg.com/media/FULGiZnWQAMBRWl.png', 'date_epoch': 1654093772}
testQRTTweet_compare={'tweet': 'https://twitter.com/pdxdylan/status/1611477137319514129', 'url': '', 'description': "vxTwitter has gotten a *ton* of usage recently, so I'd appreciate a donation to keep things running!\n", 'thumbnail': '', 'type': 'Text', 'images': ['', '', '', '', ''], 'time': '2023-01-06T21:37:43.000Z', 'qrtURL': 'https://twitter.com/pdxdylan/status/1518309187515781125', 'nsfw': False, 'size': {}, 'isGif': False} testQRTTweet_compare={'text': "vxTwitter has gotten a *ton* of usage recently, so I'd appreciate a donation to keep things running!\n", 'date': 'Fri Jan 06 21:37:43 +0000 2023', 'tweetURL': 'https://twitter.com/pdxdylan/status/1611477137319514129', 'tweetID': '1611477137319514129', 'conversationID': '1611476665821003776', 'mediaURLs': [], 'media_extended': [], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': 'https://twitter.com/i/status/1518309187515781125', 'allSameType': False, 'hasMedia': False, 'combinedMediaUrl': None, 'date_epoch': 1673041063}
testQrtCeptionTweet_compare={'tweet': 'https://twitter.com/CatherineShu/status/585253766271672320', 'url': '', 'description': 'Testing retweetception ', 'thumbnail': '', 'type': 'Text', 'images': ['', '', '', '', ''], 'time': '2015-04-07T01:32:26.000Z', 'qrtURL': 'https://twitter.com/EliLanger/status/585253161260216320', 'nsfw': False, 'size': {}, 'isGif': False} testQrtCeptionTweet_compare={'text': 'Testing retweetception ', 'date': 'Tue Apr 07 01:32:26 +0000 2015', 'tweetURL': 'https://twitter.com/CatherineShu/status/585253766271672320', 'tweetID': '585253766271672320', 'conversationID': '585253766271672320', 'mediaURLs': [], 'media_extended': [], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': 'https://twitter.com/i/status/585253161260216320', 'allSameType': False, 'hasMedia': False, 'combinedMediaUrl': None, 'date_epoch': 1428370346}
testQrtVideoTweet_compare={'tweet': 'https://twitter.com/pdxdylan/status/1674561759422578690', 'url': '', 'description': 'good', 'thumbnail': '', 'type': 'Text', 'images': ['', '', '', '', ''], 'time': '2023-06-29T23:33:29.000Z', 'qrtURL': 'https://twitter.com/TeaboyAllStars/status/1674197531301904388', 'nsfw': False, 'size': {}, 'isGif': False} testQrtVideoTweet_compare={'text': 'good', 'date': 'Thu Jun 29 23:33:29 +0000 2023', 'tweetURL': 'https://twitter.com/pdxdylan/status/1674561759422578690', 'tweetID': '1674561759422578690', 'conversationID': '1674561759422578690', 'mediaURLs': [], 'media_extended': [], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': 'https://twitter.com/i/status/1674197531301904388', 'allSameType': False, 'hasMedia': False, 'combinedMediaUrl': None, 'date_epoch': 1688081609}
testNSFWTweet_compare={'tweet': 'https://twitter.com/kuyacoy/status/1581185279376838657', 'url': '', 'description': "ngl, I'm scared on finding out the cute Sprigatito's final evolution..\n\nso i had a bot generate it for me.... and I'm forever scarred https://t.co/itMay87vcS", 'thumbnail': 'https://pbs.twimg.com/media/FfF_gKwXgAIpnpD.jpg', 'type': 'Image', 'images': ['https://pbs.twimg.com/media/FfF_gKwXgAIpnpD.jpg', '', '', '', '1'], 'time': 'Sat Oct 15 07:28:42 +0000 2022', 'qrtURL': None, 'nsfw': True, 'size': {}, 'isGif': False} testNSFWTweet_compare={'text': "ngl, I'm scared on finding out the cute Sprigatito's final evolution..\n\nso i had a bot generate it for me.... and I'm forever scarred https://t.co/itMay87vcS", 'date': 'Sat Oct 15 07:28:42 +0000 2022', 'tweetURL': 'https://twitter.com/kuyacoy/status/1581185279376838657', 'tweetID': '1581185279376838657', 'conversationID': '1581185279376838657', 'mediaURLs': ['https://pbs.twimg.com/media/FfF_gKwXgAIpnpD.jpg'], 'media_extended': [{'url': 'https://pbs.twimg.com/media/FfF_gKwXgAIpnpD.jpg', 'altText': None, 'type': 'image', 'size': {'width': 760, 'height': 926}, 'thumbnail_url': 'https://pbs.twimg.com/media/FfF_gKwXgAIpnpD.jpg'}], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': None, 'allSameType': False, 'hasMedia': True, 'combinedMediaUrl': None, 'date_epoch': 1665818922}
testUser="https://twitter.com/jack" testUser="https://twitter.com/jack"
testUserID = "https://twitter.com/i/user/12" testUserID = "https://twitter.com/i/user/12"
@ -50,27 +48,27 @@ def compareDict(original,compare):
## Specific API tests ## ## Specific API tests ##
def test_twextract_syndicationAPI(): def test_twextract_syndicationAPI():
tweet = twExtract.extractStatus_syndication(testMediaTweet,workaroundTokens=tokens) tweet = twExtract.extractStatus_syndication(testMediaTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testMediaTweet_compare['description'] assert tweet["full_text"]==testMediaTweet_compare['text']
def test_twextract_extractStatusV2Anon(): def test_twextract_extractStatusV2Anon():
tweet = twExtract.extractStatusV2AnonLegacy(testTextTweet,None) tweet = twExtract.extractStatusV2AnonLegacy(testTextTweet,None)
assert tweet["full_text"]==testTextTweet_compare['description'] assert tweet["full_text"]==testTextTweet_compare['text']
tweet = twExtract.extractStatusV2AnonLegacy(testVideoTweet,None) tweet = twExtract.extractStatusV2AnonLegacy(testVideoTweet,None)
assert tweet["full_text"]==testVideoTweet_compare['description'] assert tweet["full_text"]==testVideoTweet_compare['text']
tweet = twExtract.extractStatusV2AnonLegacy(testMediaTweet,None) tweet = twExtract.extractStatusV2AnonLegacy(testMediaTweet,None)
assert tweet["full_text"]==testMediaTweet_compare['description'] assert tweet["full_text"]==testMediaTweet_compare['text']
tweet = twExtract.extractStatusV2AnonLegacy(testMultiMediaTweet,None) tweet = twExtract.extractStatusV2AnonLegacy(testMultiMediaTweet,None)
assert tweet["full_text"][:94]==testMultiMediaTweet_compare['description'][:94] assert tweet["full_text"][:94]==testMultiMediaTweet_compare['text'][:94]
def test_twextract_v2API(): def test_twextract_v2API():
tweet = twExtract.extractStatusV2Legacy(testMediaTweet,workaroundTokens=tokens) tweet = twExtract.extractStatusV2Legacy(testMediaTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testMediaTweet_compare['description'] assert tweet["full_text"]==testMediaTweet_compare['text']
## Tweet retrieve tests ## ## Tweet retrieve tests ##
def test_twextract_textTweetExtract(): def test_twextract_textTweetExtract():
tweet = twExtract.extractStatus(testTextTweet,workaroundTokens=tokens) tweet = twExtract.extractStatus(testTextTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testTextTweet_compare['description'] assert tweet["full_text"]==testTextTweet_compare['text']
assert tweet["user"]["screen_name"]=="jack" assert tweet["user"]["screen_name"]=="jack"
assert 'extended_entities' not in tweet assert 'extended_entities' not in tweet
@ -98,34 +96,35 @@ def test_twextract_UserExtractWeirdURLs():
def test_twextract_videoTweetExtract(): def test_twextract_videoTweetExtract():
tweet = twExtract.extractStatus(testVideoTweet,workaroundTokens=tokens) tweet = twExtract.extractStatus(testVideoTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testVideoTweet_compare['description'] assert tweet["full_text"]==testVideoTweet_compare['text']
assert 'extended_entities' in tweet assert 'extended_entities' in tweet
assert len(tweet['extended_entities']["media"])==1 assert len(tweet['extended_entities']["media"])==1
video = tweet['extended_entities']["media"][0] video = tweet['extended_entities']["media"][0]
assert video["media_url_https"]==testVideoTweet_compare['thumbnail'] assert video["media_url_https"]==testVideoTweet_compare['media_extended'][0]['thumbnail_url']
assert video["type"]=="video" assert video["type"]=="video"
def test_twextract_mediaTweetExtract(): def test_twextract_mediaTweetExtract():
tweet = twExtract.extractStatus(testMediaTweet,workaroundTokens=tokens) tweet = twExtract.extractStatus(testMediaTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testMediaTweet_compare['description'] assert tweet["full_text"]==testMediaTweet_compare['text']
assert 'extended_entities' in tweet assert 'extended_entities' in tweet
assert len(tweet['extended_entities']["media"])==1 assert len(tweet['extended_entities']["media"])==1
video = tweet['extended_entities']["media"][0] video = tweet['extended_entities']["media"][0]
assert video["media_url_https"]==testMediaTweet_compare['thumbnail']
assert video["media_url_https"]==testMediaTweet_compare['media_extended'][0]['thumbnail_url']
assert video["type"]=="photo" assert video["type"]=="photo"
def test_twextract_multimediaTweetExtract(): def test_twextract_multimediaTweetExtract():
tweet = twExtract.extractStatus(testMultiMediaTweet,workaroundTokens=tokens) tweet = twExtract.extractStatus(testMultiMediaTweet,workaroundTokens=tokens)
assert tweet["full_text"][:94]==testMultiMediaTweet_compare['description'][:94] assert tweet["full_text"][:94]==testMultiMediaTweet_compare['text'][:94]
assert 'extended_entities' in tweet assert 'extended_entities' in tweet
assert len(tweet['extended_entities']["media"])==3 assert len(tweet['extended_entities']["media"])==3
video = tweet['extended_entities']["media"][0] video = tweet['extended_entities']["media"][0]
assert video["media_url_https"]==testMultiMediaTweet_compare["images"][0] assert video["media_url_https"]==testMultiMediaTweet_compare["mediaURLs"][0]
assert video["type"]=="photo" assert video["type"]=="photo"
video = tweet['extended_entities']["media"][1] video = tweet['extended_entities']["media"][1]
assert video["media_url_https"]==testMultiMediaTweet_compare["images"][1] assert video["media_url_https"]==testMultiMediaTweet_compare["mediaURLs"][1]
assert video["type"]=="photo" assert video["type"]=="photo"
def test_twextract_pollTweetExtract(): def test_twextract_pollTweetExtract():
@ -136,26 +135,29 @@ def test_twextract_pollTweetExtract():
def test_twextract_NSFW_TweetExtract(): def test_twextract_NSFW_TweetExtract():
tweet = twExtract.extractStatus(testNSFWTweet,workaroundTokens=tokens) # For now just test that there's no error tweet = twExtract.extractStatus(testNSFWTweet,workaroundTokens=tokens) # For now just test that there's no error
def getVNFFromLink(link):
return twitfix.getTweetData(link)
## VNF conversion test ## ## VNF conversion test ##
def test_textTweetVNF(): def test_textTweetVNF():
vnf = twitfix.link_to_vnf_from_unofficial_api(testTextTweet) vnf = getVNFFromLink(testTextTweet)
compareDict(testTextTweet_compare,vnf) compareDict(testTextTweet_compare,vnf)
def test_videoTweetVNF(): def test_videoTweetVNF():
vnf = twitfix.link_to_vnf_from_unofficial_api(testVideoTweet) vnf = getVNFFromLink(testVideoTweet)
compareDict(testVideoTweet_compare,vnf) compareDict(testVideoTweet_compare,vnf)
def test_mediaTweetVNF(): def test_mediaTweetVNF():
vnf = twitfix.link_to_vnf_from_unofficial_api(testMediaTweet) vnf = getVNFFromLink(testMediaTweet)
compareDict(testMediaTweet_compare,vnf) compareDict(testMediaTweet_compare,vnf)
def test_multimediaTweetVNF(): def test_multimediaTweetVNF():
vnf = twitfix.link_to_vnf_from_unofficial_api(testMultiMediaTweet) vnf = getVNFFromLink(testMultiMediaTweet)
compareDict(testMultiMediaTweet_compare,vnf) compareDict(testMultiMediaTweet_compare,vnf)
def test_pollTweetVNF(): def test_pollTweetVNF():
vnf = twitfix.link_to_vnf_from_unofficial_api(testPollTweet) vnf = getVNFFromLink(testPollTweet)
compareDict(testPoll_comparePollVNF,vnf['poll']) compareDict(testPoll_comparePollVNF,vnf['poll'])
def test_qrtTweet(): def test_qrtTweet():
@ -163,7 +165,7 @@ def test_qrtTweet():
# this is an incredibly lazy test, todo: improve it in the future # this is an incredibly lazy test, todo: improve it in the future
resp = client.get(testQRTTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"}) resp = client.get(testQRTTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200 assert resp.status_code==200
assert testQRTTweet_compare['description'][:10] in str(resp.data) assert testQRTTweet_compare['text'][:10] in str(resp.data)
# test qrt-ception # test qrt-ception
resp = client.get(testQrtCeptionTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"}) # get top level tweet resp = client.get(testQrtCeptionTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"}) # get top level tweet
assert resp.status_code==200 assert resp.status_code==200
@ -181,16 +183,17 @@ def test_qrtVideoTweet():
# this is an incredibly lazy test, todo: improve it in the future # this is an incredibly lazy test, todo: improve it in the future
resp = client.get(testQrtVideoTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"}) resp = client.get(testQrtVideoTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200 assert resp.status_code==200
vurl = testQrtVideoTweet_compare["url"] qtd_tweet=cache.getVnfFromLinkCache("https://twitter.com/i/status/1674197531301904388")
vurl = qtd_tweet["mediaURLs"][0]
assert f"twitter:player:stream\" content=\"{vurl}" in str(resp.data) assert f"twitter:player:stream\" content=\"{vurl}" in str(resp.data)
## Test adding to cache ; cache should be empty ## ## Test adding to cache ; cache should be empty ##
def test_addToCache(): def test_addToCache():
cache.clearCache() cache.clearCache()
twitfix.vnfFromCacheOrDL(testTextTweet) twitfix.getTweetData(testTextTweet)
twitfix.vnfFromCacheOrDL(testVideoTweet) twitfix.getTweetData(testVideoTweet)
twitfix.vnfFromCacheOrDL(testMediaTweet) twitfix.getTweetData(testMediaTweet)
twitfix.vnfFromCacheOrDL(testMultiMediaTweet) twitfix.getTweetData(testMultiMediaTweet)
#retrieve #retrieve
compareDict(testTextTweet_compare,cache.getVnfFromLinkCache(testTextTweet)) compareDict(testTextTweet_compare,cache.getVnfFromLinkCache(testTextTweet))
compareDict(testVideoTweet_compare,cache.getVnfFromLinkCache(testVideoTweet)) compareDict(testVideoTweet_compare,cache.getVnfFromLinkCache(testVideoTweet))
@ -207,10 +210,10 @@ def test_embedFromScratch():
def test_embedFromCache(): def test_embedFromCache():
cache.clearCache() cache.clearCache()
twitfix.vnfFromCacheOrDL(testTextTweet) twitfix.getTweetData(testTextTweet)
twitfix.vnfFromCacheOrDL(testVideoTweet) twitfix.getTweetData(testVideoTweet)
twitfix.vnfFromCacheOrDL(testMediaTweet) twitfix.getTweetData(testMediaTweet)
twitfix.vnfFromCacheOrDL(testMultiMediaTweet) twitfix.getTweetData(testMultiMediaTweet)
#embed time #embed time
resp = client.get(testTextTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"}) resp = client.get(testTextTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200 assert resp.status_code==200
@ -235,32 +238,11 @@ def test_veryLongEmbed():
resp = client.get('https://twitter.com/TEST/status/1234'.replace("https://twitter.com",""),headers={"User-Agent":"test"}) resp = client.get('https://twitter.com/TEST/status/1234'.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200 assert resp.status_code==200
def test_embedFromOutdatedCache(): # presets a cache that has VNF's with missing fields; there's probably a better way to do this
cache.setCache({"https://twitter.com/Twitter/status/1118295916874739714":{"description":"On profile pages, we used to only show someones replies, not the original Tweet 🙄 Now were showing both so you can follow the conversation more easily! https://t.co/LSBEZYFqmY","hits":0,"images":["https://pbs.twimg.com/media/D4TS4xeX4AA02DI.jpg","","","","1"],"likes":5033,"nsfw":False,"pfp":"https://pbs.twimg.com/profile_images/1488548719062654976/u6qfBBkF_normal.jpg","qrt":{},"rts":754,"screen_name":'Twitter',"thumbnail":"https://pbs.twimg.com/media/D4TS4xeX4AA02DI.jpg","time":"Tue Apr 16 23:31:38 +0000 2019","tweet":"https://twitter.com/Twitter/status/1118295916874739714","type":"Image","uploader":'Twitter',"url":""},
"https://twitter.com/Twitter/status/1263145271946551300":{"description":"Testing, testing...\n\nA new way to have a convo with exactly who you want. Were starting with a small % globally, so keep your 👀 out to see it in action. https://t.co/pV53mvjAVT","hits":0,"images":["","","","",""],"likes":61584,"nsfw":False,"pfp":"https://pbs.twimg.com/profile_images/1488548719062654976/u6qfBBkF_normal.jpg","qrt":{},"rts":17138,"screen_name":'Twitter',"thumbnail":"https://pbs.twimg.com/media/EYeX7akWsAIP1_1.jpg","time":"Wed May 20 16:31:15 +0000 2020","tweet":"https://twitter.com/Twitter/status/1263145271946551300","type":"Video","uploader":'Twitter',"url":"https://video.twimg.com/amplify_video/1263145212760805376/vid/1280x720/9jous8HM0_duxL0w.mp4?tag=13"},
#"https://twitter.com/Twitter/status/1293239745695211520":{"description":"We tested, you Tweeted, and now were rolling it out to everyone! https://t.co/w6Q3Q6DiKz","hits":0,"images":["https://pbs.twimg.com/media/EfJ-C-JU0AAQL_C.jpg","https://pbs.twimg.com/media/EfJ-aHlU0AAU1kq.jpg","","","2"],"likes":5707,"nsfw":False,"pfp":"https://pbs.twimg.com/profile_images/1488548719062654976/u6qfBBkF_normal.jpg","qrt":{},"rts":1416,"screen_name":"Twitter","thumbnail":"https://pbs.twimg.com/media/EfJ-C-JU0AAQL_C.jpg","time":"Tue Aug 11 17:35:57 +0000 2020","tweet":"https://twitter.com/Twitter/status/1293239745695211520","type":"Image","uploader":"Twitter","url":""},
"https://twitter.com/jack/status/20":{"description":"just setting up my twttr","hits":0,"images":["","","","",""],"likes":179863,"nsfw":False,"pfp":"https://pbs.twimg.com/profile_images/1115644092329758721/AFjOr-K8_normal.jpg","qrt":{},"rts":122021,"screen_name":"jack","thumbnail":"","time":"Tue Mar 21 20:50:14 +0000 2006","tweet":"https://twitter.com/jack/status/20","type":"Text","uploader":"jack","url":""},
testQrtVideoTweet:{'tweet': 'https://twitter.com/Twitter/status/1494436688554344449', 'url': '', 'description': 'https://twitter.com/TwitterSupport/status/1494386367467593737', 'thumbnail': '', 'uploader': 'Twitter', 'screen_name': 'Twitter', 'pfp': 'https://pbs.twimg.com/profile_images/1488548719062654976/u6qfBBkF_normal.jpg', 'type': 'Text', 'images': ['', '', '', '', ''], 'likes': 5186, 'rts': 703, 'time': 'Thu Feb 17 22:20:46 +0000 2022', 'qrt': {'desc': 'Keep your fave DM convos easily accessible by pinning them! You can now pin up to six conversations that will stay at the top of your DM inbox.\n\nAvailable on Android, iOS, and web. https://t.co/kIjlzf9XLJ', 'handle': 'Twitter Support', 'screen_name': 'TwitterSupport', 'verified': True, 'id': '1494386367467593737'}, 'nsfw': False, 'verified': True, 'size': {}}
})
#embed time
resp = client.get(testTextTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
resp = client.get(testVideoTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
resp = client.get(testMediaTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
resp = client.get(testMultiMediaTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
# qrt
resp = client.get(testQrtVideoTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
assert "twitter:player:stream\" content=\"https://video.twimg.com/tweet_video/FL0gdK8WUAIHHKa.mp4" in str(resp.data)
def test_directEmbed(): def test_directEmbed():
resp = client.get(testVideoTweet.replace("https://twitter.com","")+".mp4",headers={"User-Agent":"test"}) resp = client.get(testVideoTweet.replace("https://twitter.com","")+".mp4",headers={"User-Agent":"test"})
assert resp.status_code==200 assert resp.status_code==200
assert testVideoTweet_compare["url"] in str(resp.data) assert testVideoTweet_compare["mediaURLs"][0] in str(resp.data)
def test_message404(): def test_message404():
resp = client.get("https://twitter.com/jack/status/12345",headers={"User-Agent":"test"}) resp = client.get("https://twitter.com/jack/status/12345",headers={"User-Agent":"test"})
@ -268,9 +250,9 @@ def test_message404():
assert "Failed to scan your link!" in str(resp.data) assert "Failed to scan your link!" in str(resp.data)
def test_combine(): def test_combine():
twt,e = twitfix.vnfFromCacheOrDL(testMultiMediaTweet) twt = twitfix.getTweetData(testMultiMediaTweet)
img1 = twt["images"][0] img1 = twt["mediaURLs"][0]
img2 = twt["images"][1] img2 = twt["mediaURLs"][1]
resp = client.get(f"/rendercombined.jpg?imgs={img1},{img2}",headers={"User-Agent":"test"}) resp = client.get(f"/rendercombined.jpg?imgs={img1},{img2}",headers={"User-Agent":"test"})
assert resp.status_code==200 assert resp.status_code==200
assert resp.headers["Content-Type"]=="image/jpeg" assert resp.headers["Content-Type"]=="image/jpeg"

View File

@ -11,20 +11,22 @@ tests = {
"testNSFWTweet":"https://twitter.com/kuyacoy/status/1581185279376838657" "testNSFWTweet":"https://twitter.com/kuyacoy/status/1581185279376838657"
} }
def getVNFFromLink(link):
return twitfix.getTweetData(link)
with open('generated.txt', 'w',encoding='utf-8') as f: with open('generated.txt', 'w',encoding='utf-8') as f:
f.write("# autogenerated from testgen.py\n") f.write("# autogenerated from testgen.py\n")
for test in tests: for test in tests:
f.write(f"{test}=\"{tests[test]}\"\n") f.write(f"{test}=\"{tests[test]}\"\n")
f.write("\n") f.write("\n")
for test in tests: for test in tests:
VNF = twitfix.link_to_vnf(tests[test]) VNF = getVNFFromLink(tests[test])
del VNF['ttl']
del VNF['likes'] del VNF['likes']
del VNF['rts'] del VNF['retweets']
del VNF['hits'] del VNF['replies']
del VNF['pfp'] del VNF['user_screen_name']
del VNF['uploader'] del VNF['user_name']
del VNF['verified'] del VNF['user_profile_image_url']
del VNF['screen_name'] del VNF['communityNote']
# write in a format that can be copy-pasted into a python file, i.e testTextTweet={... # write in a format that can be copy-pasted into a python file, i.e testTextTweet={...
f.write(f"{test}_compare={VNF}\n") f.write(f"{test}_compare={VNF}\n")

View File

@ -11,13 +11,13 @@ from configHandler import config
from cache import addVnfToLinkCache,getVnfFromLinkCache from cache import addVnfToLinkCache,getVnfFromLinkCache
from yt_dlp.utils import ExtractorError from yt_dlp.utils import ExtractorError
import vxlogging as log import vxlogging as log
from utils import getTweetIdFromUrl, pathregex
from vxApi import getApiResponse from vxApi import getApiResponse
from urllib.parse import urlparse from urllib.parse import urlparse
app = Flask(__name__) app = Flask(__name__)
CORS(app) CORS(app)
user_agent="" user_agent=""
pathregex = re.compile("\\w{1,15}\\/(status|statuses)\\/(\\d{2,20})")
generate_embed_user_agents = [ generate_embed_user_agents = [
"facebookexternalhit/1.1", "facebookexternalhit/1.1",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.57 Safari/537.36", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.57 Safari/537.36",
@ -43,15 +43,17 @@ def isValidUserAgent(user_agent):
return True return True
return False return False
def getTweetIdFromUrl(url): def message(text):
match = pathregex.search(url) return render_template(
if match is not None: 'default.html',
return match.group(2) message = text,
else: color = config['config']['color'],
return None appname = config['config']['appname'],
repo = config['config']['repo'],
url = config['config']['url'] )
def renderImageTweetEmbed(tweetData,image,appnameSuffix=""): def renderImageTweetEmbed(tweetData,image,appnameSuffix=""):
qrt = None qrt = tweetData['qrt']
pollData = None pollData = None
embedDesc = msgs.formatEmbedDesc("Image",tweetData['text'],qrt,pollData,msgs.genLikesDisplay(tweetData)) embedDesc = msgs.formatEmbedDesc("Image",tweetData['text'],qrt,pollData,msgs.genLikesDisplay(tweetData))
return render_template("image.html", return render_template("image.html",
@ -63,13 +65,30 @@ def renderImageTweetEmbed(tweetData,image,appnameSuffix=""):
appname=config['config']['appname']+appnameSuffix, appname=config['config']['appname']+appnameSuffix,
) )
def renderVideoTweetEmbed(tweetData,video,appnameSuffix=""): def renderVideoTweetEmbed(tweetData,mediaInfo,appnameSuffix=""):
# TODO: render video tweet embed template qrt = tweetData['qrt']
return "Video tweet embed" pollData = None
embedDesc = msgs.formatEmbedDesc("Video",tweetData['text'],qrt,pollData,msgs.genLikesDisplay(tweetData))
return render_template("video.html",
tweet=tweetData,
media=mediaInfo,
host=config['config']['url'],
desc=embedDesc,
tweetLink=f'https://twitter.com/{tweetData["user_screen_name"]}/status/{tweetData["tweetID"]}',
appname=config['config']['appname']+appnameSuffix,
)
def renderTextTweetEmbed(tweetData,appnameSuffix=""): def renderTextTweetEmbed(tweetData,appnameSuffix=""):
# TODO: render text tweet embed template qrt = tweetData['qrt']
return "Text tweet embed" pollData = None
embedDesc = msgs.formatEmbedDesc("Text",tweetData['text'],qrt,pollData,msgs.genLikesDisplay(tweetData))
return render_template("text.html",
tweet=tweetData,
host=config['config']['url'],
desc=embedDesc,
tweetLink=f'https://twitter.com/{tweetData["user_screen_name"]}/status/{tweetData["tweetID"]}',
appname=config['config']['appname']+appnameSuffix,
)
@app.route('/robots.txt') @app.route('/robots.txt')
def robots(): def robots():
@ -89,6 +108,10 @@ def oembedend():
return oEmbedGen(desc, user, link, ttype,providerName=provName) return oEmbedGen(desc, user, link, ttype,providerName=provName)
def getTweetData(twitter_url): def getTweetData(twitter_url):
cachedVNF = getVnfFromLinkCache(twitter_url)
if cachedVNF is not None:
return cachedVNF
try: try:
rawTweetData = twExtract.extractStatusV2Anon(twitter_url) rawTweetData = twExtract.extractStatusV2Anon(twitter_url)
except: except:
@ -101,6 +124,22 @@ def getTweetData(twitter_url):
if rawTweetData is None: if rawTweetData is None:
return None return None
tweetData = getApiResponse(rawTweetData) tweetData = getApiResponse(rawTweetData)
if tweetData is None:
return None
addVnfToLinkCache(twitter_url,tweetData)
return tweetData
def determineEmbedTweet(tweetData):
# Determine which tweet, i.e main or QRT, to embed the media from.
# if there is no QRT, return the main tweet => default behavior
# if both don't have media, return the main tweet => embedding qrt text will be handled in the embed description
# if both have media, return the main tweet => priority is given to the main tweet's media
# if only the QRT has media, return the QRT => show the QRT's media, not the main tweet's
# if only the main tweet has media, return the main tweet => show the main tweet's media, embedding QRT text will be handled in the embed description
if tweetData['qrt'] is None:
return tweetData
if tweetData['qrt']['hasMedia'] and not tweetData['hasMedia']:
return tweetData['qrt']
return tweetData return tweetData
@app.route('/<path:sub_path>') # Default endpoint used by everything @app.route('/<path:sub_path>') # Default endpoint used by everything
@ -112,11 +151,11 @@ def twitfix(sub_path):
tweetData = getTweetData(twitter_url) tweetData = getTweetData(twitter_url)
if tweetData is None: if tweetData is None:
abort(404) return message(msgs.failedToScan)
qrt = None qrt = None
if 'qrtURL' in tweetData: if 'qrtURL' in tweetData and tweetData['qrtURL'] is not None:
qrt = getTweetData(tweetData['qrtURL']) qrt = getTweetData(tweetData['qrtURL'])
tweetData['qrt'] = qrt
###return tweetData ###return tweetData
embedIndex = -1 embedIndex = -1
@ -127,6 +166,7 @@ def twitfix(sub_path):
if request.url.startswith("https://api.vx"): # Directly return the API response if the request is from the API if request.url.startswith("https://api.vx"): # Directly return the API response if the request is from the API
return tweetData return tweetData
elif request.url.startswith("https://d.vx"): # direct embed elif request.url.startswith("https://d.vx"): # direct embed
# direct embeds should always prioritize the main tweet, so don't check for qrt
# determine what type of media we're dealing with # determine what type of media we're dealing with
if not tweetData['hasMedia'] and qrt is None: if not tweetData['hasMedia'] and qrt is None:
return renderTextTweetEmbed(tweetData) return renderTextTweetEmbed(tweetData)
@ -142,19 +182,20 @@ def twitfix(sub_path):
elif media['type'] == "video" or media['type'] == "animated_gif": elif media['type'] == "video" or media['type'] == "animated_gif":
return redirect(media['url'], 302) # TODO: might not work return redirect(media['url'], 302) # TODO: might not work
else: # full embed else: # full embed
if not tweetData['hasMedia']: embedTweetData = determineEmbedTweet(tweetData)
if not embedTweetData['hasMedia']:
return renderTextTweetEmbed(tweetData) return renderTextTweetEmbed(tweetData)
elif tweetData['allSameType'] and tweetData['media_extended'][0]['type'] == "image" and embedIndex == -1 and tweetData['combinedMediaUrl'] != None: elif embedTweetData['allSameType'] and embedTweetData['media_extended'][0]['type'] == "image" and embedIndex == -1 and embedTweetData['combinedMediaUrl'] != None:
return renderImageTweetEmbed(tweetData,tweetData['combinedMediaUrl'],appnameSuffix=" - See original tweet for full quality") return renderImageTweetEmbed(tweetData,embedTweetData['combinedMediaUrl'],appnameSuffix=" - See original tweet for full quality")
else: else:
# this means we have mixed media or video, and we're only going to embed one # this means we have mixed media or video, and we're only going to embed one
if embedIndex == -1: # if the user didn't specify an index, we'll just use the first one if embedIndex == -1: # if the user didn't specify an index, we'll just use the first one
embedIndex = 0 embedIndex = 0
media = tweetData['media_extended'][embedIndex] media = embedTweetData['media_extended'][embedIndex]
if media['type'] == "image": if media['type'] == "image":
return renderImageTweetEmbed(tweetData,media['url'] , appnameSuffix=f' - Media {embedIndex+1}/{len(tweetData["media_extended"])}') return renderImageTweetEmbed(tweetData,media['url'] , appnameSuffix=f' - Media {embedIndex+1}/{len(embedTweetData["media_extended"])}')
elif media['type'] == "video" or media['type'] == "animated_gif": elif media['type'] == "video" or media['type'] == "animated_gif":
return renderVideoTweetEmbed(tweetData,media['url']) return renderVideoTweetEmbed(tweetData,media)
return tweetData return tweetData

11
utils.py Normal file
View File

@ -0,0 +1,11 @@
import re
pathregex = re.compile("\\w{1,15}\\/(status|statuses)\\/(\\d{2,20})")
def getTweetIdFromUrl(url):
match = pathregex.search(url)
if match is not None:
return match.group(2)
else:
return None