From ec9cd3619c37c66f93371def5c219959d0825772 Mon Sep 17 00:00:00 2001
From: Dylan
Date: Wed, 17 Apr 2024 19:44:45 +0100
Subject: [PATCH] Move API code to separate file

---
 twitfix.py | 123 +++--------------------------------------------------
 vxApi.py   | 117 ++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 123 insertions(+), 117 deletions(-)
 create mode 100644 vxApi.py

diff --git a/twitfix.py b/twitfix.py
index fa33c57..94e3785 100644
--- a/twitfix.py
+++ b/twitfix.py
@@ -1,7 +1,6 @@
 from weakref import finalize
 from flask import Flask, render_template, request, redirect, abort, Response, send_from_directory, url_for, send_file, make_response, jsonify
 from flask_cors import CORS
-import textwrap
 import re
 import os
 import urllib.parse
@@ -16,7 +15,8 @@ from cache import addVnfToLinkCache,getVnfFromLinkCache
 from yt_dlp.utils import ExtractorError
 import vxlogging as log
 import zipfile
-import html
+
+from vxApi import getApiResponse
 from urllib.parse import urlparse
 app = Flask(__name__)
 CORS(app)
@@ -168,128 +168,17 @@ def twitfix(sub_path):
         tweet = None
         if tweet is None:
             tweet = twExtract.extractStatusV2(twitter_url,workaroundTokens=config['config']['workaroundTokens'].split(','))
+        if tweet is None:
+            log.error("API Get failed: " + twitter_url + " (Tweet null)")
+            abort(500, '{"message": "Failed to extract tweet (Twitter API error)"}')
         if 'error' in tweet:
             response = make_response(jsonify(tweet), 500)
             response.headers['Content-Type'] = 'application/json'
             response.cache_control.max_age = 3600
             response.cache_control.public = True
             return response
-        tweetL = tweet["legacy"]
-        if "user_result" in tweet["core"]:
-            userL = tweet["core"]["user_result"]["result"]["legacy"]
-        elif "user_results" in tweet["core"]:
-            userL = tweet["core"]["user_results"]["result"]["legacy"]
-        media=[]
-        media_extended=[]
-        hashtags=[]
-        communityNote=None
-        try:
-            if "birdwatch_pivot" in tweet:
-                communityNote=tweet["birdwatch_pivot"]["note"]["summary"]["text"]
-        except:
-            pass
-        if "extended_entities" in tweetL:
-            if "media" in tweetL["extended_entities"]:
-                tmedia=tweetL["extended_entities"]["media"]
-                for i in tmedia:
-                    extendedInfo={}
-                    if "video_info" in i:
-                        # find the highest bitrate
-                        best_bitrate = -1
-                        besturl=""
-                        for j in i["video_info"]["variants"]:
-                            if j['content_type'] == "video/mp4" and '/hevc/' not in j["url"] and j['bitrate'] > best_bitrate:
-                                besturl = j['url']
-                                best_bitrate = j['bitrate']
-                        media.append(besturl)
-                        extendedInfo["url"] = besturl
-                        extendedInfo["type"] = "video"
-                        if (i["type"] == "animated_gif"):
-                            extendedInfo["type"] = "gif"
-                        altText = None
-                        extendedInfo["size"] = {"width":i["original_info"]["width"],"height":i["original_info"]["height"]}
-                        if "ext_alt_text" in i:
-                            altText=i["ext_alt_text"]
-                        if "duration_millis" in i["video_info"]:
-                            extendedInfo["duration_millis"] = i["video_info"]["duration_millis"]
-                        else:
-                            extendedInfo["duration_millis"] = 0
-                        extendedInfo["thumbnail_url"] = i["media_url_https"]
-                        extendedInfo["altText"] = altText
-                        media_extended.append(extendedInfo)
-                    else:
-                        media.append(i["media_url_https"])
-                        extendedInfo["url"] = i["media_url_https"]
-                        altText=None
-                        if "ext_alt_text" in i:
-                            altText=i["ext_alt_text"]
-                        extendedInfo["altText"] = altText
-                        extendedInfo["type"] = "image"
-                        extendedInfo["size"] = {"width":i["original_info"]["width"],"height":i["original_info"]["height"]}
-                        extendedInfo["thumbnail_url"] = i["media_url_https"]
-                        media_extended.append(extendedInfo)
-
-        if "hashtags" in tweetL["entities"]:
-            for i in tweetL["entities"]["hashtags"]:
-                hashtags.append(i["text"])
-
-        include_txt = request.args.get("include_txt", "false")
-        include_zip = request.args.get("include_zip", "false") # for certain types of archival software (i.e Hydrus)
-
-        if include_txt == "true" or (include_txt == "ifnomedia" and len(media)==0):
-            txturl = config['config']['url']+"/"+userL["screen_name"]+"/status/"+tweet["rest_id"]+".txt"
-            media.append(txturl)
-            media_extended.append({"url":txturl,"type":"txt"})
-        if include_zip == "true" or (include_zip == "ifnomedia" and len(media)==0):
-            zipurl = config['config']['url']+"/"+userL["screen_name"]+"/status/"+tweet["rest_id"]+".zip"
-            media.append(zipurl)
-            media_extended.append({"url":zipurl,"type":"zip"})
-
-        qrtURL = None
-        if 'quoted_status_id_str' in tweetL:
-            qrtURL = "https://twitter.com/i/status/" + tweetL['quoted_status_id_str']
-
-        if 'possibly_sensitive' not in tweetL:
-            tweetL['possibly_sensitive'] = False
-
-        twText = html.unescape(tweetL["full_text"])
-
-        if 'entities' in tweetL and 'urls' in tweetL['entities']:
-            for eurl in tweetL['entities']['urls']:
-                if "/status/" in eurl["expanded_url"] and eurl["expanded_url"].startswith("https://twitter.com/"):
-                    twText = twText.replace(eurl["url"], "")
-                else:
-                    twText = twText.replace(eurl["url"],eurl["expanded_url"])
-
-        apiObject = {
-            "text": twText,
-            "likes": tweetL["favorite_count"],
-            "retweets": tweetL["retweet_count"],
-            "replies": tweetL["reply_count"],
-            "date": tweetL["created_at"],
-            "user_screen_name": html.unescape(userL["screen_name"]),
-            "user_name": userL["name"],
-            "user_profile_image_url": userL["profile_image_url_https"],
-            "tweetURL": "https://twitter.com/"+userL["screen_name"]+"/status/"+tweet["rest_id"],
-            "tweetID": tweet["rest_id"],
-            "conversationID": tweetL["conversation_id_str"],
-            "mediaURLs": media,
-            "media_extended": media_extended,
-            "possibly_sensitive": tweetL["possibly_sensitive"],
-            "hashtags": hashtags,
-            "qrtURL": qrtURL,
-            "communityNote": communityNote
-        }
-        try:
-            apiObject["date_epoch"] = int(datetime.strptime(tweetL["created_at"], "%a %b %d %H:%M:%S %z %Y").timestamp())
-        except:
-            pass
-
-        if tweet is None:
-            log.error("API Get failed: " + twitter_url + " (Tweet null)")
-            abort(500, '{"message": "Failed to extract tweet (Twitter API error)"}')
         log.success("API Get success")
-        return apiObject
+        return getApiResponse(tweet)
     except Exception as e:
         log.error("API Get failed: " + twitter_url + " " + log.get_exception_traceback_str(e))
         abort(500, '{"message": "Failed to extract tweet (Processing error)"}')
diff --git a/vxApi.py b/vxApi.py
new file mode 100644
index 0000000..9e1bc97
--- /dev/null
+++ b/vxApi.py
@@ -0,0 +1,117 @@
+import html
+from datetime import datetime
+from configHandler import config
+
+def getApiResponse(tweet,include_txt=False,include_zip=False):
+    tweetL = tweet["legacy"]
+    if "user_result" in tweet["core"]:
+        userL = tweet["core"]["user_result"]["result"]["legacy"]
+    elif "user_results" in tweet["core"]:
+        userL = tweet["core"]["user_results"]["result"]["legacy"]
+    media=[]
+    media_extended=[]
+    hashtags=[]
+    communityNote=None
+    try:
+        if "birdwatch_pivot" in tweet:
+            communityNote=tweet["birdwatch_pivot"]["note"]["summary"]["text"]
+    except:
+        pass
+    if "extended_entities" in tweetL:
+        if "media" in tweetL["extended_entities"]:
+            tmedia=tweetL["extended_entities"]["media"]
+            for i in tmedia:
+                extendedInfo={}
+                if "video_info" in i:
+                    # find the highest bitrate
+                    best_bitrate = -1
+                    besturl=""
+                    for j in i["video_info"]["variants"]:
+                        if j['content_type'] == "video/mp4" and '/hevc/' not in j["url"] and j['bitrate'] > best_bitrate:
+                            besturl = j['url']
+                            best_bitrate = j['bitrate']
+                    media.append(besturl)
+                    extendedInfo["url"] = besturl
+                    extendedInfo["type"] = "video"
+                    if (i["type"] == "animated_gif"):
+                        extendedInfo["type"] = "gif"
+                    altText = None
+                    extendedInfo["size"] = {"width":i["original_info"]["width"],"height":i["original_info"]["height"]}
+                    if "ext_alt_text" in i:
+                        altText=i["ext_alt_text"]
+                    if "duration_millis" in i["video_info"]:
+                        extendedInfo["duration_millis"] = i["video_info"]["duration_millis"]
+                    else:
+                        extendedInfo["duration_millis"] = 0
+                    extendedInfo["thumbnail_url"] = i["media_url_https"]
+                    extendedInfo["altText"] = altText
+                    media_extended.append(extendedInfo)
+                else:
+                    media.append(i["media_url_https"])
+                    extendedInfo["url"] = i["media_url_https"]
+                    altText=None
+                    if "ext_alt_text" in i:
+                        altText=i["ext_alt_text"]
+                    extendedInfo["altText"] = altText
+                    extendedInfo["type"] = "image"
+                    extendedInfo["size"] = {"width":i["original_info"]["width"],"height":i["original_info"]["height"]}
+                    extendedInfo["thumbnail_url"] = i["media_url_https"]
+                    media_extended.append(extendedInfo)
+
+    if "hashtags" in tweetL["entities"]:
+        for i in tweetL["entities"]["hashtags"]:
+            hashtags.append(i["text"])
+
+    #include_txt = request.args.get("include_txt", "false")
+    #include_zip = request.args.get("include_zip", "false") # for certain types of archival software (i.e Hydrus)
+
+    if include_txt == "true" or (include_txt == "ifnomedia" and len(media)==0):
+        txturl = config['config']['url']+"/"+userL["screen_name"]+"/status/"+tweet["rest_id"]+".txt"
+        media.append(txturl)
+        media_extended.append({"url":txturl,"type":"txt"})
+    if include_zip == "true" or (include_zip == "ifnomedia" and len(media)==0):
+        zipurl = config['config']['url']+"/"+userL["screen_name"]+"/status/"+tweet["rest_id"]+".zip"
+        media.append(zipurl)
+        media_extended.append({"url":zipurl,"type":"zip"})
+
+    qrtURL = None
+    if 'quoted_status_id_str' in tweetL:
+        qrtURL = "https://twitter.com/i/status/" + tweetL['quoted_status_id_str']
+
+    if 'possibly_sensitive' not in tweetL:
+        tweetL['possibly_sensitive'] = False
+
+    twText = html.unescape(tweetL["full_text"])
+
+    if 'entities' in tweetL and 'urls' in tweetL['entities']:
+        for eurl in tweetL['entities']['urls']:
+            if "/status/" in eurl["expanded_url"] and eurl["expanded_url"].startswith("https://twitter.com/"):
+                twText = twText.replace(eurl["url"], "")
+            else:
+                twText = twText.replace(eurl["url"],eurl["expanded_url"])
+
+    apiObject = {
+        "text": twText,
+        "likes": tweetL["favorite_count"],
+        "retweets": tweetL["retweet_count"],
+        "replies": tweetL["reply_count"],
+        "date": tweetL["created_at"],
+        "user_screen_name": html.unescape(userL["screen_name"]),
+        "user_name": userL["name"],
+        "user_profile_image_url": userL["profile_image_url_https"],
+        "tweetURL": "https://twitter.com/"+userL["screen_name"]+"/status/"+tweet["rest_id"],
+        "tweetID": tweet["rest_id"],
+        "conversationID": tweetL["conversation_id_str"],
+        "mediaURLs": media,
+        "media_extended": media_extended,
+        "possibly_sensitive": tweetL["possibly_sensitive"],
+        "hashtags": hashtags,
+        "qrtURL": qrtURL,
+        "communityNote": communityNote
+    }
+    try:
+        apiObject["date_epoch"] = int(datetime.strptime(tweetL["created_at"], "%a %b %d %H:%M:%S %z %Y").timestamp())
+    except:
+        pass
+
+    return apiObject
\ No newline at end of file
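
Note (outside the patch): a minimal usage sketch of the relocated helper. It assumes the repo's
configHandler is importable, since vxApi pulls in config at import time, and it feeds getApiResponse
a hand-built, hypothetical tweet dict in the GraphQL-style shape the function reads; only the keys
the function actually touches are filled in, so this is illustrative rather than a real API payload.

    # usage sketch: the tweet dict below is fabricated for illustration only
    from vxApi import getApiResponse

    tweet = {
        "rest_id": "1234567890",
        "core": {
            "user_results": {
                "result": {
                    "legacy": {
                        "screen_name": "example",
                        "name": "Example User",
                        "profile_image_url_https": "https://pbs.twimg.com/profile_images/example.jpg",
                    }
                }
            }
        },
        "legacy": {
            "full_text": "hello world",
            "favorite_count": 0,
            "retweet_count": 0,
            "reply_count": 0,
            "created_at": "Wed Apr 17 19:44:45 +0000 2024",
            "conversation_id_str": "1234567890",
            "entities": {},
        },
    }

    # Prints the flat API object (text, mediaURLs, media_extended, date_epoch, ...)
    print(getApiResponse(tweet))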