Merge pull request #203 from dylanpdx/cleanup

Major Refactor
This commit is contained in:
Dylan 2024-05-02 20:00:47 +01:00 committed by GitHub
commit 55c26d0ca4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 772 additions and 1120 deletions

View File

@ -5,6 +5,7 @@ import json
import os
import boto3
import vxlogging as log
from utils import getTweetIdFromUrl
link_cache_system = config['config']['link_cache']
link_cache = {}
@ -44,8 +45,7 @@ def serializeUnknown(obj):
return obj.isoformat()
raise TypeError ("Type %s not serializable" % type(obj))
def addVnfToLinkCache(video_link, vnf):
video_link = video_link.lower()
def addVnfToTweetIdCache(tweet_id, vnf):
global link_cache
try:
if link_cache_system == "db":
@ -53,20 +53,20 @@ def addVnfToLinkCache(video_link, vnf):
log.debug("Link added to DB cache ")
return True
elif link_cache_system == "json":
link_cache[video_link] = vnf
link_cache[tweet_id] = vnf
with open("links.json", "w") as outfile:
json.dump(link_cache, outfile, indent=4, sort_keys=True, default=serializeUnknown)
log.debug("Link added to JSON cache ")
return True
elif link_cache_system == "ram": # FOR TESTS ONLY
link_cache[video_link] = vnf
link_cache[tweet_id] = vnf
log.debug("Link added to RAM cache ")
elif link_cache_system == "dynamodb": # pragma: no cover
vnf["ttl"] = int(vnf["ttl"].strftime('%s'))
table = client.Table(DYNAMO_CACHE_TBL)
table.put_item(
Item={
'tweet': video_link,
'tweet': tweet_id,
'vnf': vnf,
'ttl':vnf["ttl"]
}
@ -74,19 +74,21 @@ def addVnfToLinkCache(video_link, vnf):
log.debug("Link added to dynamodb cache ")
return True
except Exception as e:
log.error("Failed to add link to DB cache: "+str(e)+" "+video_link)
log.error("Failed to add link to DB cache: "+str(e)+" "+tweet_id)
return False
def getVnfFromLinkCache(video_link):
video_link = video_link.lower()
def addVnfToLinkCache(twitter_url, vnf):
return addVnfToTweetIdCache(getTweetIdFromUrl(twitter_url), vnf)
def getVnfFromTweetIdCache(tweet_id):
global link_cache
if link_cache_system == "db":
collection = db.linkCache
vnf = collection.find_one({'tweet': video_link})
vnf = collection.find_one({'tweet': tweet_id})
if vnf != None:
hits = ( vnf['hits'] + 1 )
log.debug("Link located in DB cache.")
query = { 'tweet': video_link }
query = { 'tweet': tweet_id }
change = { "$set" : { "hits" : hits } }
out = db.linkCache.update_one(query, change)
return vnf
@ -94,9 +96,9 @@ def getVnfFromLinkCache(video_link):
log.debug("Link not in DB cache")
return None
elif link_cache_system == "json":
if video_link in link_cache:
if tweet_id in link_cache:
log.debug("Link located in json cache")
vnf = link_cache[video_link]
vnf = link_cache[tweet_id]
return vnf
else:
log.debug("Link not in json cache")
@ -105,7 +107,7 @@ def getVnfFromLinkCache(video_link):
table = client.Table(DYNAMO_CACHE_TBL)
response = table.get_item(
Key={
'tweet': video_link
'tweet': tweet_id
}
)
if 'Item' in response:
@ -116,9 +118,9 @@ def getVnfFromLinkCache(video_link):
log.debug("Link not in dynamodb cache")
return None
elif link_cache_system == "ram": # FOR TESTS ONLY
if video_link in link_cache:
if tweet_id in link_cache:
log.debug("Link located in json cache")
vnf = link_cache[video_link]
vnf = link_cache[tweet_id]
return vnf
else:
log.debug("Link not in cache")
@ -126,6 +128,9 @@ def getVnfFromLinkCache(video_link):
elif link_cache_system == "none":
return None
def getVnfFromLinkCache(twitter_url):
return getVnfFromTweetIdCache(getTweetIdFromUrl(twitter_url))
def clearCache():
global link_cache
# only intended for use in tests

24
msgs.py
View File

@ -10,30 +10,36 @@ videoDescLimit=220
tweetDescLimit=340
def genLikesDisplay(vnf):
if vnf['rts'] > 0:
return ("\n\n💖 " + numerize.numerize(vnf['likes']) + " 🔁 " + numerize.numerize(vnf['rts']))
if vnf['retweets'] > 0:
return ("\n\n💖 " + numerize.numerize(vnf['likes']) + " 🔁 " + numerize.numerize(vnf['retweets']))
else:
return ("\n\n💖 " + numerize.numerize(vnf['likes']))
def genQrtDisplay(qrt):
verifiedCheck = "☑️" if ('verified' in qrt and qrt['verified']) else ""
return ("\n\n【QRT of " + qrt['uploader'] + " (@" + qrt['screen_name'] + ")"+ verifiedCheck+":】\n'" + qrt['description'] + "'")
return ("\n\n【QRT of " + qrt['user_name'] + " (@" + qrt['user_screen_name'] + ")"+ verifiedCheck+":】\n'" + qrt['text'] + "'")
def genPollDisplay(poll):
pctSplit=10
output="\n\n"
for choice in poll["choices"]:
output+=choice["text"]+"\n"+(""*int(choice["percent"]/pctSplit)) +" "+str(choice["percent"])+"%\n"
for choice in poll["options"]:
output+=choice["name"]+"\n"+(""*int(choice["percent"]/pctSplit)) +" "+str(choice["percent"])+"%\n"
return output
def formatEmbedDesc(type,body,qrt,pollDisplay,likesDisplay):
def formatEmbedDesc(type,body,qrt,pollData,likesDisplay):
# Trim the embed description to 248 characters, prioritizing poll and likes
limit = videoDescLimit if type=="" or type=="Video" or (qrt!=None and (qrt["type"]=="" or qrt["type"]=="Video")) else tweetDescLimit
qrtType=None
if qrt!=None:
qrtType="Text"
limit = videoDescLimit if type=="Text" or type=="Video" or (qrt!=None and (qrtType=="Text" or qrtType=="Video")) else tweetDescLimit
output = ""
if pollDisplay==None:
if pollData==None:
pollDisplay=""
else:
pollDisplay=genPollDisplay(pollData)
if qrt!=None:
@ -55,6 +61,6 @@ def formatEmbedDesc(type,body,qrt,pollDisplay,likesDisplay):
diff = len(output)-limit
# remove the characters from body, add ellipsis
body = body[:-(diff+1)]+""
return formatEmbedDesc(type,body,qrt,pollDisplay,likesDisplay)
return formatEmbedDesc(type,body,qrt,pollData,likesDisplay)
else:
return output

View File

@ -1,25 +1,10 @@
{% extends 'base.html' %}
<!--
{{ color }} - Custom Color Code set in the TwitFix Config
{{ appname }} - Custom Appname set in the TwitFix Config
{{ user }} - Username of the Source Tweet
{{ pic }} - Thumbnail Image for a given video
{{ vidlink }} - Direct link back to the source tweet
{{ vidurl }} - Direct MP4 link to the video contained in the tweet
{{ desc }} - Tweet Text
{{ url }} - Base URL of the site hosting TwitFix
{{ urlDesc }} - HTTP Encoded Tweet Text
{{ urlUser }} - HTTP Encoded Username
{{ urlLink }} - HTTP Encoded source link
{{ pfp }} - URL of the posters PFP
-->
{% block head %}
<meta content="text/html; charset=UTF-8" http-equiv="Content-Type" />
<meta content="{{ color }}" name="theme-color" />
<meta property="og:site_name" content="{{ appname }}" />
<meta name="twitter:card" content="summary_large_image" />
<meta name="twitter:title" content="{{ user }} (@{{ screenName }}) {% if verified %}☑️{% else %}{% endif %}" />
{% include 'tweetCommon.html' %}
<meta name="twitter:image" content="{{ pic[0] }}" />
{% if pic[1] %}
@ -34,9 +19,9 @@
<meta name="twitter:image" content="{{ pic[3] }}" />
{% endif %}
<meta name="twitter:creator" content="@{{ user }}" />
<meta name="twitter:creator" content="@{{ tweet['user_name'] }}" />
<meta property="og:description" content="{{ desc }}" />
<link rel="alternate" href="{{ url }}/oembed.json?desc={{ urlUser }}&user=Twitter&link={{ tweetLink }}&ttype=photo&provider={{ appname }}" type="application/json+oembed" title="{{ user }}">
<link rel="alternate" href="{{ host }}/oembed.json?desc={{ urlUser }}&user=Twitter&link={{ tweetLink }}&ttype=photo&provider={{ appname }}" type="application/json+oembed" title="{{ tweet['user_name'] }}">
<meta http-equiv="refresh" content="0; url = {{ tweetLink }}" /> {% endblock %} {% block body %} Redirecting you to the tweet in a moment. <a href="{{ tweetLink }}">Or click here.</a> {% endblock %}

View File

@ -1,16 +1,16 @@
{% extends 'base.html' %} {% block head %}
<meta content='text/html; charset=UTF-8' http-equiv='Content-Type' />
<meta name="twitter:player:stream" content="{{ vidurl }}" />
<meta name="twitter:player:stream" content="{{ media['url'] }}" />
<meta name="twitter:player:stream:content_type" content="video/mp4" />
<meta name="twitter:player:width" content="{{ videoSize['width'] }}" />
<meta name="twitter:player:height" content="{{ videoSize['height'] }}" />
<meta property="og:url" content="{{ vidurl }}" />
<meta property="og:video" content="{{ vidurl }}" />
<meta property="og:video:secure_url" content="{{ vidurl }}" />
<meta name="twitter:player:width" content="{{ media['size']['width'] }}" />
<meta name="twitter:player:height" content="{{ media['size']['height'] }}" />
<meta property="og:url" content="{{ media['url'] }}" />
<meta property="og:video" content="{{ media['url'] }}" />
<meta property="og:video:secure_url" content="{{ media['url'] }}" />
<meta property="og:video:type" content="video/mp4" />
<meta property="og:video:width" content="{{ videoSize['width'] }}" />
<meta property="og:video:height" content="{{ videoSize['height'] }}" />
<meta property="og:video:width" content="{{ media['size']['width'] }}" />
<meta property="og:video:height" content="{{ media['size']['height'] }}" />
<meta name="twitter:card" content="player" />
<meta http-equiv="refresh" content="0; url = {{ vidurl }}" /> {% endblock %} {% block body %} Redirecting you to the video in a moment. <a href="{{ vidurl }}">Or click here.</a> {% endblock %}
<meta http-equiv="refresh" content="0; url = {{ media['url'] }}" /> {% endblock %} {% block body %} Redirecting you to the video in a moment. <a href="{{ media['url'] }}">Or click here.</a> {% endblock %}

View File

@ -1,28 +0,0 @@
<script>
function androidOrIOS() {
const userAgent = navigator.userAgent;
if(/android/i.test(userAgent)){
return 'android';
}
if(/iPad|iPhone|iPod/i.test(userAgent)){
return 'ios';
}
return 'unknown';
}
function redirect() {
const os = androidOrIOS();
if(os === 'android'){
window.location.href = 'twitter://status?status_id=1674915987789950982';
window.location.href = 'https://twitter.com/pdxdylan/status/1674915987789950982'
}
if(os === 'ios'){
window.location.href = 'twitter://status?id=1674915987789950982';
setTimeout(() => {
window.location.href = 'https://twitter.com/pdxdylan/status/1674915987789950982'
}, 100);
}
}
redirect();
</script>

View File

@ -1,34 +1,13 @@
{% extends 'base.html' %}
<!--
{{ color }} - Custom Color Code set in the TwitFix Config
{{ appname }} - Custom Appname set in the TwitFix Config
{{ user }} - Username of the Source Tweet
{{ pic }} - Thumbnail Image for a given video
{{ vidlink }} - Direct link back to the source tweet
{{ vidurl }} - Direct MP4 link to the video contained in the tweet
{{ desc }} - Tweet Text
{{ url }} - Base URL of the site hosting TwitFix
{{ urlDesc }} - HTTP Encoded Tweet Text
{{ urlUser }} - HTTP Encoded Username
{{ urlLink }} - HTTP Encoded source link
{{ pfp }} - URL of the posters PFP
{{ screenName }} - Users base username
{{ rts }} - Retweet Count
{{ likes }} - Like Count
{{ time }} - Time Created
-->
{% block head %}
<meta content="text/html; charset=UTF-8" http-equiv="Content-Type" />
<meta content="{{ color }}" name="theme-color" />
<meta property="og:site_name" content="{{ appname }}" />
<meta property="og:image" content="{{ pfp }}" />
<meta name="twitter:card" content="tweet" />
<meta name="twitter:title" content="{{ user }} (@{{ screenName }}) {% if verified %}☑️{% else %}{% endif %}" />
<meta name="twitter:image" content="{{ pic[0] }}" />
{% include 'tweetCommon.html' %}
<meta name="twitter:image" content="{{ tweet['user_profile_image_url'] }}" />
<meta name="twitter:creator" content="@{{ user }}" />
<meta property="og:description" content="{{ desc }}" />
<link rel="alternate" href="{{ url }}/oembed.json?desc={{ urlUser }}&user=Twitter&link={{ tweetLink }}&ttype=link&provider={{ appname }}" type="application/json+oembed" title="{{ user }}">
<link rel="alternate" href="{{ host }}/oembed.json?desc={{ urlUser }}&user=Twitter&link={{ tweetLink }}&ttype=link&provider={{ appname }}" type="application/json+oembed" title="{{ tweet['user_name'] }}">
<meta http-equiv="refresh" content="0; url = {{ tweetLink }}" /> {% endblock %} {% block body %} Redirecting you to the tweet in a moment. <a href="{{ tweetLink }}">Or click here.</a> {% endblock %}

View File

@ -0,0 +1,5 @@
<meta content='text/html; charset=UTF-8' http-equiv='Content-Type' />
<meta content="{{ color }}" name="theme-color" />
<meta property="og:site_name" content="{{ appname }}">
<meta name="twitter:title" content="{{ tweet['user_name'] }} (@{{ tweet['user_screen_name'] }})" />

View File

@ -1,25 +1,22 @@
{% extends 'base.html' %} {% block head %}
<meta content='text/html; charset=UTF-8' http-equiv='Content-Type' />
<meta content="{{ color }}" name="theme-color" />
<meta property="og:site_name" content="{{ appname }}">
<meta name="twitter:card" content="player" />
<meta name="twitter:title" content="{{ user }} (@{{ screenName }}) {% if verified %}☑️{% else %}{% endif %}" />
<meta name="twitter:image" content="{{ pic }}" />
<meta name="twitter:player:width" content="{{ videoSize['width'] }}" />
<meta name="twitter:player:height" content="{{ videoSize['height'] }}" />
<meta name="twitter:player:stream" content="{{ vidurl }}" />
{% include 'tweetCommon.html' %}
<meta name="twitter:image" content="{{ media['thumbnail_url'] }}" />
<meta name="twitter:player:width" content="{{ media['size']['width'] }}" />
<meta name="twitter:player:height" content="{{ media['size']['height'] }}" />
<meta name="twitter:player:stream" content="{{ media['url'] }}" />
<meta name="twitter:player:stream:content_type" content="video/mp4" />
<meta property="og:url" content="{{ vidlink }}" />
<meta property="og:video" content="{{ vidurl }}" />
<meta property="og:video:secure_url" content="{{ vidurl }}" />
<meta property="og:video" content="{{ media['url'] }}" />
<meta property="og:video:secure_url" content="{{ media['url'] }}" />
<meta property="og:video:type" content="video/mp4" />
<meta property="og:video:width" content="{{ videoSize['width'] }}" />
<meta property="og:video:height" content="{{ videoSize['height'] }}" />
<meta name="twitter:title" content="{{ user }} (@{{ screenName }}) {% if verified %}☑️{% else %}{% endif %}" />
<meta property="og:image" content="{{ pic[0] }}" />
<meta property="og:video:width" content="{{ media['size']['width'] }}" />
<meta property="og:video:height" content="{{ media['size']['height'] }}" />
<meta property="og:image" content="{{ media['thumbnail_url'] }}" />
<meta property="og:description" content="{{ desc }}" />
<link rel="alternate" href="{{ url }}/oembed.json?desc={{ urlUser }}&user={{ urlDesc }}&link={{ tweetLink }}&ttype=video&provider={{ appname }}" type="application/json+oembed" title="{{ user }}">
<link rel="alternate" href="{{ host }}/oembed.json?desc={{ urlUser }}&user={{ urlEncodedDesc }}&link={{ tweetLink }}&ttype=video&provider={{ appname }}" type="application/json+oembed" title="{{ tweet['user_name'] }}">
<meta http-equiv="refresh" content="0; url = {{ tweetLink }}" /> {% endblock %} {% block body %} Redirecting you to the tweet in a moment. <a href="{{ tweetLink }}">Or click here.</a> {% endblock %}

View File

@ -1,280 +0,0 @@
import os
import twitfix,twExtract
import cache
import msgs
from flask.testing import FlaskClient
client = FlaskClient(twitfix.app)
# autogenerated from testgen.py
testTextTweet="https://twitter.com/jack/status/20"
testVideoTweet="https://twitter.com/pdxdylan/status/1540398733669666818"
testMediaTweet="https://twitter.com/pdxdylan/status/1534672932106035200"
testMultiMediaTweet="https://twitter.com/pdxdylan/status/1532006436703715331"
testQRTTweet="https://twitter.com/pdxdylan/status/1611477137319514129"
testQrtCeptionTweet="https://twitter.com/CatherineShu/status/585253766271672320"
testQrtVideoTweet="https://twitter.com/pdxdylan/status/1674561759422578690"
testNSFWTweet="https://twitter.com/kuyacoy/status/1581185279376838657"
testTextTweet_compare={'tweet': 'https://twitter.com/jack/status/20', 'url': '', 'description': 'just setting up my twttr', 'thumbnail': '', 'type': 'Text', 'images': ['', '', '', '', ''], 'time': '2006-03-21T20:50:14.000Z', 'qrtURL': None, 'nsfw': False, 'size': {}, 'isGif': False}
testVideoTweet_compare={'tweet': 'https://twitter.com/pdxdylan/status/1540398733669666818', 'url': 'https://video.twimg.com/ext_tw_video/1540396699037929472/pu/vid/762x528/YxbXbT3X7vq4LWfC.mp4?tag=12', 'description': 'TikTok embeds on Discord/Telegram bait you with a fake play button, but to see the actual video you have to go to their website.\nAs a request from a friend, I made it so that if you add "vx" before "tiktok" on any link, it fixes that. https://t.co/QYpiVXUIrW', 'thumbnail': 'https://pbs.twimg.com/ext_tw_video_thumb/1540396699037929472/pu/img/l187Z6B9AHHxUKPV.jpg', 'type': 'Video', 'images': ['', '', '', '', ''], 'time': '2022-06-24T18:17:31.000Z', 'qrtURL': None, 'nsfw': False, 'size': {'height': 528, 'width': 762, 'focus_rects': []}, 'isGif': False}
testMediaTweet_compare={'tweet': 'https://twitter.com/pdxdylan/status/1534672932106035200', 'url': '', 'description': 'oh. https://t.co/HgLAbiXw2E', 'thumbnail': 'https://pbs.twimg.com/media/FUxAt5LWUAMol0N.png', 'type': 'Image', 'images': ['https://pbs.twimg.com/media/FUxAt5LWUAMol0N.png', '', '', '', '1'], 'time': '2022-06-08T23:05:14.000Z', 'qrtURL': None, 'nsfw': False, 'size': {}, 'isGif': False}
testMultiMediaTweet_compare={'tweet': 'https://twitter.com/pdxdylan/status/1532006436703715331', 'url': '', 'description': 'Released #Retro64 1.0.9. Besides a lot of internal bug-fixes, this adds quicksand blocks, fixes the rendering for the castle stairs block, and adds a new model, Sonic! \nhttps://github.com/Retro64Mod/Retro64Mod/releases/tag/1.18.2-1.0.9 https://t.co/CWZaw4hzyg', 'thumbnail': 'https://pbs.twimg.com/media/FULF9oxXwAMDI-C.png', 'type': 'Image', 'images': ['https://pbs.twimg.com/media/FULF9oxXwAMDI-C.png', 'https://pbs.twimg.com/media/FULGaHkWYAIBV5U.png', 'https://pbs.twimg.com/media/FULGiZnWQAMBRWl.png', '', '3'], 'time': '2022-06-01T14:29:32.000Z', 'qrtURL': None, 'nsfw': False, 'size': {}, 'isGif': False}
testQRTTweet_compare={'tweet': 'https://twitter.com/pdxdylan/status/1611477137319514129', 'url': '', 'description': "vxTwitter has gotten a *ton* of usage recently, so I'd appreciate a donation to keep things running!\n", 'thumbnail': '', 'type': 'Text', 'images': ['', '', '', '', ''], 'time': '2023-01-06T21:37:43.000Z', 'qrtURL': 'https://twitter.com/pdxdylan/status/1518309187515781125', 'nsfw': False, 'size': {}, 'isGif': False}
testQrtCeptionTweet_compare={'tweet': 'https://twitter.com/CatherineShu/status/585253766271672320', 'url': '', 'description': 'Testing retweetception ', 'thumbnail': '', 'type': 'Text', 'images': ['', '', '', '', ''], 'time': '2015-04-07T01:32:26.000Z', 'qrtURL': 'https://twitter.com/EliLanger/status/585253161260216320', 'nsfw': False, 'size': {}, 'isGif': False}
testQrtVideoTweet_compare={'tweet': 'https://twitter.com/pdxdylan/status/1674561759422578690', 'url': '', 'description': 'good', 'thumbnail': '', 'type': 'Text', 'images': ['', '', '', '', ''], 'time': '2023-06-29T23:33:29.000Z', 'qrtURL': 'https://twitter.com/TeaboyAllStars/status/1674197531301904388', 'nsfw': False, 'size': {}, 'isGif': False}
testNSFWTweet_compare={'tweet': 'https://twitter.com/kuyacoy/status/1581185279376838657', 'url': '', 'description': "ngl, I'm scared on finding out the cute Sprigatito's final evolution..\n\nso i had a bot generate it for me.... and I'm forever scarred https://t.co/itMay87vcS", 'thumbnail': 'https://pbs.twimg.com/media/FfF_gKwXgAIpnpD.jpg', 'type': 'Image', 'images': ['https://pbs.twimg.com/media/FfF_gKwXgAIpnpD.jpg', '', '', '', '1'], 'time': 'Sat Oct 15 07:28:42 +0000 2022', 'qrtURL': None, 'nsfw': True, 'size': {}, 'isGif': False}
testUser="https://twitter.com/jack"
testUserID = "https://twitter.com/i/user/12"
testUserWeirdURLs=["https://twitter.com/jack?lang=en","https://twitter.com/jack/with_replies","https://twitter.com/jack/media","https://twitter.com/jack/likes","https://twitter.com/jack/with_replies?lang=en","https://twitter.com/jack/media?lang=en","https://twitter.com/jack/likes?lang=en","https://twitter.com/jack/"]
testTextTweet="https://twitter.com/jack/status/20"
testPollTweet="https://twitter.com/norm/status/651169346518056960"
testPoll_comparePoll={"name":"poll2choice_text_only","binding_values":{"choice1_label":{"type":"STRING","string_value":"Mean one thing"},"choice2_label":{"type":"STRING","string_value":"Mean multiple things"},"end_datetime_utc":{"type":"STRING","string_value":"2015-10-06T22:57:24Z"},"counts_are_final":{"type":"BOOLEAN","boolean_value":True},"choice2_count":{"type":"STRING","string_value":"33554"},"choice1_count":{"type":"STRING","string_value":"124875"},"last_updated_datetime_utc":{"type":"STRING","string_value":"2015-10-06T22:57:31Z"},"duration_minutes":{"type":"STRING","string_value":"1440"}}}
testPoll_comparePollVNF={'total_votes': 158429, 'choices': [{'text': 'Mean one thing', 'votes': 124875, 'percent': 78.8}, {'text': 'Mean multiple things', 'votes': 33554, 'percent': 21.2}]}
tokens=os.getenv("VXTWITTER_WORKAROUND_TOKENS",None).split(',')
def compareDict(original,compare):
for key in original:
assert key in compare
if type(compare[key]) is not dict:
if (key == 'verified' or key== 'time') and compare[key]!=original[key]:
continue # does not match as test data was from before verification changes
assert compare[key]==original[key]
else:
compareDict(original[key],compare[key])
## Specific API tests ##
def test_syndicationAPI():
tweet = twExtract.extractStatus_syndication(testMediaTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testMediaTweet_compare['description']
def test_extractStatusV2Anon():
tweet = twExtract.extractStatusV2AnonLegacy(testTextTweet,None)
assert tweet["full_text"]==testTextTweet_compare['description']
tweet = twExtract.extractStatusV2AnonLegacy(testVideoTweet,None)
assert tweet["full_text"]==testVideoTweet_compare['description']
tweet = twExtract.extractStatusV2AnonLegacy(testMediaTweet,None)
assert tweet["full_text"]==testMediaTweet_compare['description']
tweet = twExtract.extractStatusV2AnonLegacy(testMultiMediaTweet,None)
assert tweet["full_text"][:94]==testMultiMediaTweet_compare['description'][:94]
def test_v2API():
tweet = twExtract.extractStatusV2Legacy(testMediaTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testMediaTweet_compare['description']
## Tweet retrieve tests ##
def test_textTweetExtract():
tweet = twExtract.extractStatus(testTextTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testTextTweet_compare['description']
assert tweet["user"]["screen_name"]=="jack"
assert 'extended_entities' not in tweet
def test_extractV2(): # remove this when v2 is default
tweet = twExtract.extractStatusV2(testTextTweet,workaroundTokens=tokens)
def test_UserExtract():
user = twExtract.extractUser(testUser,workaroundTokens=tokens)
assert user["screen_name"]=="jack"
assert user["id"]==12
assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006"
def test_UserExtractID():
user = twExtract.extractUser(testUserID,workaroundTokens=tokens)
assert user["screen_name"]=="jack"
assert user["id"]==12
assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006"
def test_UserExtractWeirdURLs():
for url in testUserWeirdURLs:
user = twExtract.extractUser(url,workaroundTokens=tokens)
assert user["screen_name"]=="jack"
assert user["id"]==12
assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006"
def test_videoTweetExtract():
tweet = twExtract.extractStatus(testVideoTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testVideoTweet_compare['description']
assert 'extended_entities' in tweet
assert len(tweet['extended_entities']["media"])==1
video = tweet['extended_entities']["media"][0]
assert video["media_url_https"]==testVideoTweet_compare['thumbnail']
assert video["type"]=="video"
def test_mediaTweetExtract():
tweet = twExtract.extractStatus(testMediaTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testMediaTweet_compare['description']
assert 'extended_entities' in tweet
assert len(tweet['extended_entities']["media"])==1
video = tweet['extended_entities']["media"][0]
assert video["media_url_https"]==testMediaTweet_compare['thumbnail']
assert video["type"]=="photo"
def test_multimediaTweetExtract():
tweet = twExtract.extractStatus(testMultiMediaTweet,workaroundTokens=tokens)
assert tweet["full_text"][:94]==testMultiMediaTweet_compare['description'][:94]
assert 'extended_entities' in tweet
assert len(tweet['extended_entities']["media"])==3
video = tweet['extended_entities']["media"][0]
assert video["media_url_https"]==testMultiMediaTweet_compare["images"][0]
assert video["type"]=="photo"
video = tweet['extended_entities']["media"][1]
assert video["media_url_https"]==testMultiMediaTweet_compare["images"][1]
assert video["type"]=="photo"
def test_pollTweetExtract():
tweet = twExtract.extractStatus("https://twitter.com/norm/status/651169346518056960",workaroundTokens=tokens)
assert 'card' in tweet
compareDict(testPoll_comparePoll,tweet['card'])
def test_NSFW_TweetExtract():
tweet = twExtract.extractStatus(testNSFWTweet,workaroundTokens=tokens) # For now just test that there's no error
## VNF conversion test ##
def test_textTweetVNF():
vnf = twitfix.link_to_vnf_from_unofficial_api(testTextTweet)
compareDict(testTextTweet_compare,vnf)
def test_videoTweetVNF():
vnf = twitfix.link_to_vnf_from_unofficial_api(testVideoTweet)
compareDict(testVideoTweet_compare,vnf)
def test_mediaTweetVNF():
vnf = twitfix.link_to_vnf_from_unofficial_api(testMediaTweet)
compareDict(testMediaTweet_compare,vnf)
def test_multimediaTweetVNF():
vnf = twitfix.link_to_vnf_from_unofficial_api(testMultiMediaTweet)
compareDict(testMultiMediaTweet_compare,vnf)
def test_pollTweetVNF():
vnf = twitfix.link_to_vnf_from_unofficial_api(testPollTweet)
compareDict(testPoll_comparePollVNF,vnf['poll'])
def test_qrtTweet():
cache.clearCache()
# this is an incredibly lazy test, todo: improve it in the future
resp = client.get(testQRTTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
assert testQRTTweet_compare['description'][:10] in str(resp.data)
# test qrt-ception
resp = client.get(testQrtCeptionTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"}) # get top level tweet
assert resp.status_code==200
assert "Please retweet this to spread awareness for retweets" in str(resp.data)
qtd_tweet=cache.getVnfFromLinkCache("https://twitter.com/EliLanger/status/585253161260216320") # check that the quoted tweet for the top level tweet is cached
assert qtd_tweet is not None
assert qtd_tweet["qrtURL"] is not None # check that the quoted tweet for the top level tweet has a qrtURL
assert cache.getVnfFromLinkCache("https://twitter.com/EliLanger/status/313143446842007553") is None # check that the bottom level tweet has NOT been cached
resp = client.get("/EliLanger/status/585253161260216320",headers={"User-Agent":"test"}) # get mid level tweet
assert resp.status_code==200
assert cache.getVnfFromLinkCache("https://twitter.com/EliLanger/status/313143446842007553") is not None # check that the bottom level tweet has been cached now
def test_qrtVideoTweet():
cache.clearCache()
# this is an incredibly lazy test, todo: improve it in the future
resp = client.get(testQrtVideoTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
vurl = testQrtVideoTweet_compare["url"]
assert f"twitter:player:stream\" content=\"{vurl}" in str(resp.data)
## Test adding to cache ; cache should be empty ##
def test_addToCache():
cache.clearCache()
twitfix.vnfFromCacheOrDL(testTextTweet)
twitfix.vnfFromCacheOrDL(testVideoTweet)
twitfix.vnfFromCacheOrDL(testMediaTweet)
twitfix.vnfFromCacheOrDL(testMultiMediaTweet)
#retrieve
compareDict(testTextTweet_compare,cache.getVnfFromLinkCache(testTextTweet))
compareDict(testVideoTweet_compare,cache.getVnfFromLinkCache(testVideoTweet))
compareDict(testMediaTweet_compare,cache.getVnfFromLinkCache(testMediaTweet))
compareDict(testMultiMediaTweet_compare,cache.getVnfFromLinkCache(testMultiMediaTweet))
cache.clearCache()
def test_embedFromScratch():
cache.clearCache()
client.get(testTextTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
client.get(testVideoTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
client.get(testMediaTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
client.get(testMultiMediaTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
def test_embedFromCache():
cache.clearCache()
twitfix.vnfFromCacheOrDL(testTextTweet)
twitfix.vnfFromCacheOrDL(testVideoTweet)
twitfix.vnfFromCacheOrDL(testMediaTweet)
twitfix.vnfFromCacheOrDL(testMultiMediaTweet)
#embed time
resp = client.get(testTextTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
resp = client.get(testVideoTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
resp = client.get(testMediaTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
resp = client.get(testMultiMediaTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
def test_embedSuggestive():
resp = client.get(testNSFWTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
assert "so i had a bot generate it for me" in str(resp.data)
assert "FfF_gKwXgAIpnpD" in str(resp.data)
def test_veryLongEmbed():
cache.clearCache()
cache.setCache({'https://twitter.com/TEST/status/1234':
{"description":"A"*1024,"hits":0,"images":["","","","",""],"likes":1234,"nsfw":False,"pfp":"","qrt":{},"rts":1234,"screen_name":"TEST","thumbnail":"","time":"","tweet":"https://twitter.com/TEST/status/1234","type":"Text","uploader":"Test","url":""}
})
resp = client.get('https://twitter.com/TEST/status/1234'.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
def test_embedFromOutdatedCache(): # presets a cache that has VNF's with missing fields; there's probably a better way to do this
cache.setCache({"https://twitter.com/Twitter/status/1118295916874739714":{"description":"On profile pages, we used to only show someones replies, not the original Tweet 🙄 Now were showing both so you can follow the conversation more easily! https://t.co/LSBEZYFqmY","hits":0,"images":["https://pbs.twimg.com/media/D4TS4xeX4AA02DI.jpg","","","","1"],"likes":5033,"nsfw":False,"pfp":"https://pbs.twimg.com/profile_images/1488548719062654976/u6qfBBkF_normal.jpg","qrt":{},"rts":754,"screen_name":'Twitter',"thumbnail":"https://pbs.twimg.com/media/D4TS4xeX4AA02DI.jpg","time":"Tue Apr 16 23:31:38 +0000 2019","tweet":"https://twitter.com/Twitter/status/1118295916874739714","type":"Image","uploader":'Twitter',"url":""},
"https://twitter.com/Twitter/status/1263145271946551300":{"description":"Testing, testing...\n\nA new way to have a convo with exactly who you want. Were starting with a small % globally, so keep your 👀 out to see it in action. https://t.co/pV53mvjAVT","hits":0,"images":["","","","",""],"likes":61584,"nsfw":False,"pfp":"https://pbs.twimg.com/profile_images/1488548719062654976/u6qfBBkF_normal.jpg","qrt":{},"rts":17138,"screen_name":'Twitter',"thumbnail":"https://pbs.twimg.com/media/EYeX7akWsAIP1_1.jpg","time":"Wed May 20 16:31:15 +0000 2020","tweet":"https://twitter.com/Twitter/status/1263145271946551300","type":"Video","uploader":'Twitter',"url":"https://video.twimg.com/amplify_video/1263145212760805376/vid/1280x720/9jous8HM0_duxL0w.mp4?tag=13"},
#"https://twitter.com/Twitter/status/1293239745695211520":{"description":"We tested, you Tweeted, and now were rolling it out to everyone! https://t.co/w6Q3Q6DiKz","hits":0,"images":["https://pbs.twimg.com/media/EfJ-C-JU0AAQL_C.jpg","https://pbs.twimg.com/media/EfJ-aHlU0AAU1kq.jpg","","","2"],"likes":5707,"nsfw":False,"pfp":"https://pbs.twimg.com/profile_images/1488548719062654976/u6qfBBkF_normal.jpg","qrt":{},"rts":1416,"screen_name":"Twitter","thumbnail":"https://pbs.twimg.com/media/EfJ-C-JU0AAQL_C.jpg","time":"Tue Aug 11 17:35:57 +0000 2020","tweet":"https://twitter.com/Twitter/status/1293239745695211520","type":"Image","uploader":"Twitter","url":""},
"https://twitter.com/jack/status/20":{"description":"just setting up my twttr","hits":0,"images":["","","","",""],"likes":179863,"nsfw":False,"pfp":"https://pbs.twimg.com/profile_images/1115644092329758721/AFjOr-K8_normal.jpg","qrt":{},"rts":122021,"screen_name":"jack","thumbnail":"","time":"Tue Mar 21 20:50:14 +0000 2006","tweet":"https://twitter.com/jack/status/20","type":"Text","uploader":"jack","url":""},
testQrtVideoTweet:{'tweet': 'https://twitter.com/Twitter/status/1494436688554344449', 'url': '', 'description': 'https://twitter.com/TwitterSupport/status/1494386367467593737', 'thumbnail': '', 'uploader': 'Twitter', 'screen_name': 'Twitter', 'pfp': 'https://pbs.twimg.com/profile_images/1488548719062654976/u6qfBBkF_normal.jpg', 'type': 'Text', 'images': ['', '', '', '', ''], 'likes': 5186, 'rts': 703, 'time': 'Thu Feb 17 22:20:46 +0000 2022', 'qrt': {'desc': 'Keep your fave DM convos easily accessible by pinning them! You can now pin up to six conversations that will stay at the top of your DM inbox.\n\nAvailable on Android, iOS, and web. https://t.co/kIjlzf9XLJ', 'handle': 'Twitter Support', 'screen_name': 'TwitterSupport', 'verified': True, 'id': '1494386367467593737'}, 'nsfw': False, 'verified': True, 'size': {}}
})
#embed time
resp = client.get(testTextTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
resp = client.get(testVideoTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
resp = client.get(testMediaTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
resp = client.get(testMultiMediaTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
# qrt
resp = client.get(testQrtVideoTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
assert "twitter:player:stream\" content=\"https://video.twimg.com/tweet_video/FL0gdK8WUAIHHKa.mp4" in str(resp.data)
def test_directEmbed():
resp = client.get(testVideoTweet.replace("https://twitter.com","")+".mp4",headers={"User-Agent":"test"})
assert resp.status_code==200
assert testVideoTweet_compare["url"] in str(resp.data)
def test_message404():
resp = client.get("https://twitter.com/jack/status/12345",headers={"User-Agent":"test"})
assert resp.status_code==200
assert "Failed to scan your link!" in str(resp.data)
def test_combine():
twt,e = twitfix.vnfFromCacheOrDL(testMultiMediaTweet)
img1 = twt["images"][0]
img2 = twt["images"][1]
resp = client.get(f"/rendercombined.jpg?imgs={img1},{img2}",headers={"User-Agent":"test"})
assert resp.status_code==200
assert resp.headers["Content-Type"]=="image/jpeg"
assert len(resp.data)>1000
def test_calcSyndicationToken():
assert twExtract.calcSyndicationToken("1691389765483200513") == "43lnobuxzql"

27
test_vx_VNF.py Normal file
View File

@ -0,0 +1,27 @@
import twitfix
from vx_testdata import *
def getVNFFromLink(link):
return twitfix.getTweetData(link)
## VNF conversion test ##
def test_textTweetVNF():
vnf = getVNFFromLink(testTextTweet)
compareDict(testTextTweet_compare,vnf)
def test_videoTweetVNF():
vnf = getVNFFromLink(testVideoTweet)
compareDict(testVideoTweet_compare,vnf)
def test_mediaTweetVNF():
vnf = getVNFFromLink(testMediaTweet)
compareDict(testMediaTweet_compare,vnf)
def test_multimediaTweetVNF():
vnf = getVNFFromLink(testMultiMediaTweet)
compareDict(testMultiMediaTweet_compare,vnf)
def test_pollTweetVNF():
vnf = getVNFFromLink(testPollTweet)
compareDict(testPollTweet_compare,vnf)

172
test_vx_embeds.py Normal file
View File

@ -0,0 +1,172 @@
import os
import twitfix,twExtract
import cache
from flask.testing import FlaskClient
from vx_testdata import *
client = FlaskClient(twitfix.app)
def test_embed_qrtTweet():
cache.clearCache()
# this is an incredibly lazy test, todo: improve it in the future
resp = client.get(testQRTTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
assert testQRTTweet_compare['text'][:10] in str(resp.data)
# test qrt-ception
resp = client.get(testQrtCeptionTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"}) # get top level tweet
assert resp.status_code==200
assert "Please retweet this to spread awareness for retweets" in str(resp.data)
qtd_tweet=cache.getVnfFromLinkCache("https://twitter.com/EliLanger/status/585253161260216320") # check that the quoted tweet for the top level tweet is cached
assert qtd_tweet is not None
assert qtd_tweet["qrtURL"] is not None # check that the quoted tweet for the top level tweet has a qrtURL
assert cache.getVnfFromLinkCache("https://twitter.com/EliLanger/status/313143446842007553") is None # check that the bottom level tweet has NOT been cached
resp = client.get("/EliLanger/status/585253161260216320",headers={"User-Agent":"test"}) # get mid level tweet
assert resp.status_code==200
assert cache.getVnfFromLinkCache("https://twitter.com/EliLanger/status/313143446842007553") is not None # check that the bottom level tweet has been cached now
def test_embed_qrtVideoTweet():
cache.clearCache()
# this is an incredibly lazy test, todo: improve it in the future
resp = client.get(testQrtVideoTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
qtd_tweet=cache.getVnfFromLinkCache("https://twitter.com/i/status/1674197531301904388")
vurl = qtd_tweet["mediaURLs"][0]
assert f"twitter:player:stream\" content=\"{vurl}" in str(resp.data)
def test_embed_FromScratch():
cache.clearCache()
client.get(testTextTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
client.get(testVideoTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
client.get(testMediaTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
client.get(testMultiMediaTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
def test_embed_FromCache():
cache.clearCache()
twitfix.getTweetData(testTextTweet)
twitfix.getTweetData(testVideoTweet)
twitfix.getTweetData(testMediaTweet)
twitfix.getTweetData(testMultiMediaTweet)
#embed time
resp = client.get(testTextTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
resp = client.get(testVideoTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
resp = client.get(testMediaTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
resp = client.get(testMultiMediaTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
def test_embed_Suggestive():
resp = client.get(testNSFWTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
assert "so i had a bot generate it for me" in str(resp.data)
assert "FfF_gKwXgAIpnpD" in str(resp.data)
def test_embed_video_direct():
resp = client.get(testVideoTweet.replace("https://twitter.com","")+".mp4",headers={"User-Agent":"test"})
assert resp.status_code==200
assert testVideoTweet_compare["mediaURLs"][0] in str(resp.data)
def test_embed_video_direct_subdomain():
resp = client.get(testVideoTweet.replace("https://twitter.com","https://d.vxtwitter.com"),headers={"User-Agent":"test"})
assert resp.status_code==200
assert testVideoTweet_compare["mediaURLs"][0] in str(resp.data)
def test_embed_img_direct():
resp = client.get(testMediaTweet.replace("https://twitter.com","")+".png",headers={"User-Agent":"test"})
assert resp.status_code==302
assert testMediaTweet_compare["mediaURLs"][0] in str(resp.data)
def test_embed_img_direct_subdomain():
resp = client.get(testMediaTweet.replace("https://twitter.com","https://d.vxtwitter.com"),headers={"User-Agent":"test"})
assert resp.status_code==302
assert testMediaTweet_compare["mediaURLs"][0] in str(resp.data)
def test_embed_multi_direct():
# embed first item
resp = client.get(testMultiMediaTweet.replace("https://twitter.com","")+"/1.png",headers={"User-Agent":"test"})
assert resp.status_code==302 # images should redirect
assert testMultiMediaTweet_compare["mediaURLs"][0] in str(resp.data)
# embed second item
resp = client.get(testMultiMediaTweet.replace("https://twitter.com","")+"/2.mp4",headers={"User-Agent":"test"})
assert resp.status_code==302 # images should redirect
assert testMultiMediaTweet_compare["mediaURLs"][1] in str(resp.data)
def test_embed_multi_direct_subdomain():
# generic embed
resp = client.get(testMultiMediaTweet.replace("https://twitter.com","https://d.vxtwitter.com"),headers={"User-Agent":"test"})
assert resp.status_code==302 # images should redirect
assert testMultiMediaTweet_compare["mediaURLs"][0] in str(resp.data)
# embed first item
resp = client.get(testMultiMediaTweet.replace("https://twitter.com","https://d.vxtwitter.com")+"/1",headers={"User-Agent":"test"})
assert resp.status_code==302 # images should redirect
assert testMultiMediaTweet_compare["mediaURLs"][0] in str(resp.data)
# embed second item
resp = client.get(testMultiMediaTweet.replace("https://twitter.com","https://d.vxtwitter.com")+"/2",headers={"User-Agent":"test"})
assert resp.status_code==302 # images should redirect
assert testMultiMediaTweet_compare["mediaURLs"][1] in str(resp.data)
def test_embed_message404():
resp = client.get("https://twitter.com/jack/status/12345",headers={"User-Agent":"test"})
assert resp.status_code==200
assert "Failed to scan your link!" in str(resp.data)
def test_combine():
twt = twitfix.getTweetData(testMultiMediaTweet)
img1 = twt["mediaURLs"][0]
img2 = twt["mediaURLs"][1]
resp = client.get(f"/rendercombined.jpg?imgs={img1},{img2}",headers={"User-Agent":"test"})
assert resp.status_code==200
assert resp.headers["Content-Type"]=="image/jpeg"
assert len(resp.data)>1000
def test_embed_combined():
twt = twitfix.getTweetData(testMultiMediaTweet)
img1 = twt["mediaURLs"][0]
img2 = twt["mediaURLs"][1]
resp = client.get(testMultiMediaTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
assert f"/rendercombined.jpg?imgs={img1},{img2}" in str(resp.data)
def test_embed_multimedia_single():
twt = twitfix.getTweetData(testMultiMediaTweet)
img1 = twt["mediaURLs"][0]
img2 = twt["mediaURLs"][1]
resp = client.get(testMultiMediaTweet.replace("https://twitter.com","")+"/1",headers={"User-Agent":"test"})
assert resp.status_code==200
assert img1 in str(resp.data) and img2 not in str(resp.data)
resp = client.get(testMultiMediaTweet.replace("https://twitter.com","")+"/2",headers={"User-Agent":"test"})
assert resp.status_code==200
assert img1 not in str(resp.data) and img2 in str(resp.data)
def test_embed_mixedMedia():
twt = twitfix.getTweetData(testMixedMediaTweet)
img1 = twt["mediaURLs"][0]
img2 = twt["mediaURLs"][1]
resp = client.get(testMixedMediaTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
# Check for default behavior with no index
assert resp.status_code==200
assert img1 in str(resp.data) and img2 not in str(resp.data)
assert "Media 1/2" in str(resp.data) # make sure user knows there are multiple media
resp = client.get(testMixedMediaTweet.replace("https://twitter.com","")+"/1",headers={"User-Agent":"test"})
assert resp.status_code==200
assert img1 in str(resp.data) and img2 not in str(resp.data)
resp = client.get(testMixedMediaTweet.replace("https://twitter.com","")+"/2",headers={"User-Agent":"test"})
assert resp.status_code==200
assert img1 not in str(resp.data) and img2 in str(resp.data)
def test_embed_poll():
resp = client.get(testPollTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
assert "Mean one thing" in str(resp.data)
assert "78.82%" in str(resp.data)
def test_embed_stripLastUrl():
resp = client.get(testMediaTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
assert resp.status_code==200
assert "HgLAbiXw2E" not in str(resp.data)

94
test_vx_extract.py Normal file
View File

@ -0,0 +1,94 @@
import os
import twExtract
import utils
from vx_testdata import *
def test_twextract_syndicationAPI():
tweet = twExtract.extractStatus_syndication(testMediaTweet,workaroundTokens=tokens)
assert utils.stripEndTCO(utils.stripEndTCO(tweet["full_text"]))==testMediaTweet_compare['text']
def test_twextract_extractStatusV2Anon():
tweet = twExtract.extractStatusV2AnonLegacy(testTextTweet,None)
assert utils.stripEndTCO(tweet["full_text"])==testTextTweet_compare['text']
tweet = twExtract.extractStatusV2AnonLegacy(testVideoTweet,None)
assert utils.stripEndTCO(tweet["full_text"])==testVideoTweet_compare['text']
tweet = twExtract.extractStatusV2AnonLegacy(testMediaTweet,None)
assert utils.stripEndTCO(tweet["full_text"])==testMediaTweet_compare['text']
tweet = twExtract.extractStatusV2AnonLegacy(testMultiMediaTweet,None)
assert utils.stripEndTCO(tweet["full_text"])[:94]==testMultiMediaTweet_compare['text'][:94]
def test_twextract_v2API():
tweet = twExtract.extractStatusV2Legacy(testMediaTweet,workaroundTokens=tokens)
assert utils.stripEndTCO(tweet["full_text"])==testMediaTweet_compare['text']
## Tweet retrieve tests ##
def test_twextract_textTweetExtract():
tweet = twExtract.extractStatus(testTextTweet,workaroundTokens=tokens)
assert utils.stripEndTCO(tweet["full_text"])==testTextTweet_compare['text']
assert tweet["user"]["screen_name"]=="jack"
assert 'extended_entities' not in tweet
def test_twextract_extractV2(): # remove this when v2 is default
tweet = twExtract.extractStatusV2(testTextTweet,workaroundTokens=tokens)
def test_twextract_UserExtract():
user = twExtract.extractUser(testUser,workaroundTokens=tokens)
assert user["screen_name"]=="jack"
assert user["id"]==12
assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006"
def test_twextract_UserExtractID():
user = twExtract.extractUser(testUserID,workaroundTokens=tokens)
assert user["screen_name"]=="jack"
assert user["id"]==12
assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006"
def test_twextract_UserExtractWeirdURLs():
for url in testUserWeirdURLs:
user = twExtract.extractUser(url,workaroundTokens=tokens)
assert user["screen_name"]=="jack"
assert user["id"]==12
assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006"
def test_twextract_videoTweetExtract():
tweet = twExtract.extractStatus(testVideoTweet,workaroundTokens=tokens)
assert utils.stripEndTCO(tweet["full_text"])==testVideoTweet_compare['text']
assert 'extended_entities' in tweet
assert len(tweet['extended_entities']["media"])==1
video = tweet['extended_entities']["media"][0]
assert video["media_url_https"]==testVideoTweet_compare['media_extended'][0]['thumbnail_url']
assert video["type"]=="video"
def test_twextract_mediaTweetExtract():
tweet = twExtract.extractStatus(testMediaTweet,workaroundTokens=tokens)
assert utils.stripEndTCO(tweet["full_text"])==testMediaTweet_compare['text']
assert 'extended_entities' in tweet
assert len(tweet['extended_entities']["media"])==1
video = tweet['extended_entities']["media"][0]
assert video["media_url_https"]==testMediaTweet_compare['media_extended'][0]['thumbnail_url']
assert video["type"]=="photo"
def test_twextract_multimediaTweetExtract():
tweet = twExtract.extractStatus(testMultiMediaTweet,workaroundTokens=tokens)
assert utils.stripEndTCO(tweet["full_text"])[:94]==testMultiMediaTweet_compare['text'][:94]
assert 'extended_entities' in tweet
assert len(tweet['extended_entities']["media"])==3
video = tweet['extended_entities']["media"][0]
assert video["media_url_https"]==testMultiMediaTweet_compare["mediaURLs"][0]
assert video["type"]=="photo"
video = tweet['extended_entities']["media"][1]
assert video["media_url_https"]==testMultiMediaTweet_compare["mediaURLs"][1]
assert video["type"]=="photo"
def test_twextract_pollTweetExtract(): # basic check if poll data exists
tweet = twExtract.extractStatus("https://twitter.com/norm/status/651169346518056960",workaroundTokens=tokens)
assert 'card' in tweet
assert tweet['card']['name']=="poll2choice_text_only"
def test_twextract_NSFW_TweetExtract():
tweet = twExtract.extractStatus(testNSFWTweet,workaroundTokens=tokens) # For now just test that there's no error

18
test_vx_misc.py Normal file
View File

@ -0,0 +1,18 @@
import twitfix, cache, twExtract
from vx_testdata import *
def test_calcSyndicationToken():
assert twExtract.calcSyndicationToken("1691389765483200513") == "43lnobuxzql"
def test_addToCache():
cache.clearCache()
twitfix.getTweetData(testTextTweet)
twitfix.getTweetData(testVideoTweet)
twitfix.getTweetData(testMediaTweet)
twitfix.getTweetData(testMultiMediaTweet)
#retrieve
compareDict(testTextTweet_compare,cache.getVnfFromLinkCache(testTextTweet))
compareDict(testVideoTweet_compare,cache.getVnfFromLinkCache(testVideoTweet))
compareDict(testMediaTweet_compare,cache.getVnfFromLinkCache(testMediaTweet))
compareDict(testMultiMediaTweet_compare,cache.getVnfFromLinkCache(testMultiMediaTweet))
cache.clearCache()

View File

@ -8,23 +8,27 @@ tests = {
"testQRTTweet":"https://twitter.com/pdxdylan/status/1611477137319514129",
"testQrtCeptionTweet":"https://twitter.com/CatherineShu/status/585253766271672320",
"testQrtVideoTweet":"https://twitter.com/pdxdylan/status/1674561759422578690",
"testNSFWTweet":"https://twitter.com/kuyacoy/status/1581185279376838657"
"testNSFWTweet":"https://twitter.com/kuyacoy/status/1581185279376838657",
"testPollTweet": "https://twitter.com/norm/status/651169346518056960",
"testMixedMediaTweet":"https://twitter.com/bigbeerfest/status/1760638922084741177",
}
def getVNFFromLink(link):
return twitfix.getTweetData(link)
with open('generated.txt', 'w',encoding='utf-8') as f:
f.write("# autogenerated from testgen.py\n")
for test in tests:
f.write(f"{test}=\"{tests[test]}\"\n")
f.write("\n")
for test in tests:
VNF = twitfix.link_to_vnf(tests[test])
del VNF['ttl']
VNF = getVNFFromLink(tests[test])
del VNF['likes']
del VNF['rts']
del VNF['hits']
del VNF['pfp']
del VNF['uploader']
del VNF['verified']
del VNF['screen_name']
del VNF['retweets']
del VNF['replies']
del VNF['user_screen_name']
del VNF['user_name']
del VNF['user_profile_image_url']
del VNF['communityNote']
# write in a format that can be copy-pasted into a python file, i.e testTextTweet={...
f.write(f"{test}_compare={VNF}\n")

View File

@ -1,27 +1,24 @@
from weakref import finalize
from flask import Flask, render_template, request, redirect, abort, Response, send_from_directory, url_for, send_file, make_response, jsonify
from flask_cors import CORS
import textwrap
import re
import os
import urllib.parse
import urllib.request
import combineImg
from datetime import date,datetime, timedelta
from io import BytesIO, StringIO
from io import BytesIO
import urllib
import msgs
import twExtract as twExtract
from configHandler import config
from cache import addVnfToLinkCache,getVnfFromLinkCache
from yt_dlp.utils import ExtractorError
import vxlogging as log
import zipfile
import html
from utils import getTweetIdFromUrl, pathregex
from vxApi import getApiResponse
from urllib.parse import urlparse
app = Flask(__name__)
CORS(app)
user_agent=""
pathregex = re.compile("\\w{1,15}\\/(status|statuses)\\/(\\d{2,20})")
generate_embed_user_agents = [
"facebookexternalhit/1.1",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.57 Safari/537.36",
@ -47,12 +44,52 @@ def isValidUserAgent(user_agent):
return True
return False
def getTweetIdFromUrl(url):
match = pathregex.search(url)
if match is not None:
return match.group(2)
else:
return None
def message(text):
return render_template(
'default.html',
message = text,
color = config['config']['color'],
appname = config['config']['appname'],
repo = config['config']['repo'],
url = config['config']['url'] )
def renderImageTweetEmbed(tweetData,image,appnameSuffix=""):
qrt = tweetData['qrt']
embedDesc = msgs.formatEmbedDesc("Image",tweetData['text'],qrt,tweetData['pollData'],msgs.genLikesDisplay(tweetData))
return render_template("image.html",
tweet=tweetData,
pic=[image],
host=config['config']['url'],
desc=embedDesc,
urlEncodedDesc=urllib.parse.quote(embedDesc),
tweetLink=f'https://twitter.com/{tweetData["user_screen_name"]}/status/{tweetData["tweetID"]}',
appname=config['config']['appname']+appnameSuffix,
)
def renderVideoTweetEmbed(tweetData,mediaInfo,appnameSuffix=""):
qrt = tweetData['qrt']
embedDesc = msgs.formatEmbedDesc("Video",tweetData['text'],qrt,tweetData['pollData'],msgs.genLikesDisplay(tweetData))
return render_template("video.html",
tweet=tweetData,
media=mediaInfo,
host=config['config']['url'],
desc=embedDesc,
urlEncodedDesc=urllib.parse.quote(embedDesc),
tweetLink=f'https://twitter.com/{tweetData["user_screen_name"]}/status/{tweetData["tweetID"]}',
appname=config['config']['appname']+appnameSuffix,
)
def renderTextTweetEmbed(tweetData,appnameSuffix=""):
qrt = tweetData['qrt']
embedDesc = msgs.formatEmbedDesc("Text",tweetData['text'],qrt,tweetData['pollData'],msgs.genLikesDisplay(tweetData))
return render_template("text.html",
tweet=tweetData,
host=config['config']['url'],
desc=embedDesc,
urlEncodedDesc=urllib.parse.quote(embedDesc),
tweetLink=f'https://twitter.com/{tweetData["user_screen_name"]}/status/{tweetData["tweetID"]}',
appname=config['config']['appname']+appnameSuffix,
)
@app.route('/robots.txt')
def robots():
@ -60,12 +97,7 @@ def robots():
@app.route('/') # If the useragent is discord, return the embed, if not, redirect to configured repo directly
def default():
global user_agent
user_agent = request.headers.get('user-agent')
if isValidUserAgent(user_agent):
return message("TwitFix is an attempt to fix twitter video embeds in discord! created by Robin Universe :)\n\n💖\n\nClick me to be redirected to the repo!")
else:
return redirect(config['config']['repo'], 301)
return redirect(config['config']['repo'], 301)
@app.route('/oembed.json') #oEmbed endpoint
def oembedend():
@ -76,268 +108,113 @@ def oembedend():
provName = request.args.get("provider",None)
return oEmbedGen(desc, user, link, ttype,providerName=provName)
def getTweetData(twitter_url):
cachedVNF = getVnfFromLinkCache(twitter_url)
if cachedVNF is not None:
return cachedVNF
try:
rawTweetData = twExtract.extractStatusV2Anon(twitter_url)
except:
rawTweetData = None
if rawTweetData is None:
rawTweetData = twExtract.extractStatusV2(twitter_url,workaroundTokens=config['config']['workaroundTokens'].split(','))
if 'error' in rawTweetData:
return None
if rawTweetData is None:
return None
tweetData = getApiResponse(rawTweetData)
if tweetData is None:
return None
addVnfToLinkCache(twitter_url,tweetData)
return tweetData
def determineEmbedTweet(tweetData):
# Determine which tweet, i.e main or QRT, to embed the media from.
# if there is no QRT, return the main tweet => default behavior
# if both don't have media, return the main tweet => embedding qrt text will be handled in the embed description
# if both have media, return the main tweet => priority is given to the main tweet's media
# if only the QRT has media, return the QRT => show the QRT's media, not the main tweet's
# if only the main tweet has media, return the main tweet => show the main tweet's media, embedding QRT text will be handled in the embed description
if tweetData['qrt'] is None:
return tweetData
if tweetData['qrt']['hasMedia'] and not tweetData['hasMedia']:
return tweetData['qrt']
return tweetData
@app.route('/<path:sub_path>') # Default endpoint used by everything
def twitfix(sub_path):
global user_agent
user_agent = request.headers.get('user-agent')
match = pathregex.search(sub_path)
if match is None:
abort(404)
twitter_url = f'https://twitter.com/i/status/{getTweetIdFromUrl(sub_path)}'
if request.url.endswith(".mp4") or request.url.endswith("%2Emp4"):
twitter_url = "https://twitter.com/" + sub_path
tweetData = getTweetData(twitter_url)
if tweetData is None:
return message(msgs.failedToScan)
qrt = None
if 'qrtURL' in tweetData and tweetData['qrtURL'] is not None:
qrt = getTweetData(tweetData['qrtURL'])
tweetData['qrt'] = qrt
if '?' in request.url:
requestUrlWithoutQuery = request.url.split("?")[0]
else:
requestUrlWithoutQuery = request.url
directEmbed=False
if requestUrlWithoutQuery.startswith("https://d.vx") or requestUrlWithoutQuery.endswith(".mp4") or requestUrlWithoutQuery.endswith(".png"):
directEmbed = True
# remove the .mp4 from the end of the URL
if requestUrlWithoutQuery.endswith(".mp4") or requestUrlWithoutQuery.endswith(".png"):
sub_path = sub_path[:-4]
embedIndex = -1
# if url ends with /1, /2, /3, or /4, we'll use that as the index
if sub_path[-2:] in ["/1","/2","/3","/4"]:
embedIndex = int(sub_path[-1])-1
sub_path = sub_path[:-2]
if "?" not in request.url:
clean = twitter_url[:-4]
if request.url.startswith("https://api.vx"): # Directly return the API response if the request is from the API
return tweetData
elif directEmbed: # direct embed
# direct embeds should always prioritize the main tweet, so don't check for qrt
# determine what type of media we're dealing with
if not tweetData['hasMedia'] and qrt is None:
return renderTextTweetEmbed(tweetData)
elif tweetData['allSameType'] and tweetData['media_extended'][0]['type'] == "image" and embedIndex == -1 and tweetData['combinedMediaUrl'] != None:
return redirect(tweetData['combinedMediaUrl'], 302)
else:
clean = twitter_url
vnf,e = vnfFromCacheOrDL(clean)
if vnf is None:
if e is not None:
return message(msgs.failedToScan+msgs.failedToScanExtra+e)
return message(msgs.failedToScan)
return make_cached_vnf_response(vnf,getTemplate("rawvideo.html",vnf,"",[],clean,"","","",""))
elif request.url.endswith(".txt") or request.url.endswith("%2Etxt"):
twitter_url = "https://twitter.com/" + sub_path
if "?" not in request.url:
clean = twitter_url[:-4]
# this means we have mixed media or video, and we're only going to embed one
if embedIndex == -1: # if the user didn't specify an index, we'll just use the first one
embedIndex = 0
media = tweetData['media_extended'][embedIndex]
if media['type'] == "image":
return redirect(media['url'], 302)
elif media['type'] == "video" or media['type'] == "gif":
return render_template("rawvideo.html",media=media)
else: # full embed
embedTweetData = determineEmbedTweet(tweetData)
if not embedTweetData['hasMedia']:
return renderTextTweetEmbed(tweetData)
elif embedTweetData['allSameType'] and embedTweetData['media_extended'][0]['type'] == "image" and embedIndex == -1 and embedTweetData['combinedMediaUrl'] != None:
return renderImageTweetEmbed(tweetData,embedTweetData['combinedMediaUrl'],appnameSuffix=" - See original tweet for full quality")
else:
clean = twitter_url
vnf,e = vnfFromCacheOrDL(clean)
if vnf is None:
if e is not None:
return abort(500,"Failed to scan tweet: "+e)
return abort(500,"Failed to scan tweet")
return make_content_type_response(getTemplate("txt.html",vnf,vnf["description"],[],clean,"","","",""),"text/plain")
elif request.url.endswith(".zip") or request.url.endswith("%2Ezip"): # for certain types of archival software (i.e Hydrus)
twitter_url = "https://twitter.com/" + sub_path
if "?" not in request.url:
clean = twitter_url[:-4]
else:
clean = twitter_url
vnf,e = vnfFromCacheOrDL(clean)
if vnf is None:
if e is not None:
return abort(500,"Failed to scan tweet: "+e)
return abort(500,"Failed to scan tweet")
with app.app_context():
txtData = getTemplate("txt.html",vnf,vnf["description"],"",clean,"","","","")
txtIo = BytesIO()
txtIo.write(txtData.encode("utf-8"))
txtIo.seek(0)
zipIo = BytesIO()
with zipfile.ZipFile(zipIo, 'w', zipfile.ZIP_DEFLATED) as zipf:
zipf.writestr("tweetInfo.txt", txtIo.read())
# todo: add images to zip
zipIo.seek(0)
return make_content_type_response(zipIo,"application/zip")
elif request.url.startswith("https://d.vx"): # Matches d.fx? Try to give the user a direct link
if isValidUserAgent(user_agent):
twitter_url = config['config']['url'] + "/"+sub_path
log.debug( "d.vx link shown to discord user-agent!")
if request.url.endswith(".mp4") and "?" not in request.url:
if "?" not in request.url:
clean = twitter_url[:-4]
else:
clean = twitter_url
# this means we have mixed media or video, and we're only going to embed one
if embedIndex == -1: # if the user didn't specify an index, we'll just use the first one
embedIndex = 0
media = embedTweetData['media_extended'][embedIndex]
if len(embedTweetData["media_extended"]) > 1:
suffix = f' - Media {embedIndex+1}/{len(embedTweetData["media_extended"])}'
else:
clean = twitter_url
return redirect(clean+".mp4", 301)
else:
log.debug("Redirect to MP4 using d.fxtwitter.com")
return dir(sub_path)
elif request.url.endswith("/1") or request.url.endswith("/2") or request.url.endswith("/3") or request.url.endswith("/4") or request.url.endswith("%2F1") or request.url.endswith("%2F2") or request.url.endswith("%2F3") or request.url.endswith("%2F4"):
twitter_url = "https://twitter.com/" + sub_path
if "?" not in request.url:
clean = twitter_url[:-2]
else:
clean = twitter_url
suffix = ''
if media['type'] == "image":
return renderImageTweetEmbed(tweetData,media['url'] , appnameSuffix=suffix)
elif media['type'] == "video" or media['type'] == "gif":
return renderVideoTweetEmbed(tweetData,media,appnameSuffix=suffix)
image = ( int(request.url[-1]) - 1 )
return embed_video(clean, image)
elif request.url.startswith("https://api.vx"):
twitter_url = "https://twitter.com/" + sub_path
try:
try:
tweet = twExtract.extractStatusV2Anon(twitter_url)
except:
tweet = None
if tweet is None:
tweet = twExtract.extractStatusV2(twitter_url,workaroundTokens=config['config']['workaroundTokens'].split(','))
if 'error' in tweet:
response = make_response(jsonify(tweet), 500)
response.headers['Content-Type'] = 'application/json'
response.cache_control.max_age = 3600
response.cache_control.public = True
return response
tweetL = tweet["legacy"]
if "user_result" in tweet["core"]:
userL = tweet["core"]["user_result"]["result"]["legacy"]
elif "user_results" in tweet["core"]:
userL = tweet["core"]["user_results"]["result"]["legacy"]
media=[]
media_extended=[]
hashtags=[]
communityNote=None
try:
if "birdwatch_pivot" in tweet:
communityNote=tweet["birdwatch_pivot"]["note"]["summary"]["text"]
except:
pass
if "extended_entities" in tweetL:
if "media" in tweetL["extended_entities"]:
tmedia=tweetL["extended_entities"]["media"]
for i in tmedia:
extendedInfo={}
if "video_info" in i:
# find the highest bitrate
best_bitrate = -1
besturl=""
for j in i["video_info"]["variants"]:
if j['content_type'] == "video/mp4" and '/hevc/' not in j["url"] and j['bitrate'] > best_bitrate:
besturl = j['url']
best_bitrate = j['bitrate']
media.append(besturl)
extendedInfo["url"] = besturl
extendedInfo["type"] = "video"
if (i["type"] == "animated_gif"):
extendedInfo["type"] = "gif"
altText = None
extendedInfo["size"] = {"width":i["original_info"]["width"],"height":i["original_info"]["height"]}
if "ext_alt_text" in i:
altText=i["ext_alt_text"]
if "duration_millis" in i["video_info"]:
extendedInfo["duration_millis"] = i["video_info"]["duration_millis"]
else:
extendedInfo["duration_millis"] = 0
extendedInfo["thumbnail_url"] = i["media_url_https"]
extendedInfo["altText"] = altText
media_extended.append(extendedInfo)
else:
media.append(i["media_url_https"])
extendedInfo["url"] = i["media_url_https"]
altText=None
if "ext_alt_text" in i:
altText=i["ext_alt_text"]
extendedInfo["altText"] = altText
extendedInfo["type"] = "image"
extendedInfo["size"] = {"width":i["original_info"]["width"],"height":i["original_info"]["height"]}
extendedInfo["thumbnail_url"] = i["media_url_https"]
media_extended.append(extendedInfo)
if "hashtags" in tweetL["entities"]:
for i in tweetL["entities"]["hashtags"]:
hashtags.append(i["text"])
include_txt = request.args.get("include_txt", "false")
include_zip = request.args.get("include_zip", "false") # for certain types of archival software (i.e Hydrus)
if include_txt == "true" or (include_txt == "ifnomedia" and len(media)==0):
txturl = config['config']['url']+"/"+userL["screen_name"]+"/status/"+tweet["rest_id"]+".txt"
media.append(txturl)
media_extended.append({"url":txturl,"type":"txt"})
if include_zip == "true" or (include_zip == "ifnomedia" and len(media)==0):
zipurl = config['config']['url']+"/"+userL["screen_name"]+"/status/"+tweet["rest_id"]+".zip"
media.append(zipurl)
media_extended.append({"url":zipurl,"type":"zip"})
qrtURL = None
if 'quoted_status_id_str' in tweetL:
qrtURL = "https://twitter.com/i/status/" + tweetL['quoted_status_id_str']
if 'possibly_sensitive' not in tweetL:
tweetL['possibly_sensitive'] = False
twText = html.unescape(tweetL["full_text"])
if 'entities' in tweetL and 'urls' in tweetL['entities']:
for eurl in tweetL['entities']['urls']:
if "/status/" in eurl["expanded_url"] and eurl["expanded_url"].startswith("https://twitter.com/"):
twText = twText.replace(eurl["url"], "")
else:
twText = twText.replace(eurl["url"],eurl["expanded_url"])
apiObject = {
"text": twText,
"likes": tweetL["favorite_count"],
"retweets": tweetL["retweet_count"],
"replies": tweetL["reply_count"],
"date": tweetL["created_at"],
"user_screen_name": html.unescape(userL["screen_name"]),
"user_name": userL["name"],
"user_profile_image_url": userL["profile_image_url_https"],
"tweetURL": "https://twitter.com/"+userL["screen_name"]+"/status/"+tweet["rest_id"],
"tweetID": tweet["rest_id"],
"conversationID": tweetL["conversation_id_str"],
"mediaURLs": media,
"media_extended": media_extended,
"possibly_sensitive": tweetL["possibly_sensitive"],
"hashtags": hashtags,
"qrtURL": qrtURL,
"communityNote": communityNote
}
try:
apiObject["date_epoch"] = int(datetime.strptime(tweetL["created_at"], "%a %b %d %H:%M:%S %z %Y").timestamp())
except:
pass
if tweet is None:
log.error("API Get failed: " + twitter_url + " (Tweet null)")
abort(500, '{"message": "Failed to extract tweet (Twitter API error)"}')
log.success("API Get success")
return apiObject
except Exception as e:
log.error("API Get failed: " + twitter_url + " " + log.get_exception_traceback_str(e))
abort(500, '{"message": "Failed to extract tweet (Processing error)"}')
if match is not None:
twitter_url = sub_path
if match.start() == 0:
twitter_url = "https://twitter.com/" + sub_path
else:
# URL normalization messes up the URL, so we have to fix it
if sub_path.startswith("https:/") and not sub_path.startswith("https://"):
twitter_url = sub_path.replace("https:/", "https://", 1)
elif sub_path.startswith("http:/") and not sub_path.startswith("http://"):
twitter_url = sub_path.replace("http:/", "http://", 1)
if isValidUserAgent(user_agent):
res = embedCombined(twitter_url)
return res
else:
log.debug("Redirect to " + twitter_url)
return redirect(twitter_url, 301)
else:
return message("This doesn't appear to be a twitter URL")
@app.route('/dir/<path:sub_path>') # Try to return a direct link to the MP4 on twitters servers
def dir(sub_path):
global user_agent
user_agent = request.headers.get('user-agent')
url = sub_path
match = pathregex.search(url)
if match is not None:
twitter_url = url
if match.start() == 0:
twitter_url = "https://twitter.com/" + url
if isValidUserAgent(user_agent):
res = embed_video(twitter_url)
return res
else:
log.debug("Redirect to direct MP4 URL")
return direct_video(twitter_url)
else:
return redirect(url, 301)
return message(msgs.failedToScan)
@app.route('/favicon.ico')
def favicon(): # pragma: no cover
@ -372,444 +249,6 @@ def rendercombined():
imgIo.seek(0)
return send_file(imgIo, mimetype='image/jpeg',max_age=86400)
def upgradeVNF(vnf):
# Makes sure any VNF object passed through this has proper fields if they're added in later versions
if 'verified' not in vnf:
vnf['verified']=False
if 'size' not in vnf:
if vnf['type'] == 'Video':
vnf['size']={'width':720,'height':480}
else:
vnf['size']={}
if 'qrtURL' not in vnf:
if vnf['qrt'] == {}:
vnf['qrtURL'] = None
else: #
vnf['qrtURL'] = f"https://twitter.com/{vnf['qrt']['screen_name']}/status/{vnf['qrt']['id']}"
if 'isGif' not in vnf:
vnf['isGif'] = False
return vnf
def getDefaultTTL(): # TTL for deleting items from the database
return datetime.today().replace(microsecond=0) + timedelta(days=1)
def secondsUntilTTL(ttl):
untilTTL = ttl - datetime.today().replace(microsecond=0)
return untilTTL.total_seconds()
def make_content_type_response(response, content_type):
resp = make_response(response)
resp.headers['Content-Type'] = content_type
return resp
def make_cached_vnf_response(vnf,response):
return response
try:
if 'ttl' not in vnf or vnf['ttl'] == None or secondsUntilTTL(vnf['ttl']) <= 0:
return response
resp = make_response(response)
resp.cache_control.max_age = secondsUntilTTL(vnf['ttl'])
resp.cache_control.public = True
return resp
except Exception as e:
log.error("Error making cached response: " + str(e))
return response
def vnfFromCacheOrDL(video_link):
cached_vnf = getVnfFromLinkCache(video_link)
if cached_vnf == None:
try:
vnf = link_to_vnf(video_link)
addVnfToLinkCache(video_link, vnf)
log.success("VNF Get success")
return vnf,None
except (ExtractorError, twExtract.TwExtractError) as exErr:
if 'HTTP Error 404' in exErr.msg or 'No status found with that ID' in exErr.msg:
exErr.msg=msgs.tweetNotFound
elif 'suspended' in exErr.msg:
exErr.msg=msgs.tweetSuspended
else:
exErr.msg=msgs.unknownError
log.error("VNF Get failed: " + video_link + " " + log.get_exception_traceback_str(exErr))
return None,exErr.msg
except Exception as e:
log.error("VNF Get failed: " + video_link + " " + log.get_exception_traceback_str(e))
return None,None
else:
return upgradeVNF(cached_vnf),None
def direct_video(video_link): # Just get a redirect to a MP4 link from any tweet link
vnf,e = vnfFromCacheOrDL(video_link)
if vnf != None:
return redirect(vnf['url'], 301)
else:
if e is not None:
return message(msgs.failedToScan+msgs.failedToScanExtra+e)
return message(msgs.failedToScan)
def direct_video_link(video_link): # Just get a redirect to a MP4 link from any tweet link
vnf,e = vnfFromCacheOrDL(video_link)
if vnf != None:
return vnf['url']
else:
if e is not None:
return message(msgs.failedToScan+msgs.failedToScanExtra+e)
return message(msgs.failedToScan)
def embed_video(video_link, image=0): # Return Embed from any tweet link
vnf,e = vnfFromCacheOrDL(video_link)
if vnf != None:
return embed(video_link, vnf, image)
else:
if e is not None:
return message(msgs.failedToScan+msgs.failedToScanExtra+e)
return message(msgs.failedToScan)
def tweetInfo(url, tweet="", desc="", thumb="", uploader="", screen_name="", pfp="", tweetType="", images="", hits=0, likes=0, rts=0, time="", qrtURL="", nsfw=False,ttl=None,verified=False,size={},poll=None,isGif=False): # Return a dict of video info with default values
if (ttl==None):
ttl = getDefaultTTL()
vnf = {
"tweet" : tweet,
"url" : url,
"description" : desc,
"thumbnail" : thumb,
"uploader" : uploader,
"screen_name" : screen_name,
"pfp" : pfp,
"type" : tweetType,
"images" : images,
"hits" : hits,
"likes" : likes,
"rts" : rts,
"time" : time,
"qrtURL" : qrtURL,
"nsfw" : nsfw,
"ttl" : ttl,
"verified" : verified,
"size" : size,
"poll" : poll,
"isGif" : isGif,
"tweetId" : int(getTweetIdFromUrl(tweet))
}
if (poll is None):
del vnf['poll']
return vnf
def link_to_vnf_from_tweet_data(tweet,video_link):
imgs = ["","","","", ""]
log.debug("Tweet Type: " + tweetType(tweet))
isGif=False
# Check to see if tweet has a video, if not, make the url passed to the VNF the first t.co link in the tweet
if tweetType(tweet) == "Video":
media=tweet['extended_entities']['media'][0]
if media['video_info']['variants']:
best_bitrate = -1
thumb = media['media_url']
if 'original_info' in media:
size=media["original_info"]
elif 'sizes' in media and ('large' in media["sizes"] or 'medium' in media["sizes"] or 'small' in media["sizes"] or 'thumb' in media["sizes"]):
possibleSizes=['large','medium','small','thumb']
for p in possibleSizes:
if p in media["sizes"]:
size={'width':media["sizes"][p]['w'],'height':media["sizes"][p]['h']}
break
else:
size={'width':720,'height':480}
for video in media['video_info']['variants']:
if video['content_type'] == "video/mp4" and '/hevc/' not in video["url"] and video['bitrate'] > best_bitrate:
url = video['url']
best_bitrate = video['bitrate']
elif tweetType(tweet) == "Text":
url = ""
thumb = ""
size = {}
else:
imgs = ["","","","", ""]
i = 0
for media in tweet['extended_entities']['media']:
imgs[i] = media['media_url_https']
i = i + 1
imgs[4] = str(i)
url = ""
images= imgs
thumb = tweet['extended_entities']['media'][0]['media_url_https']
size = {}
if 'extended_entities' in tweet and 'media' in tweet['extended_entities'] and tweet['extended_entities']['media'][0]['type'] == 'animated_gif':
isGif=True
qrtURL = None
if 'quoted_status_permalink' in tweet:
qrtURL = tweet['quoted_status_permalink']['expanded']
elif 'quoted_status_id_str' in tweet:
qrtURL = "https://twitter.com/i/status/" + tweet['quoted_status_id_str']
text = tweet['full_text']
if 'possibly_sensitive' in tweet:
nsfw = tweet['possibly_sensitive']
else:
nsfw = False
if 'entities' in tweet and 'urls' in tweet['entities']:
for eurl in tweet['entities']['urls']:
if "/status/" in eurl["expanded_url"] and eurl["expanded_url"].startswith("https://twitter.com/"):
text = text.replace(eurl["url"], "")
else:
text = text.replace(eurl["url"],eurl["expanded_url"])
ttl = None #default
try:
if 'card' in tweet and tweet['card']['name'].startswith('poll'):
poll=getPollObject(tweet['card'])
if tweet['card']['binding_values']['counts_are_final']['boolean_value'] == False:
ttl = datetime.today().replace(microsecond=0) + timedelta(minutes=1)
else:
poll=None
except:
poll=None
vnf = tweetInfo(
url,
video_link,
text, thumb,
tweet['user']['name'],
tweet['user']['screen_name'],
tweet['user']['profile_image_url'],
tweetType(tweet),
likes=tweet['favorite_count'],
rts=tweet['retweet_count'],
time=tweet['created_at'],
qrtURL=qrtURL,
images=imgs,
nsfw=nsfw,
verified=tweet['user']['verified'],
size=size,
poll=poll,
ttl=ttl,
isGif=isGif
)
return vnf
def link_to_vnf_from_unofficial_api(video_link):
tweet=None
log.info("Attempting to download tweet info: "+video_link)
tweet = twExtract.extractStatus(video_link,workaroundTokens=config['config']['workaroundTokens'].split(','))
log.success("Unofficial API Success")
if "extended_entities" not in tweet:
# check if any entities with urls ending in /video/XXX or /photo/XXX exist
if "entities" in tweet and "urls" in tweet["entities"]:
for url in tweet["entities"]["urls"]:
if "/video/" in url["expanded_url"] or "/photo/" in url["expanded_url"]:
log.info("Extra tweet info found in entities: "+video_link+" -> "+url["expanded_url"])
subTweet = twExtract.extractStatus(url["expanded_url"],workaroundTokens=config['config']['workaroundTokens'].split(','))
if "extended_entities" in subTweet:
tweet["extended_entities"] = subTweet["extended_entities"]
break
return link_to_vnf_from_tweet_data(tweet,video_link)
def link_to_vnf(video_link): # Return a VideoInfo object or die trying
return link_to_vnf_from_unofficial_api(video_link)
def message(text):
return render_template(
'default.html',
message = text,
color = config['config']['color'],
appname = config['config']['appname'],
repo = config['config']['repo'],
url = config['config']['url'] )
def getTemplate(template,vnf,desc,images,video_link,color,urlDesc,urlUser,urlLink,appNameSuffix="",embedVNF=None):
if (embedVNF is None):
embedVNF = vnf
if ('width' in embedVNF['size'] and 'height' in embedVNF['size']):
embedVNF['size']['width'] = min(embedVNF['size']['width'],2000)
embedVNF['size']['height'] = min(embedVNF['size']['height'],2000)
return render_template(
template,
likes = vnf['likes'],
rts = vnf['rts'],
time = vnf['time'],
screenName = vnf['screen_name'],
vidlink = embedVNF['url'],
userLink = f"https://twitter.com/{vnf['screen_name']}",
pfp = vnf['pfp'],
vidurl = embedVNF['url'],
desc = desc,
pic = images,
user = vnf['uploader'],
video_link = vnf,
color = color,
appname = config['config']['appname'] + appNameSuffix,
repo = config['config']['repo'],
url = config['config']['url'],
urlDesc = urlDesc,
urlUser = urlUser,
urlLink = urlLink,
urlUserLink= urllib.parse.quote(f"https://twitter.com/{vnf['screen_name']}"),
tweetLink = vnf['tweet'],
videoSize = embedVNF['size'] )
def embed(video_link, vnf, image):
log.info("Embedding " + vnf['type'] + ": " + video_link)
desc = re.sub(r' https:\/\/t\.co\/\S+(?=\s|$)', '', vnf['description'])
urlUser = urllib.parse.quote(vnf['uploader'])
urlDesc = urllib.parse.quote(desc)
urlLink = urllib.parse.quote(video_link)
likeDisplay = msgs.genLikesDisplay(vnf)
if 'poll' in vnf:
pollDisplay= msgs.genPollDisplay(vnf['poll'])
else:
pollDisplay=""
qrt=None
if vnf['qrtURL'] is not None:
qrt,e=vnfFromCacheOrDL(vnf['qrtURL'])
if qrt is not None:
desc=msgs.formatEmbedDesc(vnf['type'],desc,qrt,pollDisplay,likeDisplay)
else:
desc=msgs.formatEmbedDesc(vnf['type'],desc,None,pollDisplay,likeDisplay)
embedVNF=None
appNamePost = ""
if vnf['type'] == "Text": # Change the template based on tweet type
template = 'text.html'
if qrt is not None and qrt['type'] != "Text":
embedVNF=qrt
if qrt['type'] == "Image":
if embedVNF['images'][4]!="1":
appNamePost = " - Image " + str(image+1) + " of " + str(vnf['images'][4])
image = embedVNF['images'][image]
template = 'image.html'
elif qrt['type'] == "Video" or qrt['type'] == "":
urlDesc = urllib.parse.quote(desc)
template = 'video.html'
if vnf['type'] == "Image":
if vnf['images'][4]!="1":
appNamePost = " - Image " + str(image+1) + "/" + str(vnf['images'][4])
image = vnf['images'][image]
template = 'image.html'
if vnf['type'] == "Video":
if vnf['isGif'] == True and config['config']['gifConvertAPI'] != "" and config['config']['gifConvertAPI'] != "none":
vnf['url'] = f"{config['config']['gifConvertAPI']}/convert.mp4?url={vnf['url']}"
appNamePost = " - GIF"
urlDesc = urllib.parse.quote(desc)
template = 'video.html'
if vnf['type'] == "":
urlDesc = urllib.parse.quote(desc)
template = 'video.html'
color = "#7FFFD4" # Green
if vnf['nsfw'] == True:
color = "#800020" # Red
return make_cached_vnf_response(vnf,getTemplate(template,vnf,desc,[image],video_link,color,urlDesc,urlUser,urlLink,appNamePost,embedVNF))
def embedCombined(video_link):
vnf,e = vnfFromCacheOrDL(video_link)
if vnf != None:
return make_cached_vnf_response(vnf,embedCombinedVnf(video_link, vnf))
else:
if e is not None:
return message(msgs.failedToScan+msgs.failedToScanExtra+e)
return message(msgs.failedToScan)
def embedCombinedVnf(video_link,vnf):
qrt=None
if vnf['qrtURL'] is not None:
qrt,e=vnfFromCacheOrDL(vnf['qrtURL'])
if vnf['type'] != "Image" and vnf['type'] != "Video" and qrt is not None and qrt['type'] == "Image":
if qrt['images'][4]!="1":
vnf['images'] = qrt['images']
vnf['type'] = "Image"
if vnf['type'] != "Image" or vnf['images'][4] == "1":
return embed(video_link, vnf, 0)
desc = re.sub(r' http.*t\.co\S+', '', vnf['description'])
urlUser = urllib.parse.quote(vnf['uploader'])
urlDesc = urllib.parse.quote(desc)
urlLink = urllib.parse.quote(video_link)
likeDisplay = msgs.genLikesDisplay(vnf)
if 'poll' in vnf:
pollDisplay= msgs.genPollDisplay(vnf['poll'])
else:
pollDisplay=""
if qrt is not None:
desc=msgs.formatEmbedDesc(vnf['type'],desc,qrt,pollDisplay,likeDisplay)
suffix=""
#if 'Discord' in user_agent:
# images = []
# for i in range(0,int(vnf['images'][4])):
# images.append(vnf['images'][i])
#else:
host = config['config']['url']
image = f"{host}/rendercombined.jpg?imgs="
for i in range(0,int(vnf['images'][4])):
image = image + vnf['images'][i] + ","
image = image[:-1] # Remove last comma
images=[image]
suffix=" - View original tweet for full quality"
color = "#7FFFD4" # Green
if vnf['nsfw'] == True:
color = "#800020" # Red
return make_cached_vnf_response(vnf,getTemplate('image.html',vnf,desc,images,video_link,color,urlDesc,urlUser,urlLink,appNameSuffix=suffix))
def getPollObject(card):
poll={"total_votes":0,"choices":[]}
choiceCount=0
if (card["name"]=="poll2choice_text_only"):
choiceCount=2
elif (card["name"]=="poll3choice_text_only"):
choiceCount=3
elif (card["name"]=="poll4choice_text_only"):
choiceCount=4
for i in range(0,choiceCount):
choice = {"text":card["binding_values"][f"choice{i+1}_label"]["string_value"],"votes":int(card["binding_values"][f"choice{i+1}_count"]["string_value"])}
poll["total_votes"]+=choice["votes"]
poll["choices"].append(choice)
# update each choice with a percentage
for choice in poll["choices"]:
choice["percent"] = round((choice["votes"]/poll["total_votes"])*100,1)
return poll
def tweetType(tweet): # Are we dealing with a Video, Image, or Text tweet?
if 'extended_entities' in tweet:
if 'video_info' in tweet['extended_entities']['media'][0]:
out = "Video"
else:
out = "Image"
else:
out = "Text"
return out
def oEmbedGen(description, user, video_link, ttype,providerName=None):
if providerName == None:
providerName = config['config']['appname']

19
utils.py Normal file
View File

@ -0,0 +1,19 @@
import re
pathregex = re.compile("\\w{1,15}\\/(status|statuses)\\/(\\d{2,20})")
endTCOregex = re.compile("(^.*?) +https:\/\/t.co\/.*?$")
def getTweetIdFromUrl(url):
match = pathregex.search(url)
if match is not None:
return match.group(2)
else:
return None
def stripEndTCO(text):
# remove t.co links at the end of a string
match = endTCOregex.search(text)
if match is not None:
return match.group(1)
else:
return text

169
vxApi.py Normal file
View File

@ -0,0 +1,169 @@
import html
from datetime import datetime
from configHandler import config
from utils import stripEndTCO
def getApiResponse(tweet,include_txt=False,include_zip=False):
tweetL = tweet["legacy"]
if "user_result" in tweet["core"]:
userL = tweet["core"]["user_result"]["result"]["legacy"]
elif "user_results" in tweet["core"]:
userL = tweet["core"]["user_results"]["result"]["legacy"]
media=[]
media_extended=[]
hashtags=[]
communityNote=None
try:
if "birdwatch_pivot" in tweet:
communityNote=tweet["birdwatch_pivot"]["note"]["summary"]["text"]
except:
pass
if "extended_entities" in tweetL:
if "media" in tweetL["extended_entities"]:
tmedia=tweetL["extended_entities"]["media"]
for i in tmedia:
extendedInfo={}
if "video_info" in i:
# find the highest bitrate
best_bitrate = -1
besturl=""
for j in i["video_info"]["variants"]:
if j['content_type'] == "video/mp4" and '/hevc/' not in j["url"] and j['bitrate'] > best_bitrate:
besturl = j['url']
best_bitrate = j['bitrate']
media.append(besturl)
extendedInfo["url"] = besturl
extendedInfo["type"] = "video"
if (i["type"] == "animated_gif"):
extendedInfo["type"] = "gif"
altText = None
extendedInfo["size"] = {"width":i["original_info"]["width"],"height":i["original_info"]["height"]}
if "ext_alt_text" in i:
altText=i["ext_alt_text"]
if "duration_millis" in i["video_info"]:
extendedInfo["duration_millis"] = i["video_info"]["duration_millis"]
else:
extendedInfo["duration_millis"] = 0
extendedInfo["thumbnail_url"] = i["media_url_https"]
extendedInfo["altText"] = altText
media_extended.append(extendedInfo)
else:
media.append(i["media_url_https"])
extendedInfo["url"] = i["media_url_https"]
altText=None
if "ext_alt_text" in i:
altText=i["ext_alt_text"]
extendedInfo["altText"] = altText
extendedInfo["type"] = "image"
extendedInfo["size"] = {"width":i["original_info"]["width"],"height":i["original_info"]["height"]}
extendedInfo["thumbnail_url"] = i["media_url_https"]
media_extended.append(extendedInfo)
if "hashtags" in tweetL["entities"]:
for i in tweetL["entities"]["hashtags"]:
hashtags.append(i["text"])
#include_txt = request.args.get("include_txt", "false")
#include_zip = request.args.get("include_zip", "false") # for certain types of archival software (i.e Hydrus)
if include_txt == "true" or (include_txt == "ifnomedia" and len(media)==0):
txturl = config['config']['url']+"/"+userL["screen_name"]+"/status/"+tweet["rest_id"]+".txt"
media.append(txturl)
media_extended.append({"url":txturl,"type":"txt"})
if include_zip == "true" or (include_zip == "ifnomedia" and len(media)==0):
zipurl = config['config']['url']+"/"+userL["screen_name"]+"/status/"+tweet["rest_id"]+".zip"
media.append(zipurl)
media_extended.append({"url":zipurl,"type":"zip"})
qrtURL = None
if 'quoted_status_id_str' in tweetL:
qrtURL = "https://twitter.com/i/status/" + tweetL['quoted_status_id_str']
if 'possibly_sensitive' not in tweetL:
tweetL['possibly_sensitive'] = False
twText = html.unescape(tweetL["full_text"])
if 'entities' in tweetL and 'urls' in tweetL['entities']:
for eurl in tweetL['entities']['urls']:
if "/status/" in eurl["expanded_url"] and eurl["expanded_url"].startswith("https://twitter.com/"):
twText = twText.replace(eurl["url"], "")
else:
twText = twText.replace(eurl["url"],eurl["expanded_url"])
twText = stripEndTCO(twText)
# check if all extended media are the same type
sameMedia = False
if len(media_extended) > 1:
sameMedia = True
for i in media_extended:
if i["type"] != media_extended[0]["type"]:
sameMedia = False
break
else:
sameMedia = True
combinedMediaUrl = None
if len(media_extended) > 0 and sameMedia and media_extended[0]["type"] == "image" and len(media) > 1:
host=config['config']['url']
combinedMediaUrl = f'{host}/rendercombined.jpg?imgs='
for i in media:
combinedMediaUrl += i + ","
combinedMediaUrl = combinedMediaUrl[:-1]
pollData = None
if 'card' in tweet and 'legacy' in tweet['card'] and tweet['card']['legacy']['name'].startswith("poll"):
cardName = tweet['card']['legacy']['name']
pollData={} # format: {"options":["name":"Option 1 Name","votes":5,"percent":50]}
pollData["options"] = []
totalVotes = 0
bindingValues = tweet['card']['legacy']['binding_values']
pollValues = {}
for i in bindingValues:
key = i["key"]
value = i["value"]
etype = value["type"]
if etype == "STRING":
pollValues[key] = value["string_value"]
elif etype == "BOOLEAN":
pollValues[key] = value["boolean_value"]
for i in range(1,5):
if f"choice{i}_label" in pollValues:
option = {}
option["name"] = pollValues[f"choice{i}_label"]
option["votes"] = int(pollValues[f"choice{i}_count"])
totalVotes += option["votes"]
pollData["options"].append(option)
for i in pollData["options"]:
i["percent"] = round((i["votes"]/totalVotes)*100,2)
apiObject = {
"text": twText,
"likes": tweetL["favorite_count"],
"retweets": tweetL["retweet_count"],
"replies": tweetL["reply_count"],
"date": tweetL["created_at"],
"user_screen_name": html.unescape(userL["screen_name"]),
"user_name": userL["name"],
"user_profile_image_url": userL["profile_image_url_https"],
"tweetURL": "https://twitter.com/"+userL["screen_name"]+"/status/"+tweet["rest_id"],
"tweetID": tweet["rest_id"],
"conversationID": tweetL["conversation_id_str"],
"mediaURLs": media,
"media_extended": media_extended,
"possibly_sensitive": tweetL["possibly_sensitive"],
"hashtags": hashtags,
"qrtURL": qrtURL,
"communityNote": communityNote,
"allSameType": sameMedia,
"hasMedia": len(media) > 0,
"combinedMediaUrl": combinedMediaUrl,
"pollData": pollData,
}
try:
apiObject["date_epoch"] = int(datetime.strptime(tweetL["created_at"], "%a %b %d %H:%M:%S %z %Y").timestamp())
except:
pass
return apiObject

41
vx_testdata.py Normal file
View File

@ -0,0 +1,41 @@
import os
# autogenerated from testgen.py
testTextTweet="https://twitter.com/jack/status/20"
testVideoTweet="https://twitter.com/pdxdylan/status/1540398733669666818"
testMediaTweet="https://twitter.com/pdxdylan/status/1534672932106035200"
testMultiMediaTweet="https://twitter.com/pdxdylan/status/1532006436703715331"
testQRTTweet="https://twitter.com/pdxdylan/status/1611477137319514129"
testQrtCeptionTweet="https://twitter.com/CatherineShu/status/585253766271672320"
testQrtVideoTweet="https://twitter.com/pdxdylan/status/1674561759422578690"
testNSFWTweet="https://twitter.com/kuyacoy/status/1581185279376838657"
testPollTweet="https://twitter.com/norm/status/651169346518056960"
testMixedMediaTweet="https://twitter.com/bigbeerfest/status/1760638922084741177"
testTextTweet_compare={'text': 'just setting up my twttr', 'date': 'Tue Mar 21 20:50:14 +0000 2006', 'tweetURL': 'https://twitter.com/jack/status/20', 'tweetID': '20', 'conversationID': '20', 'mediaURLs': [], 'media_extended': [], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': None, 'allSameType': True, 'hasMedia': False, 'combinedMediaUrl': None, 'pollData': None, 'date_epoch': 1142974214}
testVideoTweet_compare={'text': 'TikTok embeds on Discord/Telegram bait you with a fake play button, but to see the actual video you have to go to their website.\nAs a request from a friend, I made it so that if you add "vx" before "tiktok" on any link, it fixes that. https://t.co/QYpiVXUIrW', 'date': 'Fri Jun 24 18:17:31 +0000 2022', 'tweetURL': 'https://twitter.com/pdxdylan/status/1540398733669666818', 'tweetID': '1540398733669666818', 'conversationID': '1540398733669666818', 'mediaURLs': ['https://video.twimg.com/ext_tw_video/1540396699037929472/pu/vid/762x528/YxbXbT3X7vq4LWfC.mp4?tag=12'], 'media_extended': [{'url': 'https://video.twimg.com/ext_tw_video/1540396699037929472/pu/vid/762x528/YxbXbT3X7vq4LWfC.mp4?tag=12', 'type': 'video', 'size': {'width': 762, 'height': 528}, 'duration_millis': 13650, 'thumbnail_url': 'https://pbs.twimg.com/ext_tw_video_thumb/1540396699037929472/pu/img/l187Z6B9AHHxUKPV.jpg', 'altText': None}], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': None, 'allSameType': True, 'hasMedia': True, 'combinedMediaUrl': None, 'pollData': None, 'date_epoch': 1656094651}
testMediaTweet_compare={'text': 'oh.', 'date': 'Wed Jun 08 23:05:14 +0000 2022', 'tweetURL': 'https://twitter.com/pdxdylan/status/1534672932106035200', 'tweetID': '1534672932106035200', 'conversationID': '1534672673422381057', 'mediaURLs': ['https://pbs.twimg.com/media/FUxAt5LWUAMol0N.png'], 'media_extended': [{'url': 'https://pbs.twimg.com/media/FUxAt5LWUAMol0N.png', 'altText': None, 'type': 'image', 'size': {'width': 927, 'height': 534}, 'thumbnail_url': 'https://pbs.twimg.com/media/FUxAt5LWUAMol0N.png'}], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': None, 'allSameType': True, 'hasMedia': True, 'combinedMediaUrl': None, 'pollData': None, 'date_epoch': 1654729514}
testMultiMediaTweet_compare={'text': 'Released #Retro64 1.0.9. Besides a lot of internal bug-fixes, this adds quicksand blocks, fixes the rendering for the castle stairs block, and adds a new model, Sonic! \nhttps://github.com/Retro64Mod/Retro64Mod/releases/tag/1.18.2-1.0.9 https://t.co/CWZaw4hzyg', 'date': 'Wed Jun 01 14:29:32 +0000 2022', 'tweetURL': 'https://twitter.com/pdxdylan/status/1532006436703715331', 'tweetID': '1532006436703715331', 'conversationID': '1532006436703715331', 'mediaURLs': ['https://pbs.twimg.com/media/FULF9oxXwAMDI-C.png', 'https://pbs.twimg.com/media/FULGaHkWYAIBV5U.png', 'https://pbs.twimg.com/media/FULGiZnWQAMBRWl.png'], 'media_extended': [{'url': 'https://pbs.twimg.com/media/FULF9oxXwAMDI-C.png', 'altText': None, 'type': 'image', 'size': {'width': 507, 'height': 507}, 'thumbnail_url': 'https://pbs.twimg.com/media/FULF9oxXwAMDI-C.png'}, {'url': 'https://pbs.twimg.com/media/FULGaHkWYAIBV5U.png', 'altText': None, 'type': 'image', 'size': {'width': 396, 'height': 431}, 'thumbnail_url': 'https://pbs.twimg.com/media/FULGaHkWYAIBV5U.png'}, {'url': 'https://pbs.twimg.com/media/FULGiZnWQAMBRWl.png', 'altText': None, 'type': 'image', 'size': {'width': 399, 'height': 341}, 'thumbnail_url': 'https://pbs.twimg.com/media/FULGiZnWQAMBRWl.png'}], 'possibly_sensitive': False, 'hashtags': ['Retro64'], 'qrtURL': None, 'allSameType': True, 'hasMedia': True, 'combinedMediaUrl': 'https://vxtwitter.com/rendercombined.jpg?imgs=https://pbs.twimg.com/media/FULF9oxXwAMDI-C.png,https://pbs.twimg.com/media/FULGaHkWYAIBV5U.png,https://pbs.twimg.com/media/FULGiZnWQAMBRWl.png', 'pollData': None, 'date_epoch': 1654093772}
testQRTTweet_compare={'text': "vxTwitter has gotten a *ton* of usage recently, so I'd appreciate a donation to keep things running!\n", 'date': 'Fri Jan 06 21:37:43 +0000 2023', 'tweetURL': 'https://twitter.com/pdxdylan/status/1611477137319514129', 'tweetID': '1611477137319514129', 'conversationID': '1611476665821003776', 'mediaURLs': [], 'media_extended': [], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': 'https://twitter.com/i/status/1518309187515781125', 'allSameType': True, 'hasMedia': False, 'combinedMediaUrl': None, 'pollData': None, 'date_epoch': 1673041063}
testQrtCeptionTweet_compare={'text': 'Testing retweetception ', 'date': 'Tue Apr 07 01:32:26 +0000 2015', 'tweetURL': 'https://twitter.com/CatherineShu/status/585253766271672320', 'tweetID': '585253766271672320', 'conversationID': '585253766271672320', 'mediaURLs': [], 'media_extended': [], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': 'https://twitter.com/i/status/585253161260216320', 'allSameType': True, 'hasMedia': False, 'combinedMediaUrl': None, 'pollData': None, 'date_epoch': 1428370346}
testQrtVideoTweet_compare={'text': 'good', 'date': 'Thu Jun 29 23:33:29 +0000 2023', 'tweetURL': 'https://twitter.com/pdxdylan/status/1674561759422578690', 'tweetID': '1674561759422578690', 'conversationID': '1674561759422578690', 'mediaURLs': [], 'media_extended': [], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': 'https://twitter.com/i/status/1674197531301904388', 'allSameType': True, 'hasMedia': False, 'combinedMediaUrl': None, 'pollData': None, 'date_epoch': 1688081609}
testNSFWTweet_compare={'text': "ngl, I'm scared on finding out the cute Sprigatito's final evolution..\n\nso i had a bot generate it for me.... and I'm forever scarred https://t.co/itMay87vcS", 'date': 'Sat Oct 15 07:28:42 +0000 2022', 'tweetURL': 'https://twitter.com/kuyacoy/status/1581185279376838657', 'tweetID': '1581185279376838657', 'conversationID': '1581185279376838657', 'mediaURLs': ['https://pbs.twimg.com/media/FfF_gKwXgAIpnpD.jpg'], 'media_extended': [{'url': 'https://pbs.twimg.com/media/FfF_gKwXgAIpnpD.jpg', 'altText': None, 'type': 'image', 'size': {'width': 760, 'height': 926}, 'thumbnail_url': 'https://pbs.twimg.com/media/FfF_gKwXgAIpnpD.jpg'}], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': None, 'allSameType': True, 'hasMedia': True, 'combinedMediaUrl': None, 'pollData': None, 'date_epoch': 1665818922}
testPollTweet_compare={'text': 'I know when that hotline bling, that can only:', 'date': 'Mon Oct 05 22:57:25 +0000 2015', 'tweetURL': 'https://twitter.com/norm/status/651169346518056960', 'tweetID': '651169346518056960', 'conversationID': '651169346518056960', 'mediaURLs': [], 'media_extended': [], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': None, 'allSameType': True, 'hasMedia': False, 'combinedMediaUrl': None, 'pollData': {'options': [{'name': 'Mean one thing', 'votes': 124875, 'percent': 78.82}, {'name': 'Mean multiple things', 'votes': 33554, 'percent': 21.18}]}, 'date_epoch': 1444085845}
testMixedMediaTweet_compare={'text': 'Some of us here are definitely big nerds about beer, and could talk your ear off about it for days on end, but some of us are just "beer is nice"', 'date': 'Thu Feb 22 12:13:24 +0000 2024', 'tweetURL': 'https://twitter.com/bigbeerfest/status/1760638922084741177', 'tweetID': '1760638922084741177', 'conversationID': '1760638922084741177', 'mediaURLs': ['https://pbs.twimg.com/media/GG8LwfuWoAANKhs.jpg', 'https://video.twimg.com/tweet_video/GG8LwqWX0AAZch0.mp4'], 'media_extended': [{'url': 'https://pbs.twimg.com/media/GG8LwfuWoAANKhs.jpg', 'altText': None, 'type': 'image', 'size': {'width': 858, 'height': 960}, 'thumbnail_url': 'https://pbs.twimg.com/media/GG8LwfuWoAANKhs.jpg'}, {'url': 'https://video.twimg.com/tweet_video/GG8LwqWX0AAZch0.mp4', 'type': 'gif', 'size': {'width': 500, 'height': 500}, 'duration_millis': 0, 'thumbnail_url': 'https://pbs.twimg.com/tweet_video_thumb/GG8LwqWX0AAZch0.jpg', 'altText': None}], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': None, 'allSameType': False, 'hasMedia': True, 'combinedMediaUrl': None, 'pollData': None, 'date_epoch': 1708604004}
testUser="https://twitter.com/jack"
testUserID = "https://twitter.com/i/user/12"
testUserWeirdURLs=["https://twitter.com/jack?lang=en","https://twitter.com/jack/with_replies","https://twitter.com/jack/media","https://twitter.com/jack/likes","https://twitter.com/jack/with_replies?lang=en","https://twitter.com/jack/media?lang=en","https://twitter.com/jack/likes?lang=en","https://twitter.com/jack/"]
testTextTweet="https://twitter.com/jack/status/20"
tokens=os.getenv("VXTWITTER_WORKAROUND_TOKENS",None).split(',')
def compareDict(original,compare):
for key in original:
assert key in compare
if type(compare[key]) is not dict:
if (key == 'verified' or key== 'time') and compare[key]!=original[key]:
continue # does not match as test data was from before verification changes
assert compare[key]==original[key]
else:
compareDict(original[key],compare[key])