From ff8bc4456cbf020ffd6c901e5f3b2221b8ab99ba Mon Sep 17 00:00:00 2001 From: Dylan Date: Tue, 15 Aug 2023 10:09:04 +0100 Subject: [PATCH] Changed how twExtract receives tokens --- test_vx.py | 22 ++++++++++++---------- twExtract/__init__.py | 35 +++++++++++++++++------------------ twitfix.py | 6 +++--- 3 files changed, 32 insertions(+), 31 deletions(-) diff --git a/test_vx.py b/test_vx.py index aaa9fc5..58bec41 100644 --- a/test_vx.py +++ b/test_vx.py @@ -32,6 +32,8 @@ testMultiMedia_compare={'tweet': 'https://twitter.com/Twitter/status/11541723245 testPoll_comparePoll={"name":"poll2choice_text_only","binding_values":{"choice1_label":{"type":"STRING","string_value":"Mean one thing"},"choice2_label":{"type":"STRING","string_value":"Mean multiple things"},"end_datetime_utc":{"type":"STRING","string_value":"2015-10-06T22:57:24Z"},"counts_are_final":{"type":"BOOLEAN","boolean_value":True},"choice2_count":{"type":"STRING","string_value":"33554"},"choice1_count":{"type":"STRING","string_value":"124875"},"last_updated_datetime_utc":{"type":"STRING","string_value":"2015-10-06T22:57:31Z"},"duration_minutes":{"type":"STRING","string_value":"1440"}}} testPoll_comparePollVNF={'total_votes': 158429, 'choices': [{'text': 'Mean one thing', 'votes': 124875, 'percent': 78.8}, {'text': 'Mean multiple things', 'votes': 33554, 'percent': 21.2}]} +tokens=os.getenv("VXTWITTER_WORKAROUND_TOKENS",None).split(',') + def compareDict(original,compare): for key in original: assert key in compare @@ -44,35 +46,35 @@ def compareDict(original,compare): ## Tweet retrieve tests ## def test_textTweetExtract(): - tweet = twExtract.extractStatus(testTextTweet) + tweet = twExtract.extractStatus(testTextTweet,workaroundTokens=tokens) assert tweet["full_text"]==textVNF_compare['description'] assert tweet["user"]["screen_name"]=="jack" assert 'extended_entities' not in tweet def test_extractV2(): # remove this when v2 is default - tweet = 
twExtract.extractStatusV2(testTextTweet,workaroundTokens=tokens) def test_UserExtract(): - user = twExtract.extractUser(testUser) + user = twExtract.extractUser(testUser,workaroundTokens=tokens) assert user["screen_name"]=="jack" assert user["id"]==12 assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006" def test_UserExtractID(): - user = twExtract.extractUser(testUserID) + user = twExtract.extractUser(testUserID,workaroundTokens=tokens) assert user["screen_name"]=="jack" assert user["id"]==12 assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006" def test_UserExtractWeirdURLs(): for url in testUserWeirdURLs: - user = twExtract.extractUser(url) + user = twExtract.extractUser(url,workaroundTokens=tokens) assert user["screen_name"]=="jack" assert user["id"]==12 assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006" def test_videoTweetExtract(): - tweet = twExtract.extractStatus(testVideoTweet) + tweet = twExtract.extractStatus(testVideoTweet,workaroundTokens=tokens) assert tweet["full_text"]==videoVNF_compare['description'] assert tweet["user"]["screen_name"]==twitterAccountName assert 'extended_entities' in tweet @@ -83,7 +85,7 @@ def test_videoTweetExtract(): def test_mediaTweetExtract(): - tweet = twExtract.extractStatus(testMediaTweet) + tweet = twExtract.extractStatus(testMediaTweet,workaroundTokens=tokens) assert tweet["full_text"]==testMedia_compare['description'] assert tweet["user"]["screen_name"]==twitterAccountName assert 'extended_entities' in tweet @@ -94,7 +96,7 @@ def test_mediaTweetExtract(): def test_multimediaTweetExtract(): - tweet = twExtract.extractStatus(testMultiMediaTweet) + tweet = twExtract.extractStatus(testMultiMediaTweet,workaroundTokens=tokens) assert tweet["full_text"][:94]==testMultiMedia_compare['description'][:94] assert tweet["user"]["screen_name"]==twitterAccountName assert 'extended_entities' in tweet @@ -107,12 +109,12 @@ def test_multimediaTweetExtract(): assert video["type"]=="photo" def 
test_pollTweetExtract(): - tweet = twExtract.extractStatus("https://twitter.com/norm/status/651169346518056960") + tweet = twExtract.extractStatus("https://twitter.com/norm/status/651169346518056960",workaroundTokens=tokens) assert 'card' in tweet compareDict(testPoll_comparePoll,tweet['card']) def test_NSFW_TweetExtract(): - tweet = twExtract.extractStatus(testNSFWTweet) # For now just test that there's no error + tweet = twExtract.extractStatus(testNSFWTweet,workaroundTokens=tokens) # For now just test that there's no error ## VNF conversion test ## def test_textTweetVNF(): diff --git a/twExtract/__init__.py b/twExtract/__init__.py index a2ecc17..9629e88 100644 --- a/twExtract/__init__.py +++ b/twExtract/__init__.py @@ -1,9 +1,8 @@ -import yt_dlp -from yt_dlp.extractor import twitter import uuid import json import requests import re +import os import random from . import twExtractError import urllib.parse @@ -26,20 +25,20 @@ def getGuestToken(): guestToken = json.loads(r.text)["guest_token"] return guestToken -def extractStatus_token(url): +def extractStatus_token(url,workaroundTokens): global usedTokens # get tweet ID m = re.search(pathregex, url) if m is None: raise twExtractError.TwExtractError(400, "Extract error") twid = m.group(2) - if config["config"]["workaroundTokens"] == None: + if workaroundTokens == None: raise twExtractError.TwExtractError(400, "Extract error (no tokens defined)") # get tweet - tokens = config["config"]["workaroundTokens"].split(",") + tokens = workaroundTokens tokens = [i for i in tokens if i not in usedTokens] if len(tokens) == 0: - tokens = config["config"]["workaroundTokens"].split(",") + tokens = workaroundTokens usedTokens.clear() random.shuffle(tokens) for authToken in tokens: @@ -107,21 +106,21 @@ def extractStatus_syndication(url): return output -def extractStatusV2(url): +def extractStatusV2(url,workaroundTokens): global usedTokens # get tweet ID m = re.search(pathregex, url) if m is None: raise 
twExtractError.TwExtractError(400, "Extract error") twid = m.group(2) - if config["config"]["workaroundTokens"] == None: + if workaroundTokens == None: raise twExtractError.TwExtractError(400, "Extract error (no tokens defined)") # get tweet - tokens = config["config"]["workaroundTokens"].split(",") + tokens = workaroundTokens print("Number of tokens used: "+str(len(usedTokens))) tokens = [i for i in tokens if i not in usedTokens] if len(tokens) == 0: - tokens = config["config"]["workaroundTokens"].split(",") + tokens = workaroundTokens usedTokens.clear() random.shuffle(tokens) for authToken in tokens: @@ -162,8 +161,8 @@ def extractStatusV2(url): return tweet raise twExtractError.TwExtractError(400, "Extract error") -def extractStatusV2Legacy(url): - tweet = extractStatusV2(url) +def extractStatusV2Legacy(url,workaroundTokens): + tweet = extractStatusV2(url,workaroundTokens) if 'errors' in tweet or 'legacy' not in tweet: if 'errors' in tweet: raise twExtractError.TwExtractError(400, "Extract error: "+json.dumps(tweet['errors'])) @@ -181,17 +180,17 @@ def extractStatusV2Legacy(url): tweet['legacy']['card'] = tweet['tweet_card']['legacy'] return tweet['legacy'] -def extractStatus(url): +def extractStatus(url,workaroundTokens=None): methods=[extractStatus_syndication,extractStatusV2Legacy] for method in methods: try: - return method(url) + return method(url,workaroundTokens) except Exception as e: print(f"{method.__name__} method failed: {str(e)}") continue raise twExtractError.TwExtractError(400, "Extract error") -def extractUser(url): +def extractUser(url,workaroundTokens): global usedTokens useId=True m = re.search(userIDregex, url) @@ -203,10 +202,10 @@ def extractUser(url): useId=False screen_name = m.group(1) # get user - tokens = config["config"]["workaroundTokens"].split(",") + tokens = workaroundTokens tokens = [i for i in tokens if i not in usedTokens] if len(tokens) == 0: - tokens = config["config"]["workaroundTokens"].split(",") + tokens = 
workaroundTokens usedTokens.clear() random.shuffle(tokens) for authToken in tokens: @@ -239,5 +238,5 @@ def lambda_handler(event, context): url = event["queryStringParameters"].get("url","") return { 'statusCode': 200, - 'body': extractStatus(url) + 'body': extractStatus(url,workaroundTokens=os.getenv("VXTWITTER_WORKAROUND_TOKENS",None).split(',')) } \ No newline at end of file diff --git a/twitfix.py b/twitfix.py index 17b03dd..10c9593 100644 --- a/twitfix.py +++ b/twitfix.py @@ -152,7 +152,7 @@ def twitfix(sub_path): elif request.url.startswith("https://api.vx"): twitter_url = "https://twitter.com/" + sub_path try: - tweet = twExtract.extractStatusV2(twitter_url) + tweet = twExtract.extractStatusV2(twitter_url,workaroundTokens=config['config']['workaroundTokens'].split(',')) tweetL = tweet["legacy"] userL = tweet["core"]["user_result"]["result"]["legacy"] media=[] @@ -546,7 +546,7 @@ def link_to_vnf_from_tweet_data(tweet,video_link): def link_to_vnf_from_unofficial_api(video_link): tweet=None log.info("Attempting to download tweet info: "+video_link) - tweet = twExtract.extractStatus(video_link) + tweet = twExtract.extractStatus(video_link,workaroundTokens=config['config']['workaroundTokens'].split(',')) log.success("Unofficial API Success") if "extended_entities" not in tweet: @@ -555,7 +555,7 @@ def link_to_vnf_from_unofficial_api(video_link): for url in tweet["entities"]["urls"]: if "/video/" in url["expanded_url"] or "/photo/" in url["expanded_url"]: log.info("Extra tweet info found in entities: "+video_link+" -> "+url["expanded_url"]) - subTweet = twExtract.extractStatus(url["expanded_url"]) + subTweet = twExtract.extractStatus(url["expanded_url"],workaroundTokens=config['config']['workaroundTokens'].split(',')) if "extended_entities" in subTweet: tweet["extended_entities"] = subTweet["extended_entities"] break