Begin work on code refactor. Issues this will most likely solve when finished: #201, #195, #115, #20,

This commit is contained in:
Dylan 2024-04-17 21:40:26 +01:00
parent ec9cd3619c
commit ffcc96c13e
10 changed files with 143 additions and 692 deletions

View File

@ -10,8 +10,8 @@ videoDescLimit=220
tweetDescLimit=340 tweetDescLimit=340
def genLikesDisplay(vnf): def genLikesDisplay(vnf):
if vnf['rts'] > 0: if vnf['retweets'] > 0:
return ("\n\n💖 " + numerize.numerize(vnf['likes']) + " 🔁 " + numerize.numerize(vnf['rts'])) return ("\n\n💖 " + numerize.numerize(vnf['likes']) + " 🔁 " + numerize.numerize(vnf['retweets']))
else: else:
return ("\n\n💖 " + numerize.numerize(vnf['likes'])) return ("\n\n💖 " + numerize.numerize(vnf['likes']))

View File

@ -1,25 +1,10 @@
{% extends 'base.html' %} {% extends 'base.html' %}
<!--
{{ color }} - Custom Color Code set in the TwitFix Config
{{ appname }} - Custom Appname set in the TwitFix Config
{{ user }} - Username of the Source Tweet
{{ pic }} - Thumbnail Image for a given video
{{ vidlink }} - Direct link back to the source tweet
{{ vidurl }} - Direct MP4 link to the video contained in the tweet
{{ desc }} - Tweet Text
{{ url }} - Base URL of the site hosting TwitFix
{{ urlDesc }} - HTTP Encoded Tweet Text
{{ urlUser }} - HTTP Encoded Username
{{ urlLink }} - HTTP Encoded source link
{{ pfp }} - URL of the posters PFP
-->
{% block head %} {% block head %}
<meta content="text/html; charset=UTF-8" http-equiv="Content-Type" />
<meta content="{{ color }}" name="theme-color" />
<meta property="og:site_name" content="{{ appname }}" />
<meta name="twitter:card" content="summary_large_image" /> <meta name="twitter:card" content="summary_large_image" />
<meta name="twitter:title" content="{{ user }} (@{{ screenName }}) {% if verified %}☑️{% else %}{% endif %}" />
{% include 'tweetCommon.html' %}
<meta name="twitter:image" content="{{ pic[0] }}" /> <meta name="twitter:image" content="{{ pic[0] }}" />
{% if pic[1] %} {% if pic[1] %}
@ -34,9 +19,9 @@
<meta name="twitter:image" content="{{ pic[3] }}" /> <meta name="twitter:image" content="{{ pic[3] }}" />
{% endif %} {% endif %}
<meta name="twitter:creator" content="@{{ user }}" /> <meta name="twitter:creator" content="@{{ tweet['user_name'] }}" />
<meta property="og:description" content="{{ desc }}" /> <meta property="og:description" content="{{ desc }}" />
<link rel="alternate" href="{{ url }}/oembed.json?desc={{ urlUser }}&user=Twitter&link={{ tweetLink }}&ttype=photo&provider={{ appname }}" type="application/json+oembed" title="{{ user }}"> <link rel="alternate" href="{{ host }}/oembed.json?desc={{ urlUser }}&user=Twitter&link={{ tweetLink }}&ttype=photo&provider={{ appname }}" type="application/json+oembed" title="{{ tweet['user_name'] }}">
<meta http-equiv="refresh" content="0; url = {{ tweetLink }}" /> {% endblock %} {% block body %} Redirecting you to the tweet in a moment. <a href="{{ tweetLink }}">Or click here.</a> {% endblock %} <meta http-equiv="refresh" content="0; url = {{ tweetLink }}" /> {% endblock %} {% block body %} Redirecting you to the tweet in a moment. <a href="{{ tweetLink }}">Or click here.</a> {% endblock %}

View File

@ -1,16 +1,16 @@
{% extends 'base.html' %} {% block head %} {% extends 'base.html' %} {% block head %}
<meta content='text/html; charset=UTF-8' http-equiv='Content-Type' /> <meta content='text/html; charset=UTF-8' http-equiv='Content-Type' />
<meta name="twitter:player:stream" content="{{ vidurl }}" /> <meta name="twitter:player:stream" content="{{ media['url'] }}" />
<meta name="twitter:player:stream:content_type" content="video/mp4" /> <meta name="twitter:player:stream:content_type" content="video/mp4" />
<meta name="twitter:player:width" content="{{ videoSize['width'] }}" /> <meta name="twitter:player:width" content="{{ media['size']['width'] }}" />
<meta name="twitter:player:height" content="{{ videoSize['height'] }}" /> <meta name="twitter:player:height" content="{{ media['size']['height'] }}" />
<meta property="og:url" content="{{ vidurl }}" /> <meta property="og:url" content="{{ media['url'] }}" />
<meta property="og:video" content="{{ vidurl }}" /> <meta property="og:video" content="{{ media['url'] }}" />
<meta property="og:video:secure_url" content="{{ vidurl }}" /> <meta property="og:video:secure_url" content="{{ media['url'] }}" />
<meta property="og:video:type" content="video/mp4" /> <meta property="og:video:type" content="video/mp4" />
<meta property="og:video:width" content="{{ videoSize['width'] }}" /> <meta property="og:video:width" content="{{ media['size']['width'] }}" />
<meta property="og:video:height" content="{{ videoSize['height'] }}" /> <meta property="og:video:height" content="{{ media['size']['height'] }}" />
<meta name="twitter:card" content="player" /> <meta name="twitter:card" content="player" />
<meta http-equiv="refresh" content="0; url = {{ vidurl }}" /> {% endblock %} {% block body %} Redirecting you to the video in a moment. <a href="{{ vidurl }}">Or click here.</a> {% endblock %} <meta http-equiv="refresh" content="0; url = {{ media['url'] }}" /> {% endblock %} {% block body %} Redirecting you to the video in a moment. <a href="{{ media['url'] }}">Or click here.</a> {% endblock %}

View File

@ -1,28 +0,0 @@
<script>
function androidOrIOS() {
const userAgent = navigator.userAgent;
if(/android/i.test(userAgent)){
return 'android';
}
if(/iPad|iPhone|iPod/i.test(userAgent)){
return 'ios';
}
return 'unknown';
}
function redirect() {
const os = androidOrIOS();
if(os === 'android'){
window.location.href = 'twitter://status?status_id=1674915987789950982';
window.location.href = 'https://twitter.com/pdxdylan/status/1674915987789950982'
}
if(os === 'ios'){
window.location.href = 'twitter://status?id=1674915987789950982';
setTimeout(() => {
window.location.href = 'https://twitter.com/pdxdylan/status/1674915987789950982'
}, 100);
}
}
redirect();
</script>

View File

@ -1,30 +1,9 @@
{% extends 'base.html' %} {% extends 'base.html' %}
<!--
{{ color }} - Custom Color Code set in the TwitFix Config
{{ appname }} - Custom Appname set in the TwitFix Config
{{ user }} - Username of the Source Tweet
{{ pic }} - Thumbnail Image for a given video
{{ vidlink }} - Direct link back to the source tweet
{{ vidurl }} - Direct MP4 link to the video contained in the tweet
{{ desc }} - Tweet Text
{{ url }} - Base URL of the site hosting TwitFix
{{ urlDesc }} - HTTP Encoded Tweet Text
{{ urlUser }} - HTTP Encoded Username
{{ urlLink }} - HTTP Encoded source link
{{ pfp }} - URL of the posters PFP
{{ screenName }} - Users base username
{{ rts }} - Retweet Count
{{ likes }} - Like Count
{{ time }} - Time Created
-->
{% block head %} {% block head %}
<meta content="text/html; charset=UTF-8" http-equiv="Content-Type" />
<meta content="{{ color }}" name="theme-color" />
<meta property="og:site_name" content="{{ appname }}" />
<meta property="og:image" content="{{ pfp }}" /> <meta property="og:image" content="{{ pfp }}" />
<meta name="twitter:card" content="tweet" /> <meta name="twitter:card" content="tweet" />
<meta name="twitter:title" content="{{ user }} (@{{ screenName }}) {% if verified %}☑️{% else %}{% endif %}" /> {% include 'tweetCommon.html' %}
<meta name="twitter:image" content="{{ pic[0] }}" /> <meta name="twitter:image" content="{{ pic[0] }}" />
<meta name="twitter:creator" content="@{{ user }}" /> <meta name="twitter:creator" content="@{{ user }}" />

View File

@ -0,0 +1,5 @@
<meta content='text/html; charset=UTF-8' http-equiv='Content-Type' />
<meta content="{{ color }}" name="theme-color" />
<meta property="og:site_name" content="{{ appname }}">
<meta name="twitter:title" content="{{ tweet['user_name'] }} (@{{ tweet['user_screen_name'] }})" />

View File

@ -1,10 +1,8 @@
{% extends 'base.html' %} {% block head %} {% extends 'base.html' %} {% block head %}
<meta content='text/html; charset=UTF-8' http-equiv='Content-Type' />
<meta content="{{ color }}" name="theme-color" />
<meta property="og:site_name" content="{{ appname }}">
<meta name="twitter:card" content="player" /> <meta name="twitter:card" content="player" />
<meta name="twitter:title" content="{{ user }} (@{{ screenName }}) {% if verified %}☑️{% else %}{% endif %}" /> {% include 'tweetCommon.html' %}
<meta name="twitter:image" content="{{ pic }}" /> <meta name="twitter:image" content="{{ pic }}" />
<meta name="twitter:player:width" content="{{ videoSize['width'] }}" /> <meta name="twitter:player:width" content="{{ videoSize['width'] }}" />
<meta name="twitter:player:height" content="{{ videoSize['height'] }}" /> <meta name="twitter:player:height" content="{{ videoSize['height'] }}" />
@ -17,7 +15,6 @@
<meta property="og:video:type" content="video/mp4" /> <meta property="og:video:type" content="video/mp4" />
<meta property="og:video:width" content="{{ videoSize['width'] }}" /> <meta property="og:video:width" content="{{ videoSize['width'] }}" />
<meta property="og:video:height" content="{{ videoSize['height'] }}" /> <meta property="og:video:height" content="{{ videoSize['height'] }}" />
<meta name="twitter:title" content="{{ user }} (@{{ screenName }}) {% if verified %}☑️{% else %}{% endif %}" />
<meta property="og:image" content="{{ pic[0] }}" /> <meta property="og:image" content="{{ pic[0] }}" />
<meta property="og:description" content="{{ desc }}" /> <meta property="og:description" content="{{ desc }}" />

View File

@ -48,11 +48,11 @@ def compareDict(original,compare):
compareDict(original[key],compare[key]) compareDict(original[key],compare[key])
## Specific API tests ## ## Specific API tests ##
def test_syndicationAPI(): def test_twextract_syndicationAPI():
tweet = twExtract.extractStatus_syndication(testMediaTweet,workaroundTokens=tokens) tweet = twExtract.extractStatus_syndication(testMediaTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testMediaTweet_compare['description'] assert tweet["full_text"]==testMediaTweet_compare['description']
def test_extractStatusV2Anon(): def test_twextract_extractStatusV2Anon():
tweet = twExtract.extractStatusV2AnonLegacy(testTextTweet,None) tweet = twExtract.extractStatusV2AnonLegacy(testTextTweet,None)
assert tweet["full_text"]==testTextTweet_compare['description'] assert tweet["full_text"]==testTextTweet_compare['description']
tweet = twExtract.extractStatusV2AnonLegacy(testVideoTweet,None) tweet = twExtract.extractStatusV2AnonLegacy(testVideoTweet,None)
@ -63,40 +63,40 @@ def test_extractStatusV2Anon():
assert tweet["full_text"][:94]==testMultiMediaTweet_compare['description'][:94] assert tweet["full_text"][:94]==testMultiMediaTweet_compare['description'][:94]
def test_v2API(): def test_twextract_v2API():
tweet = twExtract.extractStatusV2Legacy(testMediaTweet,workaroundTokens=tokens) tweet = twExtract.extractStatusV2Legacy(testMediaTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testMediaTweet_compare['description'] assert tweet["full_text"]==testMediaTweet_compare['description']
## Tweet retrieve tests ## ## Tweet retrieve tests ##
def test_textTweetExtract(): def test_twextract_textTweetExtract():
tweet = twExtract.extractStatus(testTextTweet,workaroundTokens=tokens) tweet = twExtract.extractStatus(testTextTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testTextTweet_compare['description'] assert tweet["full_text"]==testTextTweet_compare['description']
assert tweet["user"]["screen_name"]=="jack" assert tweet["user"]["screen_name"]=="jack"
assert 'extended_entities' not in tweet assert 'extended_entities' not in tweet
def test_extractV2(): # remove this when v2 is default def test_twextract_extractV2(): # remove this when v2 is default
tweet = twExtract.extractStatusV2(testTextTweet,workaroundTokens=tokens) tweet = twExtract.extractStatusV2(testTextTweet,workaroundTokens=tokens)
def test_UserExtract(): def test_twextract_UserExtract():
user = twExtract.extractUser(testUser,workaroundTokens=tokens) user = twExtract.extractUser(testUser,workaroundTokens=tokens)
assert user["screen_name"]=="jack" assert user["screen_name"]=="jack"
assert user["id"]==12 assert user["id"]==12
assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006" assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006"
def test_UserExtractID(): def test_twextract_UserExtractID():
user = twExtract.extractUser(testUserID,workaroundTokens=tokens) user = twExtract.extractUser(testUserID,workaroundTokens=tokens)
assert user["screen_name"]=="jack" assert user["screen_name"]=="jack"
assert user["id"]==12 assert user["id"]==12
assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006" assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006"
def test_UserExtractWeirdURLs(): def test_twextract_UserExtractWeirdURLs():
for url in testUserWeirdURLs: for url in testUserWeirdURLs:
user = twExtract.extractUser(url,workaroundTokens=tokens) user = twExtract.extractUser(url,workaroundTokens=tokens)
assert user["screen_name"]=="jack" assert user["screen_name"]=="jack"
assert user["id"]==12 assert user["id"]==12
assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006" assert user["created_at"] == "Tue Mar 21 20:50:14 +0000 2006"
def test_videoTweetExtract(): def test_twextract_videoTweetExtract():
tweet = twExtract.extractStatus(testVideoTweet,workaroundTokens=tokens) tweet = twExtract.extractStatus(testVideoTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testVideoTweet_compare['description'] assert tweet["full_text"]==testVideoTweet_compare['description']
assert 'extended_entities' in tweet assert 'extended_entities' in tweet
@ -106,7 +106,7 @@ def test_videoTweetExtract():
assert video["type"]=="video" assert video["type"]=="video"
def test_mediaTweetExtract(): def test_twextract_mediaTweetExtract():
tweet = twExtract.extractStatus(testMediaTweet,workaroundTokens=tokens) tweet = twExtract.extractStatus(testMediaTweet,workaroundTokens=tokens)
assert tweet["full_text"]==testMediaTweet_compare['description'] assert tweet["full_text"]==testMediaTweet_compare['description']
assert 'extended_entities' in tweet assert 'extended_entities' in tweet
@ -116,7 +116,7 @@ def test_mediaTweetExtract():
assert video["type"]=="photo" assert video["type"]=="photo"
def test_multimediaTweetExtract(): def test_twextract_multimediaTweetExtract():
tweet = twExtract.extractStatus(testMultiMediaTweet,workaroundTokens=tokens) tweet = twExtract.extractStatus(testMultiMediaTweet,workaroundTokens=tokens)
assert tweet["full_text"][:94]==testMultiMediaTweet_compare['description'][:94] assert tweet["full_text"][:94]==testMultiMediaTweet_compare['description'][:94]
assert 'extended_entities' in tweet assert 'extended_entities' in tweet
@ -128,12 +128,12 @@ def test_multimediaTweetExtract():
assert video["media_url_https"]==testMultiMediaTweet_compare["images"][1] assert video["media_url_https"]==testMultiMediaTweet_compare["images"][1]
assert video["type"]=="photo" assert video["type"]=="photo"
def test_pollTweetExtract(): def test_twextract_pollTweetExtract():
tweet = twExtract.extractStatus("https://twitter.com/norm/status/651169346518056960",workaroundTokens=tokens) tweet = twExtract.extractStatus("https://twitter.com/norm/status/651169346518056960",workaroundTokens=tokens)
assert 'card' in tweet assert 'card' in tweet
compareDict(testPoll_comparePoll,tweet['card']) compareDict(testPoll_comparePoll,tweet['card'])
def test_NSFW_TweetExtract(): def test_twextract_NSFW_TweetExtract():
tweet = twExtract.extractStatus(testNSFWTweet,workaroundTokens=tokens) # For now just test that there's no error tweet = twExtract.extractStatus(testNSFWTweet,workaroundTokens=tokens) # For now just test that there's no error
## VNF conversion test ## ## VNF conversion test ##

View File

@ -3,10 +3,7 @@ from flask import Flask, render_template, request, redirect, abort, Response, se
from flask_cors import CORS from flask_cors import CORS
import re import re
import os import os
import urllib.parse
import urllib.request
import combineImg import combineImg
from datetime import date,datetime, timedelta
from io import BytesIO, StringIO from io import BytesIO, StringIO
import msgs import msgs
import twExtract as twExtract import twExtract as twExtract
@ -14,7 +11,6 @@ from configHandler import config
from cache import addVnfToLinkCache,getVnfFromLinkCache from cache import addVnfToLinkCache,getVnfFromLinkCache
from yt_dlp.utils import ExtractorError from yt_dlp.utils import ExtractorError
import vxlogging as log import vxlogging as log
import zipfile
from vxApi import getApiResponse from vxApi import getApiResponse
from urllib.parse import urlparse from urllib.parse import urlparse
@ -54,18 +50,34 @@ def getTweetIdFromUrl(url):
else: else:
return None return None
def renderImageTweetEmbed(tweetData,image,appnameSuffix=""):
qrt = None
pollData = None
embedDesc = msgs.formatEmbedDesc("Image",tweetData['text'],qrt,pollData,msgs.genLikesDisplay(tweetData))
return render_template("image.html",
tweet=tweetData,
pic=[image],
host=config['config']['url'],
desc=embedDesc,
tweetLink=f'https://twitter.com/{tweetData["user_screen_name"]}/status/{tweetData["tweetID"]}',
appname=config['config']['appname']+appnameSuffix,
)
def renderVideoTweetEmbed(tweetData,video,appnameSuffix=""):
# TODO: render video tweet embed template
return "Video tweet embed"
def renderTextTweetEmbed(tweetData,appnameSuffix=""):
# TODO: render text tweet embed template
return "Text tweet embed"
@app.route('/robots.txt') @app.route('/robots.txt')
def robots(): def robots():
return "User-agent: *\nDisallow: /" return "User-agent: *\nDisallow: /"
@app.route('/') # If the useragent is discord, return the embed, if not, redirect to configured repo directly @app.route('/') # If the useragent is discord, return the embed, if not, redirect to configured repo directly
def default(): def default():
global user_agent return redirect(config['config']['repo'], 301)
user_agent = request.headers.get('user-agent')
if isValidUserAgent(user_agent):
return message("TwitFix is an attempt to fix twitter video embeds in discord! created by Robin Universe :)\n\n💖\n\nClick me to be redirected to the repo!")
else:
return redirect(config['config']['repo'], 301)
@app.route('/oembed.json') #oEmbed endpoint @app.route('/oembed.json') #oEmbed endpoint
def oembedend(): def oembedend():
@ -76,157 +88,75 @@ def oembedend():
provName = request.args.get("provider",None) provName = request.args.get("provider",None)
return oEmbedGen(desc, user, link, ttype,providerName=provName) return oEmbedGen(desc, user, link, ttype,providerName=provName)
def getTweetData(twitter_url):
try:
rawTweetData = twExtract.extractStatusV2Anon(twitter_url)
except:
rawTweetData = None
if rawTweetData is None:
rawTweetData = twExtract.extractStatusV2(twitter_url,workaroundTokens=config['config']['workaroundTokens'].split(','))
if 'error' in rawTweetData:
return None
if rawTweetData is None:
return None
tweetData = getApiResponse(rawTweetData)
return tweetData
@app.route('/<path:sub_path>') # Default endpoint used by everything @app.route('/<path:sub_path>') # Default endpoint used by everything
def twitfix(sub_path): def twitfix(sub_path):
global user_agent
user_agent = request.headers.get('user-agent')
match = pathregex.search(sub_path) match = pathregex.search(sub_path)
if match is None:
abort(404)
twitter_url = f'https://twitter.com/i/status/{getTweetIdFromUrl(sub_path)}'
if request.url.endswith(".mp4") or request.url.endswith("%2Emp4"): tweetData = getTweetData(twitter_url)
twitter_url = "https://twitter.com/" + sub_path if tweetData is None:
abort(404)
if "?" not in request.url: qrt = None
clean = twitter_url[:-4] if 'qrtURL' in tweetData:
qrt = getTweetData(tweetData['qrtURL'])
###return tweetData
embedIndex = -1
# if url ends with /1, /2, /3, or /4, we'll use that as the index
if sub_path[-2:] in ["/1","/2","/3","/4"]:
embedIndex = int(sub_path[-1])-1
sub_path = sub_path[:-2]
if request.url.startswith("https://api.vx"): # Directly return the API response if the request is from the API
return tweetData
elif request.url.startswith("https://d.vx"): # direct embed
# determine what type of media we're dealing with
if not tweetData['hasMedia'] and qrt is None:
return renderTextTweetEmbed(tweetData)
elif tweetData['allSameType'] and tweetData['media_extended'][0]['type'] == "image" and embedIndex == -1 and tweetData['combinedMediaUrl'] != None:
return redirect(tweetData['combinedMediaUrl'], 302)
else: else:
clean = twitter_url # this means we have mixed media or video, and we're only going to embed one
if embedIndex == -1: # if the user didn't specify an index, we'll just use the first one
vnf,e = vnfFromCacheOrDL(clean) embedIndex = 0
if vnf is None: media = tweetData['media_extended'][embedIndex]
if e is not None: if media['type'] == "image":
return message(msgs.failedToScan+msgs.failedToScanExtra+e) return redirect(media['url'], 302)
return message(msgs.failedToScan) elif media['type'] == "video" or media['type'] == "animated_gif":
return make_cached_vnf_response(vnf,getTemplate("rawvideo.html",vnf,"",[],clean,"","","","")) return redirect(media['url'], 302) # TODO: might not work
elif request.url.endswith(".txt") or request.url.endswith("%2Etxt"): else: # full embed
twitter_url = "https://twitter.com/" + sub_path if not tweetData['hasMedia']:
return renderTextTweetEmbed(tweetData)
if "?" not in request.url: elif tweetData['allSameType'] and tweetData['media_extended'][0]['type'] == "image" and embedIndex == -1 and tweetData['combinedMediaUrl'] != None:
clean = twitter_url[:-4] return renderImageTweetEmbed(tweetData,tweetData['combinedMediaUrl'],appnameSuffix=" - See original tweet for full quality")
else: else:
clean = twitter_url # this means we have mixed media or video, and we're only going to embed one
if embedIndex == -1: # if the user didn't specify an index, we'll just use the first one
vnf,e = vnfFromCacheOrDL(clean) embedIndex = 0
if vnf is None: media = tweetData['media_extended'][embedIndex]
if e is not None: if media['type'] == "image":
return abort(500,"Failed to scan tweet: "+e) return renderImageTweetEmbed(tweetData,media['url'] , appnameSuffix=f' - Media {embedIndex+1}/{len(tweetData["media_extended"])}')
return abort(500,"Failed to scan tweet") elif media['type'] == "video" or media['type'] == "animated_gif":
return make_content_type_response(getTemplate("txt.html",vnf,vnf["description"],[],clean,"","","",""),"text/plain") return renderVideoTweetEmbed(tweetData,media['url'])
elif request.url.endswith(".zip") or request.url.endswith("%2Ezip"): # for certain types of archival software (i.e Hydrus)
twitter_url = "https://twitter.com/" + sub_path
if "?" not in request.url:
clean = twitter_url[:-4]
else:
clean = twitter_url
vnf,e = vnfFromCacheOrDL(clean)
if vnf is None:
if e is not None:
return abort(500,"Failed to scan tweet: "+e)
return abort(500,"Failed to scan tweet")
with app.app_context():
txtData = getTemplate("txt.html",vnf,vnf["description"],"",clean,"","","","")
txtIo = BytesIO()
txtIo.write(txtData.encode("utf-8"))
txtIo.seek(0)
zipIo = BytesIO()
with zipfile.ZipFile(zipIo, 'w', zipfile.ZIP_DEFLATED) as zipf:
zipf.writestr("tweetInfo.txt", txtIo.read())
# todo: add images to zip
zipIo.seek(0)
return make_content_type_response(zipIo,"application/zip")
elif request.url.startswith("https://d.vx"): # Matches d.fx? Try to give the user a direct link
if isValidUserAgent(user_agent):
twitter_url = config['config']['url'] + "/"+sub_path
log.debug( "d.vx link shown to discord user-agent!")
if request.url.endswith(".mp4") and "?" not in request.url:
if "?" not in request.url:
clean = twitter_url[:-4]
else:
clean = twitter_url
else:
clean = twitter_url
return redirect(clean+".mp4", 301)
else:
log.debug("Redirect to MP4 using d.fxtwitter.com")
return dir(sub_path)
elif request.url.endswith("/1") or request.url.endswith("/2") or request.url.endswith("/3") or request.url.endswith("/4") or request.url.endswith("%2F1") or request.url.endswith("%2F2") or request.url.endswith("%2F3") or request.url.endswith("%2F4"):
twitter_url = "https://twitter.com/" + sub_path
if "?" not in request.url:
clean = twitter_url[:-2]
else:
clean = twitter_url
image = ( int(request.url[-1]) - 1 ) return tweetData
return embed_video(clean, image)
elif request.url.startswith("https://api.vx"):
twitter_url = "https://twitter.com/" + sub_path
try:
try:
tweet = twExtract.extractStatusV2Anon(twitter_url)
except:
tweet = None
if tweet is None:
tweet = twExtract.extractStatusV2(twitter_url,workaroundTokens=config['config']['workaroundTokens'].split(','))
if tweet is None:
log.error("API Get failed: " + twitter_url + " (Tweet null)")
abort(500, '{"message": "Failed to extract tweet (Twitter API error)"}')
if 'error' in tweet:
response = make_response(jsonify(tweet), 500)
response.headers['Content-Type'] = 'application/json'
response.cache_control.max_age = 3600
response.cache_control.public = True
return response
log.success("API Get success")
return getApiResponse(tweet)
except Exception as e:
log.error("API Get failed: " + twitter_url + " " + log.get_exception_traceback_str(e))
abort(500, '{"message": "Failed to extract tweet (Processing error)"}')
if match is not None:
twitter_url = sub_path
if match.start() == 0:
twitter_url = "https://twitter.com/" + sub_path
else:
# URL normalization messes up the URL, so we have to fix it
if sub_path.startswith("https:/") and not sub_path.startswith("https://"):
twitter_url = sub_path.replace("https:/", "https://", 1)
elif sub_path.startswith("http:/") and not sub_path.startswith("http://"):
twitter_url = sub_path.replace("http:/", "http://", 1)
if isValidUserAgent(user_agent):
res = embedCombined(twitter_url)
return res
else:
log.debug("Redirect to " + twitter_url)
return redirect(twitter_url, 301)
else:
return message("This doesn't appear to be a twitter URL")
@app.route('/dir/<path:sub_path>') # Try to return a direct link to the MP4 on twitters servers
def dir(sub_path):
global user_agent
user_agent = request.headers.get('user-agent')
url = sub_path
match = pathregex.search(url)
if match is not None:
twitter_url = url
if match.start() == 0:
twitter_url = "https://twitter.com/" + url
if isValidUserAgent(user_agent):
res = embed_video(twitter_url)
return res
else:
log.debug("Redirect to direct MP4 URL")
return direct_video(twitter_url)
else:
return redirect(url, 301)
@app.route('/favicon.ico') @app.route('/favicon.ico')
def favicon(): # pragma: no cover def favicon(): # pragma: no cover
@ -261,444 +191,6 @@ def rendercombined():
imgIo.seek(0) imgIo.seek(0)
return send_file(imgIo, mimetype='image/jpeg',max_age=86400) return send_file(imgIo, mimetype='image/jpeg',max_age=86400)
def upgradeVNF(vnf):
# Makes sure any VNF object passed through this has proper fields if they're added in later versions
if 'verified' not in vnf:
vnf['verified']=False
if 'size' not in vnf:
if vnf['type'] == 'Video':
vnf['size']={'width':720,'height':480}
else:
vnf['size']={}
if 'qrtURL' not in vnf:
if vnf['qrt'] == {}:
vnf['qrtURL'] = None
else: #
vnf['qrtURL'] = f"https://twitter.com/{vnf['qrt']['screen_name']}/status/{vnf['qrt']['id']}"
if 'isGif' not in vnf:
vnf['isGif'] = False
return vnf
def getDefaultTTL(): # TTL for deleting items from the database
return datetime.today().replace(microsecond=0) + timedelta(days=1)
def secondsUntilTTL(ttl):
untilTTL = ttl - datetime.today().replace(microsecond=0)
return untilTTL.total_seconds()
def make_content_type_response(response, content_type):
resp = make_response(response)
resp.headers['Content-Type'] = content_type
return resp
def make_cached_vnf_response(vnf,response):
return response
try:
if 'ttl' not in vnf or vnf['ttl'] == None or secondsUntilTTL(vnf['ttl']) <= 0:
return response
resp = make_response(response)
resp.cache_control.max_age = secondsUntilTTL(vnf['ttl'])
resp.cache_control.public = True
return resp
except Exception as e:
log.error("Error making cached response: " + str(e))
return response
def vnfFromCacheOrDL(video_link):
cached_vnf = getVnfFromLinkCache(video_link)
if cached_vnf == None:
try:
vnf = link_to_vnf(video_link)
addVnfToLinkCache(video_link, vnf)
log.success("VNF Get success")
return vnf,None
except (ExtractorError, twExtract.TwExtractError) as exErr:
if 'HTTP Error 404' in exErr.msg or 'No status found with that ID' in exErr.msg:
exErr.msg=msgs.tweetNotFound
elif 'suspended' in exErr.msg:
exErr.msg=msgs.tweetSuspended
else:
exErr.msg=msgs.unknownError
log.error("VNF Get failed: " + video_link + " " + log.get_exception_traceback_str(exErr))
return None,exErr.msg
except Exception as e:
log.error("VNF Get failed: " + video_link + " " + log.get_exception_traceback_str(e))
return None,None
else:
return upgradeVNF(cached_vnf),None
def direct_video(video_link): # Just get a redirect to a MP4 link from any tweet link
vnf,e = vnfFromCacheOrDL(video_link)
if vnf != None:
return redirect(vnf['url'], 301)
else:
if e is not None:
return message(msgs.failedToScan+msgs.failedToScanExtra+e)
return message(msgs.failedToScan)
def direct_video_link(video_link): # Just get a redirect to a MP4 link from any tweet link
vnf,e = vnfFromCacheOrDL(video_link)
if vnf != None:
return vnf['url']
else:
if e is not None:
return message(msgs.failedToScan+msgs.failedToScanExtra+e)
return message(msgs.failedToScan)
def embed_video(video_link, image=0): # Return Embed from any tweet link
vnf,e = vnfFromCacheOrDL(video_link)
if vnf != None:
return embed(video_link, vnf, image)
else:
if e is not None:
return message(msgs.failedToScan+msgs.failedToScanExtra+e)
return message(msgs.failedToScan)
def tweetInfo(url, tweet="", desc="", thumb="", uploader="", screen_name="", pfp="", tweetType="", images="", hits=0, likes=0, rts=0, time="", qrtURL="", nsfw=False,ttl=None,verified=False,size={},poll=None,isGif=False): # Return a dict of video info with default values
if (ttl==None):
ttl = getDefaultTTL()
vnf = {
"tweet" : tweet,
"url" : url,
"description" : desc,
"thumbnail" : thumb,
"uploader" : uploader,
"screen_name" : screen_name,
"pfp" : pfp,
"type" : tweetType,
"images" : images,
"hits" : hits,
"likes" : likes,
"rts" : rts,
"time" : time,
"qrtURL" : qrtURL,
"nsfw" : nsfw,
"ttl" : ttl,
"verified" : verified,
"size" : size,
"poll" : poll,
"isGif" : isGif,
"tweetId" : int(getTweetIdFromUrl(tweet))
}
if (poll is None):
del vnf['poll']
return vnf
def link_to_vnf_from_tweet_data(tweet,video_link):
imgs = ["","","","", ""]
log.debug("Tweet Type: " + tweetType(tweet))
isGif=False
# Check to see if tweet has a video, if not, make the url passed to the VNF the first t.co link in the tweet
if tweetType(tweet) == "Video":
media=tweet['extended_entities']['media'][0]
if media['video_info']['variants']:
best_bitrate = -1
thumb = media['media_url']
if 'original_info' in media:
size=media["original_info"]
elif 'sizes' in media and ('large' in media["sizes"] or 'medium' in media["sizes"] or 'small' in media["sizes"] or 'thumb' in media["sizes"]):
possibleSizes=['large','medium','small','thumb']
for p in possibleSizes:
if p in media["sizes"]:
size={'width':media["sizes"][p]['w'],'height':media["sizes"][p]['h']}
break
else:
size={'width':720,'height':480}
for video in media['video_info']['variants']:
if video['content_type'] == "video/mp4" and '/hevc/' not in video["url"] and video['bitrate'] > best_bitrate:
url = video['url']
best_bitrate = video['bitrate']
elif tweetType(tweet) == "Text":
url = ""
thumb = ""
size = {}
else:
imgs = ["","","","", ""]
i = 0
for media in tweet['extended_entities']['media']:
imgs[i] = media['media_url_https']
i = i + 1
imgs[4] = str(i)
url = ""
images= imgs
thumb = tweet['extended_entities']['media'][0]['media_url_https']
size = {}
if 'extended_entities' in tweet and 'media' in tweet['extended_entities'] and tweet['extended_entities']['media'][0]['type'] == 'animated_gif':
isGif=True
qrtURL = None
if 'quoted_status_permalink' in tweet:
qrtURL = tweet['quoted_status_permalink']['expanded']
elif 'quoted_status_id_str' in tweet:
qrtURL = "https://twitter.com/i/status/" + tweet['quoted_status_id_str']
text = tweet['full_text']
if 'possibly_sensitive' in tweet:
nsfw = tweet['possibly_sensitive']
else:
nsfw = False
if 'entities' in tweet and 'urls' in tweet['entities']:
for eurl in tweet['entities']['urls']:
if "/status/" in eurl["expanded_url"] and eurl["expanded_url"].startswith("https://twitter.com/"):
text = text.replace(eurl["url"], "")
else:
text = text.replace(eurl["url"],eurl["expanded_url"])
ttl = None #default
try:
if 'card' in tweet and tweet['card']['name'].startswith('poll'):
poll=getPollObject(tweet['card'])
if tweet['card']['binding_values']['counts_are_final']['boolean_value'] == False:
ttl = datetime.today().replace(microsecond=0) + timedelta(minutes=1)
else:
poll=None
except:
poll=None
vnf = tweetInfo(
url,
video_link,
text, thumb,
tweet['user']['name'],
tweet['user']['screen_name'],
tweet['user']['profile_image_url'],
tweetType(tweet),
likes=tweet['favorite_count'],
rts=tweet['retweet_count'],
time=tweet['created_at'],
qrtURL=qrtURL,
images=imgs,
nsfw=nsfw,
verified=tweet['user']['verified'],
size=size,
poll=poll,
ttl=ttl,
isGif=isGif
)
return vnf
def link_to_vnf_from_unofficial_api(video_link):
tweet=None
log.info("Attempting to download tweet info: "+video_link)
tweet = twExtract.extractStatus(video_link,workaroundTokens=config['config']['workaroundTokens'].split(','))
log.success("Unofficial API Success")
if "extended_entities" not in tweet:
# check if any entities with urls ending in /video/XXX or /photo/XXX exist
if "entities" in tweet and "urls" in tweet["entities"]:
for url in tweet["entities"]["urls"]:
if "/video/" in url["expanded_url"] or "/photo/" in url["expanded_url"]:
log.info("Extra tweet info found in entities: "+video_link+" -> "+url["expanded_url"])
subTweet = twExtract.extractStatus(url["expanded_url"],workaroundTokens=config['config']['workaroundTokens'].split(','))
if "extended_entities" in subTweet:
tweet["extended_entities"] = subTweet["extended_entities"]
break
return link_to_vnf_from_tweet_data(tweet,video_link)
def link_to_vnf(video_link): # Return a VideoInfo object or die trying
return link_to_vnf_from_unofficial_api(video_link)
def message(text):
return render_template(
'default.html',
message = text,
color = config['config']['color'],
appname = config['config']['appname'],
repo = config['config']['repo'],
url = config['config']['url'] )
def getTemplate(template,vnf,desc,images,video_link,color,urlDesc,urlUser,urlLink,appNameSuffix="",embedVNF=None):
if (embedVNF is None):
embedVNF = vnf
if ('width' in embedVNF['size'] and 'height' in embedVNF['size']):
embedVNF['size']['width'] = min(embedVNF['size']['width'],2000)
embedVNF['size']['height'] = min(embedVNF['size']['height'],2000)
return render_template(
template,
likes = vnf['likes'],
rts = vnf['rts'],
time = vnf['time'],
screenName = vnf['screen_name'],
vidlink = embedVNF['url'],
userLink = f"https://twitter.com/{vnf['screen_name']}",
pfp = vnf['pfp'],
vidurl = embedVNF['url'],
desc = desc,
pic = images,
user = vnf['uploader'],
video_link = vnf,
color = color,
appname = config['config']['appname'] + appNameSuffix,
repo = config['config']['repo'],
url = config['config']['url'],
urlDesc = urlDesc,
urlUser = urlUser,
urlLink = urlLink,
urlUserLink= urllib.parse.quote(f"https://twitter.com/{vnf['screen_name']}"),
tweetLink = vnf['tweet'],
videoSize = embedVNF['size'] )
def embed(video_link, vnf, image):
log.info("Embedding " + vnf['type'] + ": " + video_link)
desc = re.sub(r' https:\/\/t\.co\/\S+(?=\s|$)', '', vnf['description'])
urlUser = urllib.parse.quote(vnf['uploader'])
urlDesc = urllib.parse.quote(desc)
urlLink = urllib.parse.quote(video_link)
likeDisplay = msgs.genLikesDisplay(vnf)
if 'poll' in vnf:
pollDisplay= msgs.genPollDisplay(vnf['poll'])
else:
pollDisplay=""
qrt=None
if vnf['qrtURL'] is not None:
qrt,e=vnfFromCacheOrDL(vnf['qrtURL'])
if qrt is not None:
desc=msgs.formatEmbedDesc(vnf['type'],desc,qrt,pollDisplay,likeDisplay)
else:
desc=msgs.formatEmbedDesc(vnf['type'],desc,None,pollDisplay,likeDisplay)
embedVNF=None
appNamePost = ""
if vnf['type'] == "Text": # Change the template based on tweet type
template = 'text.html'
if qrt is not None and qrt['type'] != "Text":
embedVNF=qrt
if qrt['type'] == "Image":
if embedVNF['images'][4]!="1":
appNamePost = " - Image " + str(image+1) + " of " + str(vnf['images'][4])
image = embedVNF['images'][image]
template = 'image.html'
elif qrt['type'] == "Video" or qrt['type'] == "":
urlDesc = urllib.parse.quote(desc)
template = 'video.html'
if vnf['type'] == "Image":
if vnf['images'][4]!="1":
appNamePost = " - Image " + str(image+1) + "/" + str(vnf['images'][4])
image = vnf['images'][image]
template = 'image.html'
if vnf['type'] == "Video":
if vnf['isGif'] == True and config['config']['gifConvertAPI'] != "" and config['config']['gifConvertAPI'] != "none":
vnf['url'] = f"{config['config']['gifConvertAPI']}/convert.mp4?url={vnf['url']}"
appNamePost = " - GIF"
urlDesc = urllib.parse.quote(desc)
template = 'video.html'
if vnf['type'] == "":
urlDesc = urllib.parse.quote(desc)
template = 'video.html'
color = "#7FFFD4" # Green
if vnf['nsfw'] == True:
color = "#800020" # Red
return make_cached_vnf_response(vnf,getTemplate(template,vnf,desc,[image],video_link,color,urlDesc,urlUser,urlLink,appNamePost,embedVNF))
def embedCombined(video_link):
vnf,e = vnfFromCacheOrDL(video_link)
if vnf != None:
return make_cached_vnf_response(vnf,embedCombinedVnf(video_link, vnf))
else:
if e is not None:
return message(msgs.failedToScan+msgs.failedToScanExtra+e)
return message(msgs.failedToScan)
def embedCombinedVnf(video_link,vnf):
qrt=None
if vnf['qrtURL'] is not None:
qrt,e=vnfFromCacheOrDL(vnf['qrtURL'])
if vnf['type'] != "Image" and vnf['type'] != "Video" and qrt is not None and qrt['type'] == "Image":
if qrt['images'][4]!="1":
vnf['images'] = qrt['images']
vnf['type'] = "Image"
if vnf['type'] != "Image" or vnf['images'][4] == "1":
return embed(video_link, vnf, 0)
desc = re.sub(r' http.*t\.co\S+', '', vnf['description'])
urlUser = urllib.parse.quote(vnf['uploader'])
urlDesc = urllib.parse.quote(desc)
urlLink = urllib.parse.quote(video_link)
likeDisplay = msgs.genLikesDisplay(vnf)
if 'poll' in vnf:
pollDisplay= msgs.genPollDisplay(vnf['poll'])
else:
pollDisplay=""
if qrt is not None:
desc=msgs.formatEmbedDesc(vnf['type'],desc,qrt,pollDisplay,likeDisplay)
suffix=""
#if 'Discord' in user_agent:
# images = []
# for i in range(0,int(vnf['images'][4])):
# images.append(vnf['images'][i])
#else:
host = config['config']['url']
image = f"{host}/rendercombined.jpg?imgs="
for i in range(0,int(vnf['images'][4])):
image = image + vnf['images'][i] + ","
image = image[:-1] # Remove last comma
images=[image]
suffix=" - View original tweet for full quality"
color = "#7FFFD4" # Green
if vnf['nsfw'] == True:
color = "#800020" # Red
return make_cached_vnf_response(vnf,getTemplate('image.html',vnf,desc,images,video_link,color,urlDesc,urlUser,urlLink,appNameSuffix=suffix))
def getPollObject(card):
poll={"total_votes":0,"choices":[]}
choiceCount=0
if (card["name"]=="poll2choice_text_only"):
choiceCount=2
elif (card["name"]=="poll3choice_text_only"):
choiceCount=3
elif (card["name"]=="poll4choice_text_only"):
choiceCount=4
for i in range(0,choiceCount):
choice = {"text":card["binding_values"][f"choice{i+1}_label"]["string_value"],"votes":int(card["binding_values"][f"choice{i+1}_count"]["string_value"])}
poll["total_votes"]+=choice["votes"]
poll["choices"].append(choice)
# update each choice with a percentage
for choice in poll["choices"]:
choice["percent"] = round((choice["votes"]/poll["total_votes"])*100,1)
return poll
def tweetType(tweet): # Are we dealing with a Video, Image, or Text tweet?
if 'extended_entities' in tweet:
if 'video_info' in tweet['extended_entities']['media'][0]:
out = "Video"
else:
out = "Image"
else:
out = "Text"
return out
def oEmbedGen(description, user, video_link, ttype,providerName=None): def oEmbedGen(description, user, video_link, ttype,providerName=None):
if providerName == None: if providerName == None:
providerName = config['config']['appname'] providerName = config['config']['appname']

View File

@ -90,6 +90,24 @@ def getApiResponse(tweet,include_txt=False,include_zip=False):
else: else:
twText = twText.replace(eurl["url"],eurl["expanded_url"]) twText = twText.replace(eurl["url"],eurl["expanded_url"])
# check if all extended media are the same type
sameMedia = False
if len(media_extended) > 1:
sameMedia = True
for i in media_extended:
if i["type"] != media_extended[0]["type"]:
sameMedia = False
break
combinedMediaUrl = None
if sameMedia and media_extended[0]["type"] == "image":
host=config['config']['url']
combinedMediaUrl = f'{host}/rendercombined.jpg?imgs='
for i in media:
combinedMediaUrl += i + ","
combinedMediaUrl = combinedMediaUrl[:-1]
apiObject = { apiObject = {
"text": twText, "text": twText,
"likes": tweetL["favorite_count"], "likes": tweetL["favorite_count"],
@ -107,7 +125,10 @@ def getApiResponse(tweet,include_txt=False,include_zip=False):
"possibly_sensitive": tweetL["possibly_sensitive"], "possibly_sensitive": tweetL["possibly_sensitive"],
"hashtags": hashtags, "hashtags": hashtags,
"qrtURL": qrtURL, "qrtURL": qrtURL,
"communityNote": communityNote "communityNote": communityNote,
"allSameType": sameMedia,
"hasMedia": len(media) > 0,
"combinedMediaUrl": combinedMediaUrl
} }
try: try:
apiObject["date_epoch"] = int(datetime.strptime(tweetL["created_at"], "%a %b %d %H:%M:%S %z %Y").timestamp()) apiObject["date_epoch"] = int(datetime.strptime(tweetL["created_at"], "%a %b %d %H:%M:%S %z %Y").timestamp())