chore: update my branch #1
| @@ -38,6 +38,8 @@ def tweetDataToActivity(tweetData,embedIndex = -1): | ||||
|             media['type'] = "gifv" | ||||
|         if 'thumbnail_url' not in media: | ||||
|             media['thumbnail_url'] = media['url'] | ||||
|         if media['type'] == "image" and "?" not in media['url']: | ||||
|             media['url'] += "?name=orig" | ||||
|         attachments.append({ | ||||
|             "id": "114163769487684704", | ||||
|             "type": media['type'], | ||||
|   | ||||
| @@ -7,4 +7,5 @@ Flask-Cors==4.0.0 | ||||
| Werkzeug==2.3.7 | ||||
| numerize==0.12 | ||||
| oauthlib==3.2.2 | ||||
| PyRTF3==0.47.5 | ||||
| PyRTF3==0.47.5 | ||||
| XClientTransaction==0.0.2 | ||||
| @@ -2,7 +2,7 @@ | ||||
| <meta content="{{ color }}" name="theme-color" /> | ||||
| <meta property="og:site_name" content="{{ appname }}"> | ||||
|  | ||||
| <meta name="twitter:title" content="{{ tweet['user_name'] }} (@{{ tweet['user_screen_name'] }})" /> | ||||
| <meta name="og:title" content="{{ tweet['user_name'] }} (@{{ tweet['user_screen_name'] }})" /> | ||||
|  | ||||
| {% if activityLink %} | ||||
| <link type="application/activity+json" href="{{ activityLink|safe }}" /> | ||||
|   | ||||
| @@ -8,7 +8,7 @@ | ||||
| <meta content="{{ color }}" name="theme-color" /> | ||||
| <meta property="og:site_name" content="{{ appname }}"> | ||||
|  | ||||
| <meta name="twitter:title" content="{{ user['name'] }} (@{{ user['screen_name'] }})" /> | ||||
| <meta name="og:title" content="{{ user['name'] }} (@{{ user['screen_name'] }})" /> | ||||
|  | ||||
| <meta name="twitter:image" content="{{ user['profile_image_url'] }}" /> | ||||
| <meta name="twitter:creator" content="@{{ user['name'] }}" /> | ||||
|   | ||||
							
								
								
									
										46
									
								
								test_api.py
									
									
									
									
									
								
							
							
						
						
									
										46
									
								
								test_api.py
									
									
									
									
									
								
							| @@ -3,6 +3,17 @@ import twitfix,twExtract | ||||
| from flask.testing import FlaskClient | ||||
| client = FlaskClient(twitfix.app) | ||||
|  | ||||
| def test_api_get_tweet(): | ||||
|     resp = client.get(testTextTweet.replace("https://twitter.com","https://api.vxtwitter.com")+"?include_txt=true",headers={"User-Agent":"test"}) | ||||
|     jData = resp.get_json() | ||||
|     assert resp.status_code==200 | ||||
|     assert jData['text'] == 'just setting up my twttr' | ||||
|  | ||||
| def test_api_get_invalid_tweet(): | ||||
|     resp = client.get("https://vxtwitter.com/test/status/None",headers={"User-Agent":"test"}) | ||||
|     jData = resp.get_json() | ||||
|     assert resp.status_code!=200 | ||||
|  | ||||
| def test_api_include_txt(): | ||||
|     resp = client.get(testTextTweet.replace("https://twitter.com","https://api.vxtwitter.com")+"?include_txt=true",headers={"User-Agent":"test"}) | ||||
|     jData = resp.get_json() | ||||
| @@ -38,7 +49,38 @@ def test_api_include_rtf_nomedia(): | ||||
|     assert not any(".rtf" in i for i in jData["mediaURLs"]) | ||||
|  | ||||
| def test_api_user(): | ||||
|     resp = client.get(testUser.replace("https://twitter.com","https://api.vxtwitter.com")+"?include_rtf=true",headers={"User-Agent":"test"}) | ||||
|     resp = client.get(testUser.replace("https://twitter.com","https://api.vxtwitter.com"),headers={"User-Agent":"test"}) | ||||
|     jData = resp.get_json() | ||||
|     assert resp.status_code==200 | ||||
|     assert jData["screen_name"]=="jack" | ||||
|     assert jData["screen_name"]=="jack" | ||||
|  | ||||
| def test_api_user_suspended(): | ||||
|     resp = client.get(testUserSuspended.replace("https://twitter.com","https://api.vxtwitter.com"),headers={"User-Agent":"test"}) | ||||
|     jData = resp.get_json() | ||||
|     assert resp.status_code==500 | ||||
|     assert 'suspended' in jData["error"] | ||||
|  | ||||
| def test_api_user_private(): | ||||
|     resp = client.get(testUserPrivate.replace("https://twitter.com","https://api.vxtwitter.com")+"?with_tweets=true",headers={"User-Agent":"test"}) | ||||
|     jData = resp.get_json() | ||||
|     assert jData['protected'] == True | ||||
|     assert len(jData["latest_tweets"])==0 | ||||
|  | ||||
| def test_api_user_invalid(): | ||||
|     resp = client.get(testUserInvalid.replace("https://twitter.com","https://api.vxtwitter.com")+"?with_tweets=true",headers={"User-Agent":"test"}) | ||||
|     jData = resp.get_json() | ||||
|     assert resp.status_code==404 | ||||
|  | ||||
| def test_api_user_feed(): | ||||
|     resp = client.get(testUser.replace("https://twitter.com","https://api.vxtwitter.com")+"?with_tweets=true",headers={"User-Agent":"test"}) | ||||
|     jData = resp.get_json() | ||||
|     assert resp.status_code==200 | ||||
|     assert jData["screen_name"]=="jack" | ||||
|     assert len(jData["latest_tweets"])>0 | ||||
|  | ||||
| def test_api_retweet(): | ||||
|     resp = client.get(testRetweetTweet.replace("https://twitter.com","https://api.vxtwitter.com"),headers={"User-Agent":"test"}) | ||||
|     jData = resp.get_json() | ||||
|     assert jData['retweetURL'] == 'https://twitter.com/i/status/1828569456231993456' | ||||
|     assert jData['retweet'] != None | ||||
|     assert jData['retweet']['text'].startswith("If you want to try") | ||||
| @@ -101,7 +101,6 @@ def test_twextract_pollTweetExtract(): # basic check if poll data exists | ||||
| def test_twextract_NSFW_TweetExtract(): | ||||
|     tweet = twExtract.extractStatus(testNSFWTweet,workaroundTokens=tokens) # For now just test that there's no error | ||||
|  | ||||
| ''' | ||||
| def test_twextract_feed(): | ||||
|     tweet = twExtract.extractUserFeedFromId(testUserID,workaroundTokens=tokens) | ||||
| ''' | ||||
|     tweets = twExtract.extractUserFeedFromId(testUserID,workaroundTokens=tokens) # For now just test that there's no error | ||||
|     assert len(tweets)>0 | ||||
| @@ -12,6 +12,7 @@ tests = { | ||||
|     "testPollTweet": "https://twitter.com/norm/status/651169346518056960", | ||||
|     "testMixedMediaTweet":"https://twitter.com/bigbeerfest/status/1760638922084741177", | ||||
|     "testVinePlayerTweet":"https://twitter.com/Roblox/status/583302104342638592", | ||||
|     "testRetweetTweet":"https://twitter.com/pdxdylan/status/1828570470222045294", | ||||
| } | ||||
|  | ||||
| def getVNFFromLink(link): | ||||
|   | ||||
| @@ -14,6 +14,8 @@ v2bearer="Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1 | ||||
| androidBearer="Bearer AAAAAAAAAAAAAAAAAAAAAFXzAwAAAAAAMHCxpeSDG1gLNLghVe8d74hl6k4%3DRUMF4xAQLsbeBhTSRrCiQpJtxoGWeyHrDb5te2jpGskWDFW82F" | ||||
| tweetdeckBearer="Bearer AAAAAAAAAAAAAAAAAAAAAFQODgEAAAAAVHTp76lzh3rFzcHbmHVvQxYYpTw%3DckAlMINMjmCwxUcaXbAN4XqJVdgMJaHqNOFgPMK0zN1qLqLQCF" | ||||
|  | ||||
| requestUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:138.0) Gecko/20100101 Firefox/138.0" | ||||
|  | ||||
| bearerTokens=[tweetdeckBearer,bearer,v2bearer,androidBearer] | ||||
|  | ||||
| guestToken=None | ||||
| @@ -36,8 +38,8 @@ tweetDetailGraphqlFeatures='{"rweb_tipjar_consumption_enabled":true,"responsive_ | ||||
| tweetDetailGraphql_api="e7RKseIxLu7HgkWNKZ6qnw" | ||||
|  | ||||
| # this is for UserTweets endpoint | ||||
| tweetFeedGraphqlFeatures='{"profile_label_improvements_pcf_label_in_post_enabled":true,"rweb_tipjar_consumption_enabled":true,"responsive_web_graphql_exclude_directive_enabled":true,"verified_phone_label_enabled":false,"creator_subscriptions_tweet_preview_api_enabled":true,"responsive_web_graphql_timeline_navigation_enabled":true,"responsive_web_graphql_skip_user_profile_image_extensions_enabled":false,"premium_content_api_read_enabled":false,"communities_web_enable_tweet_community_results_fetch":true,"c9s_tweet_anatomy_moderator_badge_enabled":true,"responsive_web_grok_analyze_button_fetch_trends_enabled":false,"responsive_web_grok_analyze_post_followups_enabled":true,"responsive_web_jetfuel_frame":false,"responsive_web_grok_share_attachment_enabled":true,"articles_preview_enabled":true,"responsive_web_edit_tweet_api_enabled":true,"graphql_is_translatable_rweb_tweet_is_translatable_enabled":true,"view_counts_everywhere_api_enabled":true,"longform_notetweets_consumption_enabled":true,"responsive_web_twitter_article_tweet_consumption_enabled":true,"tweet_awards_web_tipping_enabled":false,"responsive_web_grok_analysis_button_from_backend":true,"creator_subscriptions_quote_tweet_preview_enabled":false,"freedom_of_speech_not_reach_fetch_enabled":true,"standardized_nudges_misinfo":true,"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled":true,"rweb_video_timestamps_enabled":true,"longform_notetweets_rich_text_read_enabled":true,"longform_notetweets_inline_media_enabled":true,"responsive_web_grok_image_annotation_enabled":true,"responsive_web_enhance_cards_enabled":false}' | ||||
| tweetFeedGraphql_api="Y9WM4Id6UcGFE8Z-hbnixw" | ||||
| tweetFeedGraphqlFeatures='{"rweb_video_screen_enabled":false,"profile_label_improvements_pcf_label_in_post_enabled":true,"rweb_tipjar_consumption_enabled":true,"verified_phone_label_enabled":false,"creator_subscriptions_tweet_preview_api_enabled":true,"responsive_web_graphql_timeline_navigation_enabled":true,"responsive_web_graphql_skip_user_profile_image_extensions_enabled":false,"premium_content_api_read_enabled":false,"communities_web_enable_tweet_community_results_fetch":true,"c9s_tweet_anatomy_moderator_badge_enabled":true,"responsive_web_grok_analyze_button_fetch_trends_enabled":false,"responsive_web_grok_analyze_post_followups_enabled":false,"responsive_web_jetfuel_frame":false,"responsive_web_grok_share_attachment_enabled":true,"articles_preview_enabled":true,"responsive_web_edit_tweet_api_enabled":true,"graphql_is_translatable_rweb_tweet_is_translatable_enabled":true,"view_counts_everywhere_api_enabled":true,"longform_notetweets_consumption_enabled":true,"responsive_web_twitter_article_tweet_consumption_enabled":true,"tweet_awards_web_tipping_enabled":false,"responsive_web_grok_show_grok_translated_post":false,"responsive_web_grok_analysis_button_from_backend":true,"creator_subscriptions_quote_tweet_preview_enabled":false,"freedom_of_speech_not_reach_fetch_enabled":true,"standardized_nudges_misinfo":true,"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled":true,"longform_notetweets_rich_text_read_enabled":true,"longform_notetweets_inline_media_enabled":true,"responsive_web_grok_image_annotation_enabled":true,"responsive_web_enhance_cards_enabled":false}' | ||||
| tweetFeedGraphql_api="Li2XXGESVev94TzFtntrgA" | ||||
|  | ||||
| twitterUrl = "x.com" # doubt this will change but just in case | ||||
| class TwExtractError(Exception): | ||||
| @@ -99,7 +101,7 @@ def twitterApiGet(url,btoken=None,authToken=None,guestToken=None): | ||||
|  | ||||
| def getAuthHeaders(btoken,authToken=None,guestToken=None): | ||||
|     csrfToken=str(uuid.uuid4()).replace('-', '') | ||||
|     headers = {"x-twitter-active-user":"yes","x-twitter-client-language":"en","x-csrf-token":csrfToken,"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/116.0"} | ||||
|     headers = {"x-twitter-active-user":"yes","x-twitter-client-language":"en","x-csrf-token":csrfToken,"User-Agent":requestUserAgent} | ||||
|     headers['Authorization'] = btoken | ||||
|  | ||||
|     if authToken is not None: | ||||
| @@ -107,6 +109,7 @@ def getAuthHeaders(btoken,authToken=None,guestToken=None): | ||||
|         headers["x-twitter-auth-type"] = "OAuth2Session" | ||||
|     if guestToken is not None: | ||||
|         headers["x-guest-token"] = guestToken | ||||
|         headers["Cookie"] = f"gt={guestToken}; ct0={csrfToken}; guest_id=v1:174804309415864668;" | ||||
|  | ||||
|     return headers | ||||
|  | ||||
| @@ -114,7 +117,7 @@ def getGuestToken(): | ||||
|     global guestToken | ||||
|     global guestTokenUses | ||||
|     if guestToken is None: | ||||
|         r = requests.get(f"https://{twitterUrl}",headers={"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/116.0","Cookie":"night_mode=2"},allow_redirects=False) | ||||
|         r = requests.get(f"https://{twitterUrl}",headers={"User-Agent":requestUserAgent,"Cookie":"night_mode=2"},allow_redirects=False) | ||||
|         m = re.search(gt_pattern, r.text) | ||||
|         if m is None: | ||||
|             r = requests.post(f"https://api.{twitterUrl}/1.1/guest/activate.json", headers={"Authorization":bearer}) | ||||
| @@ -313,7 +316,11 @@ def extractStatusV2Android(url,workaroundTokens): | ||||
|                 print(f"Error in output: {json.dumps(output['errors'])}") | ||||
|                 # try another token | ||||
|                 continue | ||||
|             entries=output['data']['timeline_response']['instructions'][0]['entries'] | ||||
|             entries = None | ||||
|             for instruction in output['data']['timeline_response']['instructions']: | ||||
|                 if instruction["__typename"] == "TimelineAddEntries": | ||||
|                     entries = instruction['entries'] | ||||
|                     break | ||||
|             tweetEntry=None | ||||
|             for entry in entries: | ||||
|                 if 'content' not in entry: | ||||
| @@ -370,7 +377,11 @@ def extractStatusV2TweetDetail(url,workaroundTokens): | ||||
|                 print(f"Error in output: {json.dumps(output['errors'])}") | ||||
|                 # try another token | ||||
|                 continue | ||||
|             entries=output['data']['threaded_conversation_with_injections_v2']['instructions'][0]['entries'] | ||||
|             entries = None | ||||
|             for instruction in output['data']['threaded_conversation_with_injections_v2']['instructions']: | ||||
|                 if instruction["type"] == "TimelineAddEntries": | ||||
|                     entries = instruction['entries'] | ||||
|                     break | ||||
|             tweetEntry=None | ||||
|             for entry in entries: | ||||
|                 if 'content' not in entry: | ||||
| @@ -499,6 +510,8 @@ def extractUser(url,workaroundTokens): | ||||
|                 raise TwExtractError(error["code"], error["message"]) | ||||
|             return output | ||||
|         except Exception as e: | ||||
|             if hasattr(e,"msg") and (e.msg == 'User has been suspended.' or e.msg == 'User not found.'): | ||||
|                 raise e | ||||
|             continue | ||||
|     raise TwExtractError(400, "Extract error") | ||||
|  | ||||
| @@ -510,25 +523,41 @@ def extractUserFeedFromId(userId,workaroundTokens): | ||||
|             # TODO: https://api.twitter.com/graphql/x31u1gdnjcqtiVZFc1zWnQ/UserWithProfileTweetsQueryV2?variables={"cursor":"?","includeTweetImpression":true,"includeHasBirdwatchNotes":false,"includeEditPerspective":false,"includeEditControl":true,"count":40,"rest_id":"12","includeTweetVisibilityNudge":true,"autoplay_enabled":true}&features={"longform_notetweets_inline_media_enabled":true,"super_follow_badge_privacy_enabled":true,"longform_notetweets_rich_text_read_enabled":true,"super_follow_user_api_enabled":true,"unified_cards_ad_metadata_container_dynamic_card_content_query_enabled":true,"super_follow_tweet_api_enabled":true,"articles_api_enabled":true,"android_graphql_skip_api_media_color_palette":true,"creator_subscriptions_tweet_preview_api_enabled":true,"freedom_of_speech_not_reach_fetch_enabled":true,"tweetypie_unmention_optimization_enabled":true,"longform_notetweets_consumption_enabled":true,"subscriptions_verification_info_enabled":true,"blue_business_profile_image_shape_enabled":true,"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled":true,"immersive_video_status_linkable_timestamps":false,"super_follow_exclusive_tweet_notifications_enabled":true} | ||||
|             continue | ||||
|         try: | ||||
|             vars = json.loads('{"userId":"x","count":20,"includePromotedContent":true,"withQuickPromoteEligibilityTweetFields":true,"withVoice":true,"withV2Timeline":true}') | ||||
|             vars = json.loads('{"userId":"0","count":20,"includePromotedContent":true,"withQuickPromoteEligibilityTweetFields":true,"withVoice":true}') | ||||
|             vars['userId'] = str(userId) | ||||
|             vars['includePromotedContent'] = False # idk if this works | ||||
|             reqHeaders = getAuthHeaders(bearer,authToken=authToken) | ||||
|             reqHeaders["x-client-transaction-id"] = twUtils.generate_transaction_id("GET","/i/api/graphql/x31u1gdnjcqtiVZFc1zWnQ/UserWithProfileTweetsQueryV2") | ||||
|             feed = requests.get(f"https://{twitterUrl}/i/api/graphql/{tweetFeedGraphql_api}/UserTweets?variables={urllib.parse.quote(json.dumps(vars))}&features={urllib.parse.quote(tweetFeedGraphqlFeatures)}", reqHeaders) | ||||
|             reqHeaders = getAuthHeaders(v2bearer,guestToken=getGuestToken()) | ||||
|             endpoint=f"/i/api/graphql/{tweetFeedGraphql_api}/UserTweets" | ||||
|             reqHeaders["x-client-transaction-id"] = twUtils.generate_transaction_id("GET",endpoint) | ||||
|             feed = requests.get(f"https://{twitterUrl}{endpoint}", {'variables':json.dumps(vars),'features':tweetFeedGraphqlFeatures,'fieldToggles':'{"withArticlePlainText":false}'},headers=reqHeaders) | ||||
|             if feed.status_code == 403: | ||||
|                 raise TwExtractError(403, "Extract error") | ||||
|             output = feed.json() | ||||
|             if "errors" in output: | ||||
|                 # pick the first error and create a twExtractError | ||||
|                 error = output["errors"][0] | ||||
|                 raise TwExtractError(error["code"], error["message"]) | ||||
|             return output | ||||
|             timelineInstructions = output['data']['user']['result']['timeline']['timeline']['instructions'] | ||||
|             #tweetIds=None | ||||
|             tweets=None | ||||
|             for instruction in timelineInstructions: | ||||
|                 if 'type' in instruction and instruction['type'] == 'TimelineAddEntries': | ||||
|                     entries = instruction['entries'] | ||||
|                     #tweetIds = [] | ||||
|                     tweets = [] | ||||
|                     for entry in entries: | ||||
|                         if entry['entryId'].startswith("tweet-"): | ||||
|                             # get the tweet ID from the entryId | ||||
|                             #tweetId = entry['entryId'].split("-")[1] | ||||
|                             #tweetIds.append(tweetId) | ||||
|                             tweet = entry['content']['itemContent']['tweet_results']['result'] | ||||
|                             tweets.append(tweet) | ||||
|             return tweets | ||||
|         except Exception as e: | ||||
|             print(f"Exception in extractUserFeedFromId: {str(e)}") | ||||
|             continue | ||||
|     raise TwExtractError(400, "Extract error") | ||||
|  | ||||
| def extractUserFeed(username,workaroundTokens): | ||||
|     pass | ||||
|  | ||||
| def lambda_handler(event, context): | ||||
|     if ("queryStringParameters" not in event): | ||||
|         return { | ||||
|   | ||||
| @@ -2,6 +2,9 @@ import math | ||||
| import hashlib | ||||
| import base64 | ||||
| import uuid | ||||
| from x_client_transaction import ClientTransaction | ||||
| from x_client_transaction.utils import handle_x_migration | ||||
| import requests | ||||
| digits = "0123456789abcdefghijklmnopqrstuvwxyz" | ||||
|  | ||||
| def baseConversion(x, base): | ||||
| @@ -31,5 +34,21 @@ def calcSyndicationToken(idStr): | ||||
|         c = '0' | ||||
|     return c | ||||
|  | ||||
| def generate_transaction_id(method: str, path: str) -> str: | ||||
|     return "?" # not implemented | ||||
| def get_twitter_homepage(headers=None): | ||||
|     if headers is None: | ||||
|         headers = {"Authority": "x.com", | ||||
|             "Accept-Language": "en-US,en;q=0.9", | ||||
|             "Cache-Control": "no-cache", | ||||
|             "Referer": "https://x.com", | ||||
|             "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36", | ||||
|             "X-Twitter-Active-User": "yes", | ||||
|             "X-Twitter-Client-Language": "en"} | ||||
|     if 'Authorization' in headers: | ||||
|         del headers['Authorization'] | ||||
|     response = requests.get("https://x.com/home", headers=headers) | ||||
|     return response | ||||
|  | ||||
| def generate_transaction_id(method: str, path: str,headers=None) -> str: | ||||
|     ct = ClientTransaction(get_twitter_homepage(headers=headers)) | ||||
|     transaction_id = ct.generate_transaction_id(method=method, path=path) | ||||
|     return transaction_id | ||||
							
								
								
									
										35
									
								
								twitfix.py
									
									
									
									
									
								
							
							
						
						
									
										35
									
								
								twitfix.py
									
									
									
									
									
								
							| @@ -290,9 +290,20 @@ def getTweetData(twitter_url,include_txt="false",include_rtf="false"): | ||||
|         addVnfToLinkCache(twitter_url,tweetData) | ||||
|     return tweetData | ||||
|  | ||||
| def getUserData(twitter_url): | ||||
| def getUserData(twitter_url,includeFeed=False): | ||||
|     rawUserData = twExtract.extractUser(twitter_url,workaroundTokens=config['config']['workaroundTokens'].split(',')) | ||||
|     userData = getApiUserResponse(rawUserData) | ||||
|  | ||||
|     if includeFeed: | ||||
|         if userData['protected']: | ||||
|             userData['latest_tweets']=[] | ||||
|         else: | ||||
|             feed = twExtract.extractUserFeedFromId(userData['id'],workaroundTokens=config['config']['workaroundTokens'].split(',')) | ||||
|             apiFeed = [] | ||||
|             for tweet in feed: | ||||
|                 apiFeed.append(getApiResponse(tweet)) | ||||
|             userData['latest_tweets'] = apiFeed | ||||
|  | ||||
|     return userData | ||||
|  | ||||
| @app.route('/<path:sub_path>') # Default endpoint used by everything | ||||
| @@ -322,7 +333,15 @@ def twitfix(sub_path): | ||||
|             username=sub_path.split("/")[0] | ||||
|             extra = sub_path.split("/")[1] | ||||
|         if extra in [None,"with_replies","media","likes","highlights","superfollows","media",''] and username != "" and username != None: | ||||
|             userData = getUserData(f"https://twitter.com/{username}") | ||||
|             try: | ||||
|                 userData = getUserData(f"https://twitter.com/{username}","with_tweets" in request.args) | ||||
|             except twExtract.TwExtractError as e: | ||||
|                 if isApiRequest: | ||||
|                     status=500 | ||||
|                     if 'not found' in e.msg: | ||||
|                         status=404 | ||||
|                     return Response(json.dumps({"error": e.msg}), status=status,mimetype='application/json') | ||||
|                 return message("Error getting user data: "+str(e.msg)) | ||||
|             if isApiRequest: | ||||
|                 if userData is None: | ||||
|                     abort(404) | ||||
| @@ -351,6 +370,12 @@ def twitfix(sub_path): | ||||
|     if 'qrtURL' in tweetData and tweetData['qrtURL'] is not None: | ||||
|         qrt = getTweetData(tweetData['qrtURL']) | ||||
|     tweetData['qrt'] = qrt | ||||
|  | ||||
|     retweet = None | ||||
|     if 'retweetURL' in tweetData and tweetData['retweetURL'] is not None: | ||||
|         retweet = getTweetData(tweetData['retweetURL']) | ||||
|     tweetData['retweet'] = retweet | ||||
|  | ||||
|     tweetData = deepcopy(tweetData) | ||||
|     log.success("Tweet Data Get success") | ||||
|     if '?' in request.url: | ||||
| @@ -465,6 +490,12 @@ def api_v1_status(tweet_id): | ||||
|     if 'qrtURL' in tweetData and tweetData['qrtURL'] is not None: | ||||
|         qrt = getTweetData(tweetData['qrtURL']) | ||||
|     tweetData['qrt'] = qrt | ||||
|  | ||||
|     retweet = None | ||||
|     if 'retweetURL' in tweetData and tweetData['retweetURL'] is not None: | ||||
|         retweet = getTweetData(tweetData['retweetURL']) | ||||
|     tweetData['retweet'] = retweet | ||||
|  | ||||
|     if tweetData is None: | ||||
|         abort(500) # this should cause Discord to fall back to the default embed | ||||
|     return activitymg.tweetDataToActivity(tweetData,embedIndex) | ||||
|   | ||||
							
								
								
									
										28
									
								
								vxApi.py
									
									
									
									
									
								
							
							
						
						
									
										28
									
								
								vxApi.py
									
									
									
									
									
								
							| @@ -16,6 +16,7 @@ def getApiUserResponse(user): | ||||
|         "tweet_count": user["statuses_count"], | ||||
|         "created_at": user["created_at"], | ||||
|         "protected": user["protected"], | ||||
|         "fetched_on": int(datetime.now().timestamp()), | ||||
|     } | ||||
|  | ||||
| def getApiResponse(tweet,include_txt=False,include_rtf=False): | ||||
| @@ -98,7 +99,7 @@ def getApiResponse(tweet,include_txt=False,include_rtf=False): | ||||
|         if "hashtags" in tweetL["entities"]: | ||||
|             for i in tweetL["entities"]["hashtags"]: | ||||
|                 hashtags.append(i["text"]) | ||||
|     elif "card" in tweet and tweet['card']['name'] == "player": | ||||
|     elif "card" in tweet and 'name' in tweet['card'] and tweet['card']['name'] == "player": | ||||
|         width = None | ||||
|         height = None | ||||
|         vidUrl = None | ||||
| @@ -143,6 +144,10 @@ def getApiResponse(tweet,include_txt=False,include_rtf=False): | ||||
|     if 'quoted_status_id_str' in tweetL: | ||||
|         qrtURL = "https://twitter.com/i/status/" + tweetL['quoted_status_id_str'] | ||||
|  | ||||
|     retweetURL = None | ||||
|     if 'retweeted_status_result' in tweetL: | ||||
|         retweetURL = "https://twitter.com/i/status/" + tweetL['retweeted_status_result']['result']['rest_id'] | ||||
|  | ||||
|     if 'possibly_sensitive' not in tweetL: | ||||
|         tweetL['possibly_sensitive'] = False | ||||
|  | ||||
| @@ -155,6 +160,8 @@ def getApiResponse(tweet,include_txt=False,include_rtf=False): | ||||
|  | ||||
|     if 'entities' in tweetL and 'urls' in tweetL['entities']: | ||||
|         for eurl in tweetL['entities']['urls']: | ||||
|             if 'expanded_url' not in eurl: | ||||
|                 continue | ||||
|             if "/status/" in eurl["expanded_url"] and eurl["expanded_url"].startswith("https://twitter.com/"): | ||||
|                 twText = twText.replace(eurl["url"], "") | ||||
|             else: | ||||
| @@ -210,7 +217,7 @@ def getApiResponse(tweet,include_txt=False,include_rtf=False): | ||||
|                 totalVotes += option["votes"] | ||||
|                 pollData["options"].append(option) | ||||
|         for i in pollData["options"]: | ||||
|             i["percent"] = round((i["votes"]/totalVotes)*100,2) | ||||
|             i["percent"] = round((i["votes"]/totalVotes)*100,2) if totalVotes > 0 else 0 | ||||
|          | ||||
|     if 'lang' in tweetL: | ||||
|         lang = tweetL['lang'] | ||||
| @@ -219,6 +226,18 @@ def getApiResponse(tweet,include_txt=False,include_rtf=False): | ||||
|     if 'in_reply_to_screen_name' in tweetL and tweetL['in_reply_to_screen_name'] != None: | ||||
|         replyingTo = tweetL['in_reply_to_screen_name'] | ||||
|  | ||||
|     replyingToID = None | ||||
|     if 'in_reply_to_status_id_str' in tweetL and tweetL['in_reply_to_status_id_str'] != None: | ||||
|         replyingToID = tweetL['in_reply_to_status_id_str'] | ||||
|  | ||||
|     if 'screen_name' not in userL and 'core' in tweet["core"]["user_results"]["result"]: | ||||
|         userL['screen_name'] = tweet["core"]["user_results"]["result"]["core"]["screen_name"] | ||||
|         if 'name' not in userL: | ||||
|             userL['name'] = tweet["core"]["user_results"]["result"]["core"]["name"] | ||||
|      | ||||
|     if 'profile_image_url_https' not in userL and 'avatar' in tweet["core"]["user_results"]["result"]: | ||||
|         userL['profile_image_url_https'] = tweet["core"]["user_results"]["result"]["avatar"]["image_url"] | ||||
|  | ||||
|     apiObject = { | ||||
|         "text": twText, | ||||
|         "likes": tweetL["favorite_count"], | ||||
| @@ -244,10 +263,13 @@ def getApiResponse(tweet,include_txt=False,include_rtf=False): | ||||
|         "article": tweetArticle, | ||||
|         "lang": lang, | ||||
|         "replyingTo": replyingTo, | ||||
|         "replyingToID": replyingToID, | ||||
|         "fetched_on": int(datetime.now().timestamp()), | ||||
|         "retweetURL":retweetURL, | ||||
|     } | ||||
|     try: | ||||
|         apiObject["date_epoch"] = int(datetime.strptime(tweetL["created_at"], "%a %b %d %H:%M:%S %z %Y").timestamp()) | ||||
|     except: | ||||
|         pass | ||||
|  | ||||
|     return apiObject | ||||
|     return apiObject | ||||
|   | ||||
| @@ -12,6 +12,7 @@ testNSFWTweet="https://twitter.com/kuyacoy/status/1581185279376838657" | ||||
| testPollTweet="https://twitter.com/norm/status/651169346518056960" | ||||
| testMixedMediaTweet="https://twitter.com/bigbeerfest/status/1760638922084741177" | ||||
| testVinePlayerTweet="https://twitter.com/Roblox/status/583302104342638592" | ||||
| testRetweetTweet="https://twitter.com/pdxdylan/status/1828570470222045294" | ||||
|  | ||||
| testTextTweet_compare={'text': 'just setting up my twttr', 'date': 'Tue Mar 21 20:50:14 +0000 2006', 'tweetURL': 'https://twitter.com/jack/status/20', 'tweetID': '20', 'conversationID': '20', 'mediaURLs': [], 'media_extended': [], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': None, 'allSameType': True, 'hasMedia': False, 'combinedMediaUrl': None, 'pollData': None, 'article': None, 'date_epoch': 1142974214} | ||||
| testVideoTweet_compare={'text': 'TikTok embeds on Discord/Telegram bait you with a fake play button, but to see the actual video you have to go to their website.\nAs a request from a friend, I made it so that if you add "vx" before "tiktok" on any link, it fixes that. https://t.co/QYpiVXUIrW', 'date': 'Fri Jun 24 18:17:31 +0000 2022', 'tweetURL': 'https://twitter.com/pdxdylan/status/1540398733669666818', 'tweetID': '1540398733669666818', 'conversationID': '1540398733669666818', 'mediaURLs': ['https://video.twimg.com/ext_tw_video/1540396699037929472/pu/vid/762x528/YxbXbT3X7vq4LWfC.mp4'], 'media_extended': [{'url': 'https://video.twimg.com/ext_tw_video/1540396699037929472/pu/vid/762x528/YxbXbT3X7vq4LWfC.mp4', 'type': 'video', 'size': {'width': 762, 'height': 528}, 'duration_millis': 13650, 'thumbnail_url': 'https://pbs.twimg.com/ext_tw_video_thumb/1540396699037929472/pu/img/l187Z6B9AHHxUKPV.jpg', 'altText': None}], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': None, 'allSameType': True, 'hasMedia': True, 'combinedMediaUrl': None, 'pollData': None, 'article': None, 'date_epoch': 1656094651} | ||||
| @@ -26,6 +27,9 @@ testMixedMediaTweet_compare={'text': 'Some of us here are definitely big nerds a | ||||
| testVinePlayerTweet_compare={'text': 'You wanted old ROBLOX back, you got it. Check out our sweet "new" look! #BringBackOldROBLOX https://vine.co/v/OL9VqvM6wJh', 'date': 'Wed Apr 01 16:17:13 +0000 2015', 'tweetURL': 'https://twitter.com/Roblox/status/583302104342638592', 'tweetID': '583302104342638592', 'conversationID': '583302104342638592', 'mediaURLs': ['https://v.cdn.vine.co/r/videos/20A1BE53011195086166081318912_3fe3b526b1a.1.5.3156516531034157495.mp4?versionId=DI1mMu7EI6zcLbvgucyp3GHebdz8.9cQ'], 'media_extended': [{'url': 'https://v.cdn.vine.co/r/videos/20A1BE53011195086166081318912_3fe3b526b1a.1.5.3156516531034157495.mp4?versionId=DI1mMu7EI6zcLbvgucyp3GHebdz8.9cQ', 'type': 'video', 'size': {'width': 435, 'height': 435}}], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': None, 'allSameType': True, 'hasMedia': True, 'combinedMediaUrl': None, 'pollData': {'options': []}, 'article': None, 'date_epoch': 1427905033} | ||||
|  | ||||
| testUser="https://twitter.com/jack" | ||||
| testUserSuspended="https://twitter.com/twitter" | ||||
| testUserPrivate="https://twitter.com/PrestigeIsKey" | ||||
| testUserInvalid="https://twitter.com/.a" | ||||
| testUserID=12 # could also be 170824883 | ||||
| testUserIDUrl = "https://twitter.com/i/user/"+str(testUserID) | ||||
| testUserWeirdURLs=["https://twitter.com/jack?lang=en","https://twitter.com/jack/with_replies","https://twitter.com/jack/media","https://twitter.com/jack/likes","https://twitter.com/jack/with_replies?lang=en","https://twitter.com/jack/media?lang=en","https://twitter.com/jack/likes?lang=en","https://twitter.com/jack/"] | ||||
|   | ||||
		Reference in New Issue
	
	Block a user