Changed how twExtract receives tokens

This commit is contained in:
Dylan 2023-08-15 10:09:04 +01:00
parent 25ebf7d9d3
commit ff8bc4456c
3 changed files with 32 additions and 31 deletions

View File

@ -32,6 +32,8 @@ testMultiMedia_compare={'tweet': 'https://twitter.com/Twitter/status/11541723245
testPoll_comparePoll={"name":"poll2choice_text_only","binding_values":{"choice1_label":{"type":"STRING","string_value":"Mean one thing"},"choice2_label":{"type":"STRING","string_value":"Mean multiple things"},"end_datetime_utc":{"type":"STRING","string_value":"2015-10-06T22:57:24Z"},"counts_are_final":{"type":"BOOLEAN","boolean_value":True},"choice2_count":{"type":"STRING","string_value":"33554"},"choice1_count":{"type":"STRING","string_value":"124875"},"last_updated_datetime_utc":{"type":"STRING","string_value":"2015-10-06T22:57:31Z"},"duration_minutes":{"type":"STRING","string_value":"1440"}}}
testPoll_comparePollVNF={'total_votes': 158429, 'choices': [{'text': 'Mean one thing', 'votes': 124875, 'percent': 78.8}, {'text': 'Mean multiple things', 'votes': 33554, 'percent': 21.2}]}
# Workaround tokens are supplied as a comma-separated list in the
# VXTWITTER_WORKAROUND_TOKENS environment variable. When the variable is not
# set, keep `tokens` as None instead of calling .split() on None, which would
# raise AttributeError at import time and abort every test in this module.
_rawWorkaroundTokens = os.getenv("VXTWITTER_WORKAROUND_TOKENS")
tokens = _rawWorkaroundTokens.split(',') if _rawWorkaroundTokens is not None else None
def compareDict(original,compare):
for key in original:
assert key in compare
@ -44,35 +46,35 @@ def compareDict(original,compare):
## Tweet retrieve tests ##
def test_textTweetExtract():
# Extracts the text-only test tweet and checks its full text, its author's
# screen name, and that the result has no 'extended_entities' key.
# NOTE(review): the two consecutive assignments look like the removed/added
# line pair from the diff view; the asserts run against the second result.
tweet = twExtract.extractStatus(testTextTweet)
tweet = twExtract.extractStatus(testTextTweet,workaroundTokens=tokens)
assert tweet["full_text"]==textVNF_compare['description']
assert tweet["user"]["screen_name"]=="jack"
assert 'extended_entities' not in tweet
def test_extractV2(): # remove this when v2 is default
# Smoke test: calls extractStatusV2 on the text tweet and makes no assertions
# on the result, so it only fails if the call raises.
# NOTE(review): the two consecutive assignments look like the removed/added
# line pair from the diff view.
tweet = twExtract.extractStatusV2(testTextTweet)
tweet = twExtract.extractStatusV2(testTextTweet,workaroundTokens=tokens)
def test_UserExtract():
# Extracts the user referenced by testUser and checks the returned
# screen name, numeric id and creation timestamp.
# NOTE(review): the two consecutive assignments look like the removed/added
# line pair from the diff view; the asserts run against the second result.
user = twExtract.extractUser(testUser)
user = twExtract.extractUser(testUser,workaroundTokens=tokens)
assert user["screen_name"]=="jack"
assert user["id"]==12
assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006"
def test_UserExtractID():
# Same checks as the screen-name based user test, but the lookup input is
# testUserID (presumably an ID-style user URL; confirm against its definition).
# NOTE(review): the two consecutive assignments look like the removed/added
# line pair from the diff view; the asserts run against the second result.
user = twExtract.extractUser(testUserID)
user = twExtract.extractUser(testUserID,workaroundTokens=tokens)
assert user["screen_name"]=="jack"
assert user["id"]==12
assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006"
def test_UserExtractWeirdURLs():
# Runs the user extraction over every URL in testUserWeirdURLs and expects
# each one to resolve to the same user (screen name, id, creation timestamp).
# NOTE(review): the two consecutive assignments look like the removed/added
# line pair from the diff view; the asserts run against the second result.
for url in testUserWeirdURLs:
user = twExtract.extractUser(url)
user = twExtract.extractUser(url,workaroundTokens=tokens)
assert user["screen_name"]=="jack"
assert user["id"]==12
assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006"
def test_videoTweetExtract():
tweet = twExtract.extractStatus(testVideoTweet)
tweet = twExtract.extractStatus(testVideoTweet,workaroundTokens=tokens)
assert tweet["full_text"]==videoVNF_compare['description']
assert tweet["user"]["screen_name"]==twitterAccountName
assert 'extended_entities' in tweet
@ -83,7 +85,7 @@ def test_videoTweetExtract():
def test_mediaTweetExtract():
tweet = twExtract.extractStatus(testMediaTweet)
tweet = twExtract.extractStatus(testMediaTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testMedia_compare['description']
assert tweet["user"]["screen_name"]==twitterAccountName
assert 'extended_entities' in tweet
@ -94,7 +96,7 @@ def test_mediaTweetExtract():
def test_multimediaTweetExtract():
tweet = twExtract.extractStatus(testMultiMediaTweet)
tweet = twExtract.extractStatus(testMultiMediaTweet,workaroundTokens=tokens)
assert tweet["full_text"][:94]==testMultiMedia_compare['description'][:94]
assert tweet["user"]["screen_name"]==twitterAccountName
assert 'extended_entities' in tweet
@ -107,12 +109,12 @@ def test_multimediaTweetExtract():
assert video["type"]=="photo"
def test_pollTweetExtract():
# Extracts a fixed poll tweet, checks that the result carries a 'card' entry,
# and compares that card against testPoll_comparePoll via compareDict.
# NOTE(review): the two consecutive assignments look like the removed/added
# line pair from the diff view; the asserts run against the second result.
tweet = twExtract.extractStatus("https://twitter.com/norm/status/651169346518056960")
tweet = twExtract.extractStatus("https://twitter.com/norm/status/651169346518056960",workaroundTokens=tokens)
assert 'card' in tweet
compareDict(testPoll_comparePoll,tweet['card'])
def test_NSFW_TweetExtract():
# Smoke test: extracts testNSFWTweet and makes no assertions on the result,
# so it only fails if the extraction raises.
# NOTE(review): the two consecutive assignments look like the removed/added
# line pair from the diff view.
tweet = twExtract.extractStatus(testNSFWTweet) # For now just test that there's no error
tweet = twExtract.extractStatus(testNSFWTweet,workaroundTokens=tokens) # For now just test that there's no error
## VNF conversion test ##
def test_textTweetVNF():

View File

@ -1,9 +1,8 @@
import yt_dlp
from yt_dlp.extractor import twitter
import uuid
import json
import requests
import re
import os
import random
from . import twExtractError
import urllib.parse
@ -26,20 +25,20 @@ def getGuestToken():
guestToken = json.loads(r.text)["guest_token"]
return guestToken
def extractStatus_token(url):
def extractStatus_token(url,workaroundTokens):
global usedTokens
# get tweet ID
m = re.search(pathregex, url)
if m is None:
raise twExtractError.TwExtractError(400, "Extract error")
twid = m.group(2)
if config["config"]["workaroundTokens"] == None:
if workaroundTokens == None:
raise twExtractError.TwExtractError(400, "Extract error (no tokens defined)")
# get tweet
tokens = config["config"]["workaroundTokens"].split(",")
tokens = workaroundTokens
tokens = [i for i in tokens if i not in usedTokens]
if len(tokens) == 0:
tokens = config["config"]["workaroundTokens"].split(",")
tokens = workaroundTokens
usedTokens.clear()
random.shuffle(tokens)
for authToken in tokens:
@ -107,21 +106,21 @@ def extractStatus_syndication(url):
return output
def extractStatusV2(url):
def extractStatusV2(url,workaroundTokens):
global usedTokens
# get tweet ID
m = re.search(pathregex, url)
if m is None:
raise twExtractError.TwExtractError(400, "Extract error")
twid = m.group(2)
if config["config"]["workaroundTokens"] == None:
if workaroundTokens == None:
raise twExtractError.TwExtractError(400, "Extract error (no tokens defined)")
# get tweet
tokens = config["config"]["workaroundTokens"].split(",")
tokens = workaroundTokens
print("Number of tokens used: "+str(len(usedTokens)))
tokens = [i for i in tokens if i not in usedTokens]
if len(tokens) == 0:
tokens = config["config"]["workaroundTokens"].split(",")
tokens = workaroundTokens
usedTokens.clear()
random.shuffle(tokens)
for authToken in tokens:
@ -162,8 +161,8 @@ def extractStatusV2(url):
return tweet
raise twExtractError.TwExtractError(400, "Extract error")
def extractStatusV2Legacy(url):
tweet = extractStatusV2(url)
def extractStatusV2Legacy(url,workaroundTokens):
tweet = extractStatusV2(url,workaroundTokens)
if 'errors' in tweet or 'legacy' not in tweet:
if 'errors' in tweet:
raise twExtractError.TwExtractError(400, "Extract error: "+json.dumps(tweet['errors']))
@ -181,17 +180,17 @@ def extractStatusV2Legacy(url):
tweet['legacy']['card'] = tweet['tweet_card']['legacy']
return tweet['legacy']
def extractStatus(url,workaroundTokens=None):
    """Extract a tweet by trying each extraction method in turn.

    The syndication method is tried first, then the V2 (legacy-shaped)
    method. The first method that returns without raising wins.

    Args:
        url: URL of the tweet to extract.
        workaroundTokens: list of auth tokens forwarded to the token-based
            method; may be None, in which case that method raises and the
            overall call fails if syndication also failed.

    Returns:
        Whatever the first successful extraction method returns.

    Raises:
        twExtractError.TwExtractError: if every method fails.
    """
    # Each entry pairs a method with the arguments it accepts.
    # extractStatus_syndication only takes the URL; passing it the tokens as a
    # second positional argument raises TypeError, which the broad except
    # below would swallow, silently disabling the syndication path.
    attempts = [
        (extractStatus_syndication, (url,)),
        (extractStatusV2Legacy, (url, workaroundTokens)),
    ]
    for method, args in attempts:
        try:
            return method(*args)
        except Exception as e:
            # Best-effort fallback chain: log the failure and try the next method.
            print(f"{method.__name__} method failed: {str(e)}")
    raise twExtractError.TwExtractError(400, "Extract error")
def extractUser(url):
def extractUser(url,workaroundTokens):
global usedTokens
useId=True
m = re.search(userIDregex, url)
@ -203,10 +202,10 @@ def extractUser(url):
useId=False
screen_name = m.group(1)
# get user
tokens = config["config"]["workaroundTokens"].split(",")
tokens = workaroundTokens
tokens = [i for i in tokens if i not in usedTokens]
if len(tokens) == 0:
tokens = config["config"]["workaroundTokens"].split(",")
tokens = workaroundTokens
usedTokens.clear()
random.shuffle(tokens)
for authToken in tokens:
@ -239,5 +238,5 @@ def lambda_handler(event, context):
url = event["queryStringParameters"].get("url","")
return {
'statusCode': 200,
'body': extractStatus(url)
'body': extractStatus(url,workaroundTokens=os.getenv("VXTWITTER_WORKAROUND_TOKENS",None).split(','))
}

View File

@ -152,7 +152,7 @@ def twitfix(sub_path):
elif request.url.startswith("https://api.vx"):
twitter_url = "https://twitter.com/" + sub_path
try:
tweet = twExtract.extractStatusV2(twitter_url)
tweet = twExtract.extractStatusV2(twitter_url,workaroundTokens=config['config']['workaroundTokens'].split(','))
tweetL = tweet["legacy"]
userL = tweet["core"]["user_result"]["result"]["legacy"]
media=[]
@ -546,7 +546,7 @@ def link_to_vnf_from_tweet_data(tweet,video_link):
def link_to_vnf_from_unofficial_api(video_link):
tweet=None
log.info("Attempting to download tweet info: "+video_link)
tweet = twExtract.extractStatus(video_link)
tweet = twExtract.extractStatus(video_link,workaroundTokens=config['config']['workaroundTokens'].split(','))
log.success("Unofficial API Success")
if "extended_entities" not in tweet:
@ -555,7 +555,7 @@ def link_to_vnf_from_unofficial_api(video_link):
for url in tweet["entities"]["urls"]:
if "/video/" in url["expanded_url"] or "/photo/" in url["expanded_url"]:
log.info("Extra tweet info found in entities: "+video_link+" -> "+url["expanded_url"])
subTweet = twExtract.extractStatus(url["expanded_url"])
subTweet = twExtract.extractStatus(url["expanded_url"],workaroundTokens=config['config']['workaroundTokens'].split(','))
if "extended_entities" in subTweet:
tweet["extended_entities"] = subTweet["extended_entities"]
break