Compare commits
30 Commits
85efbeda4d
...
alterware
Author | SHA1 | Date | |
---|---|---|---|
f7dddc42c1 | |||
f849d24779 | |||
fc17870b06 | |||
7e8f7b87c9 | |||
656bcd40b9
|
|||
061e7fb96e | |||
f3adcf0c78 | |||
72847c7993 | |||
54ec334730 | |||
1d03bf80e7 | |||
e64aa41ff6 | |||
c67422d569 | |||
fa979086c9 | |||
9ca4b31796 | |||
9ea320eb9c | |||
f48a9b205d | |||
441b620b87 | |||
84b620894f | |||
9a4889fab3 | |||
b87ad04e5f | |||
c2ed8aa884 | |||
22828a1fc8
|
|||
84a19e9baa | |||
68717be147 | |||
740a300d1e | |||
dde6f889cb | |||
24812acd32 | |||
9452e7be8c | |||
a851fe587b | |||
6aad6aee7f |
27
.github/workflows/python-tests.yml
vendored
27
.github/workflows/python-tests.yml
vendored
@ -6,23 +6,30 @@ jobs:
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
environment: test
|
||||
strategy:
|
||||
matrix:
|
||||
python-version: ["3.10", "3.11", "3.12"]
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Set up Python ${{ matrix.python-version }}
|
||||
uses: actions/setup-python@v4
|
||||
with:
|
||||
python-version: ${{ matrix.python-version }}
|
||||
- name: Check out files
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Install Python
|
||||
run: |
|
||||
apt-get update
|
||||
apt-get install -y python3 python3-pip python3-venv
|
||||
|
||||
- name: Set up virtual environment
|
||||
run: |
|
||||
python3 -m venv venv
|
||||
. venv/bin/activate
|
||||
python -m pip install --upgrade pip
|
||||
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
. venv/bin/activate
|
||||
pip install pytest pytest-cov pytest-env
|
||||
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
|
||||
|
||||
- name: Test with pytest
|
||||
env:
|
||||
VXTWITTER_WORKAROUND_TOKENS: ${{ secrets.VXTWITTER_WORKAROUND_TOKENS }}
|
||||
run: |
|
||||
. venv/bin/activate
|
||||
pytest --cov-config=.coveragerc --cov=.
|
||||
|
79
activity.py
Normal file
79
activity.py
Normal file
@ -0,0 +1,79 @@
|
||||
import datetime
|
||||
import msgs
|
||||
from utils import determineEmbedTweet, determineMediaToEmbed
|
||||
from copy import deepcopy
|
||||
|
||||
def tweetDataToActivity(tweetData,embedIndex = -1):
|
||||
content=""
|
||||
|
||||
if tweetData['replyingTo'] is not None:
|
||||
content += f"<blockquote>↪️ <i>Replying to @{tweetData['replyingTo']}</i></blockquote>"
|
||||
content+=f"<p>{tweetData['text']}</p>"
|
||||
|
||||
attachments=[]
|
||||
if tweetData['qrt'] is not None:
|
||||
content += f"<blockquote><b>QRT: <a href=\"{tweetData['qrtURL']}\">{tweetData['qrt']['user_screen_name']}</a></b><br>{tweetData['qrt']['text']}</blockquote>"
|
||||
if tweetData['pollData'] is not None:
|
||||
content += f"<p>{msgs.genPollDisplay(tweetData['pollData'])}</p>"
|
||||
content += "</p>"
|
||||
content = content.replace("\n","<br>")
|
||||
#if media is not None:
|
||||
# attachments.append({"type":mediatype,"url":media})
|
||||
likes = tweetData['likes']
|
||||
retweets = tweetData['retweets']
|
||||
|
||||
# convert date epoch to iso format
|
||||
date = tweetData['date_epoch']
|
||||
date = datetime.datetime.fromtimestamp(date).isoformat() + "Z"
|
||||
|
||||
embedTweetData = determineEmbedTweet(tweetData)
|
||||
embeddingMedia = embedTweetData['hasMedia']
|
||||
media = None
|
||||
if embeddingMedia:
|
||||
media = determineMediaToEmbed(embedTweetData,embedIndex)
|
||||
|
||||
if media is not None:
|
||||
media = deepcopy(media)
|
||||
if media['type'] == "gif":
|
||||
media['type'] = "gifv"
|
||||
if 'thumbnail_url' not in media:
|
||||
media['thumbnail_url'] = media['url']
|
||||
if media['type'] == "image" and "?" not in media['url']:
|
||||
media['url'] += "?name=orig"
|
||||
attachments.append({
|
||||
"id": "114163769487684704",
|
||||
"type": media['type'],
|
||||
"url": media['url'],
|
||||
"preview_url": media['thumbnail_url'],
|
||||
})
|
||||
|
||||
# https://docs.joinmastodon.org/methods/statuses/
|
||||
return {
|
||||
"id": tweetData['tweetID'],
|
||||
"url": f"https://x.com/{tweetData['user_screen_name']}/status/{tweetData['tweetID']}",
|
||||
"uri": f"https://x.com/{tweetData['user_screen_name']}/status/{tweetData['tweetID']}",
|
||||
"created_at": date,
|
||||
"edited_at": None,
|
||||
"reblog": None,
|
||||
"in_reply_to_account_id": None,
|
||||
"language": "en",
|
||||
"content": content,
|
||||
"spoiler_text": "",
|
||||
"visibility": "public",
|
||||
"application": {
|
||||
"website": None
|
||||
},
|
||||
"media_attachments": attachments,
|
||||
"account": {
|
||||
"display_name": tweetData['user_name'],
|
||||
"username": tweetData['user_screen_name'],
|
||||
"acct": tweetData['user_screen_name'],
|
||||
"url": f"https://x.com/{tweetData['user_screen_name']}/status/{tweetData['tweetID']}",
|
||||
"uri": f"https://x.com/{tweetData['user_screen_name']}/status/{tweetData['tweetID']}",
|
||||
"locked": False,
|
||||
"avatar": tweetData['user_profile_image_url'],
|
||||
"avatar_static": tweetData['user_profile_image_url'],
|
||||
"hide_collections": False,
|
||||
"noindex": False,
|
||||
},
|
||||
}
|
6
cache.py
6
cache.py
@ -49,7 +49,9 @@ def addVnfToTweetIdCache(tweet_id, vnf):
|
||||
global link_cache
|
||||
try:
|
||||
if link_cache_system == "db":
|
||||
out = db.linkCache.update_one(vnf)
|
||||
filter_query = {'tweet': tweet_id}
|
||||
update_operation = {'$set': vnf}
|
||||
out = db.linkCache.update_one(filter_query, update_operation, upsert=True)
|
||||
log.debug("Link added to DB cache ")
|
||||
return True
|
||||
elif link_cache_system == "json":
|
||||
@ -86,7 +88,7 @@ def getVnfFromTweetIdCache(tweet_id):
|
||||
collection = db.linkCache
|
||||
vnf = collection.find_one({'tweet': tweet_id})
|
||||
if vnf != None:
|
||||
hits = ( vnf['hits'] + 1 )
|
||||
hits = ( vnf.get('hits', 0) + 1 )
|
||||
log.debug("Link located in DB cache.")
|
||||
query = { 'tweet': tweet_id }
|
||||
change = { "$set" : { "hits" : hits } }
|
||||
|
12
config.json
12
config.json
@ -2,14 +2,14 @@
|
||||
"config": {
|
||||
"appname": "vxTwitter",
|
||||
"color": "#1DA1F2",
|
||||
"database": "[url to mongo database goes here]",
|
||||
"link_cache": "ram",
|
||||
"database": "mongodb://localhost:27017/bettervxtwitter",
|
||||
"link_cache": "db",
|
||||
"method": "hybrid",
|
||||
"repo": "https://github.com/dylanpdx/BetterTwitFix",
|
||||
"table": "[database table here]",
|
||||
"url": "https://vxtwitter.com",
|
||||
"repo": "https://git.alterware.dev/alterware/BetterTwitFix",
|
||||
"table": "links",
|
||||
"url": "https://girlcock.alterware.dev",
|
||||
"combination_method": "local",
|
||||
"gifConvertAPI": "local",
|
||||
"workaroundTokens":null
|
||||
"workaroundTokens": null
|
||||
}
|
||||
}
|
@ -8,3 +8,4 @@ Werkzeug==2.3.7
|
||||
numerize==0.12
|
||||
oauthlib==3.2.2
|
||||
PyRTF3==0.47.5
|
||||
XClientTransaction==0.0.2
|
BIN
static/richEmbed/gif.png
Normal file
BIN
static/richEmbed/gif.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 2.5 KiB |
BIN
static/richEmbed/image.png
Normal file
BIN
static/richEmbed/image.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 2.6 KiB |
BIN
static/richEmbed/text.png
Normal file
BIN
static/richEmbed/text.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 1.9 KiB |
BIN
static/richEmbed/video.png
Normal file
BIN
static/richEmbed/video.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 784 B |
@ -2,7 +2,21 @@
|
||||
<meta content="{{ color }}" name="theme-color" />
|
||||
<meta property="og:site_name" content="{{ appname }}">
|
||||
|
||||
<meta name="twitter:title" content="{{ tweet['user_name'] }} (@{{ tweet['user_screen_name'] }})" />
|
||||
<meta name="og:title" content="{{ tweet['user_name'] }} (@{{ tweet['user_screen_name'] }})" />
|
||||
|
||||
{% if activityLink %}
|
||||
<link type="application/activity+json" href="{{ activityLink|safe }}" />
|
||||
{% endif %}
|
||||
|
||||
{% if sicon %}
|
||||
<!-- Video icon created by Rizki Ahmad Fauzi - Flaticon -->
|
||||
<!-- Picture and Text icon created by Freepik - Flaticon -->
|
||||
<!-- Gif file icons created by Grand Iconic - Flaticon -->
|
||||
<link rel="apple-touch-icon" sizes="128x128" href="/{{ sicon }}.png">
|
||||
{% else %}
|
||||
<link rel="apple-touch-icon" sizes="180x180" href="/apple-touch-icon.png">
|
||||
{% endif %}
|
||||
|
||||
<script src="/openInApp.js"></script>
|
||||
<script>document.addEventListener('DOMContentLoaded',function(){openTweet('{{ tweet["tweetID"] }}')})</script>
|
||||
<style>
|
||||
|
@ -8,7 +8,7 @@
|
||||
<meta content="{{ color }}" name="theme-color" />
|
||||
<meta property="og:site_name" content="{{ appname }}">
|
||||
|
||||
<meta name="twitter:title" content="{{ user['name'] }} (@{{ user['screen_name'] }})" />
|
||||
<meta name="og:title" content="{{ user['name'] }} (@{{ user['screen_name'] }})" />
|
||||
|
||||
<meta name="twitter:image" content="{{ user['profile_image_url'] }}" />
|
||||
<meta name="twitter:creator" content="@{{ user['name'] }}" />
|
||||
|
44
test_api.py
44
test_api.py
@ -3,6 +3,17 @@ import twitfix,twExtract
|
||||
from flask.testing import FlaskClient
|
||||
client = FlaskClient(twitfix.app)
|
||||
|
||||
def test_api_get_tweet():
|
||||
resp = client.get(testTextTweet.replace("https://twitter.com","https://api.vxtwitter.com")+"?include_txt=true",headers={"User-Agent":"test"})
|
||||
jData = resp.get_json()
|
||||
assert resp.status_code==200
|
||||
assert jData['text'] == 'just setting up my twttr'
|
||||
|
||||
def test_api_get_invalid_tweet():
|
||||
resp = client.get("https://vxtwitter.com/test/status/None",headers={"User-Agent":"test"})
|
||||
jData = resp.get_json()
|
||||
assert resp.status_code!=200
|
||||
|
||||
def test_api_include_txt():
|
||||
resp = client.get(testTextTweet.replace("https://twitter.com","https://api.vxtwitter.com")+"?include_txt=true",headers={"User-Agent":"test"})
|
||||
jData = resp.get_json()
|
||||
@ -38,7 +49,38 @@ def test_api_include_rtf_nomedia():
|
||||
assert not any(".rtf" in i for i in jData["mediaURLs"])
|
||||
|
||||
def test_api_user():
|
||||
resp = client.get(testUser.replace("https://twitter.com","https://api.vxtwitter.com")+"?include_rtf=true",headers={"User-Agent":"test"})
|
||||
resp = client.get(testUser.replace("https://twitter.com","https://api.vxtwitter.com"),headers={"User-Agent":"test"})
|
||||
jData = resp.get_json()
|
||||
assert resp.status_code==200
|
||||
assert jData["screen_name"]=="jack"
|
||||
|
||||
def test_api_user_suspended():
|
||||
resp = client.get(testUserSuspended.replace("https://twitter.com","https://api.vxtwitter.com"),headers={"User-Agent":"test"})
|
||||
jData = resp.get_json()
|
||||
assert resp.status_code==500
|
||||
assert 'suspended' in jData["error"]
|
||||
|
||||
def test_api_user_private():
|
||||
resp = client.get(testUserPrivate.replace("https://twitter.com","https://api.vxtwitter.com")+"?with_tweets=true",headers={"User-Agent":"test"})
|
||||
jData = resp.get_json()
|
||||
assert jData['protected'] == True
|
||||
assert len(jData["latest_tweets"])==0
|
||||
|
||||
def test_api_user_invalid():
|
||||
resp = client.get(testUserInvalid.replace("https://twitter.com","https://api.vxtwitter.com")+"?with_tweets=true",headers={"User-Agent":"test"})
|
||||
jData = resp.get_json()
|
||||
assert resp.status_code==404
|
||||
|
||||
def test_api_user_feed():
|
||||
resp = client.get(testUser.replace("https://twitter.com","https://api.vxtwitter.com")+"?with_tweets=true",headers={"User-Agent":"test"})
|
||||
jData = resp.get_json()
|
||||
assert resp.status_code==200
|
||||
assert jData["screen_name"]=="jack"
|
||||
assert len(jData["latest_tweets"])>0
|
||||
|
||||
def test_api_retweet():
|
||||
resp = client.get(testRetweetTweet.replace("https://twitter.com","https://api.vxtwitter.com"),headers={"User-Agent":"test"})
|
||||
jData = resp.get_json()
|
||||
assert jData['retweetURL'] == 'https://twitter.com/i/status/1828569456231993456'
|
||||
assert jData['retweet'] != None
|
||||
assert jData['retweet']['text'].startswith("If you want to try")
|
@ -194,3 +194,14 @@ def test_embed_rtf():
|
||||
resp = client.get(testTextTweet.replace("https://twitter.com","")+".rtf",headers={"User-Agent":"test"})
|
||||
assert resp.status_code==200
|
||||
assert testTextTweet_compare["text"] in str(resp.data)
|
||||
|
||||
def test_embed_action():
|
||||
cache.clearCache()
|
||||
resp = client.get(testTextTweet.replace("https://twitter.com",""),headers={"User-Agent":"test"})
|
||||
assert resp.status_code==200
|
||||
assert "application/activity+json" in str(resp.data)
|
||||
assert "%F0%9F%92%96" in str(resp.data) # 💖
|
||||
resp = client.get(testTextTweet.replace("https://twitter.com",""),headers={"User-Agent":"Mozilla/5.0 (compatible; Discordbot/2.0; +https://discordapp.com)"})
|
||||
assert resp.status_code==200
|
||||
assert "application/activity+json" in str(resp.data)
|
||||
assert "%F0%9F%92%96" not in str(resp.data) # 💖
|
@ -101,7 +101,6 @@ def test_twextract_pollTweetExtract(): # basic check if poll data exists
|
||||
def test_twextract_NSFW_TweetExtract():
|
||||
tweet = twExtract.extractStatus(testNSFWTweet,workaroundTokens=tokens) # For now just test that there's no error
|
||||
|
||||
'''
|
||||
def test_twextract_feed():
|
||||
tweet = twExtract.extractUserFeedFromId(testUserID,workaroundTokens=tokens)
|
||||
'''
|
||||
tweets = twExtract.extractUserFeedFromId(testUserID,workaroundTokens=tokens) # For now just test that there's no error
|
||||
assert len(tweets)>0
|
@ -12,6 +12,7 @@ tests = {
|
||||
"testPollTweet": "https://twitter.com/norm/status/651169346518056960",
|
||||
"testMixedMediaTweet":"https://twitter.com/bigbeerfest/status/1760638922084741177",
|
||||
"testVinePlayerTweet":"https://twitter.com/Roblox/status/583302104342638592",
|
||||
"testRetweetTweet":"https://twitter.com/pdxdylan/status/1828570470222045294",
|
||||
}
|
||||
|
||||
def getVNFFromLink(link):
|
||||
|
@ -14,6 +14,8 @@ v2bearer="Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1
|
||||
androidBearer="Bearer AAAAAAAAAAAAAAAAAAAAAFXzAwAAAAAAMHCxpeSDG1gLNLghVe8d74hl6k4%3DRUMF4xAQLsbeBhTSRrCiQpJtxoGWeyHrDb5te2jpGskWDFW82F"
|
||||
tweetdeckBearer="Bearer AAAAAAAAAAAAAAAAAAAAAFQODgEAAAAAVHTp76lzh3rFzcHbmHVvQxYYpTw%3DckAlMINMjmCwxUcaXbAN4XqJVdgMJaHqNOFgPMK0zN1qLqLQCF"
|
||||
|
||||
requestUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:138.0) Gecko/20100101 Firefox/138.0"
|
||||
|
||||
bearerTokens=[tweetdeckBearer,bearer,v2bearer,androidBearer]
|
||||
|
||||
guestToken=None
|
||||
@ -36,8 +38,8 @@ tweetDetailGraphqlFeatures='{"rweb_tipjar_consumption_enabled":true,"responsive_
|
||||
tweetDetailGraphql_api="e7RKseIxLu7HgkWNKZ6qnw"
|
||||
|
||||
# this is for UserTweets endpoint
|
||||
tweetFeedGraphqlFeatures='{"profile_label_improvements_pcf_label_in_post_enabled":true,"rweb_tipjar_consumption_enabled":true,"responsive_web_graphql_exclude_directive_enabled":true,"verified_phone_label_enabled":false,"creator_subscriptions_tweet_preview_api_enabled":true,"responsive_web_graphql_timeline_navigation_enabled":true,"responsive_web_graphql_skip_user_profile_image_extensions_enabled":false,"premium_content_api_read_enabled":false,"communities_web_enable_tweet_community_results_fetch":true,"c9s_tweet_anatomy_moderator_badge_enabled":true,"responsive_web_grok_analyze_button_fetch_trends_enabled":false,"responsive_web_grok_analyze_post_followups_enabled":true,"responsive_web_jetfuel_frame":false,"responsive_web_grok_share_attachment_enabled":true,"articles_preview_enabled":true,"responsive_web_edit_tweet_api_enabled":true,"graphql_is_translatable_rweb_tweet_is_translatable_enabled":true,"view_counts_everywhere_api_enabled":true,"longform_notetweets_consumption_enabled":true,"responsive_web_twitter_article_tweet_consumption_enabled":true,"tweet_awards_web_tipping_enabled":false,"responsive_web_grok_analysis_button_from_backend":true,"creator_subscriptions_quote_tweet_preview_enabled":false,"freedom_of_speech_not_reach_fetch_enabled":true,"standardized_nudges_misinfo":true,"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled":true,"rweb_video_timestamps_enabled":true,"longform_notetweets_rich_text_read_enabled":true,"longform_notetweets_inline_media_enabled":true,"responsive_web_grok_image_annotation_enabled":true,"responsive_web_enhance_cards_enabled":false}'
|
||||
tweetFeedGraphql_api="Y9WM4Id6UcGFE8Z-hbnixw"
|
||||
tweetFeedGraphqlFeatures='{"rweb_video_screen_enabled":false,"profile_label_improvements_pcf_label_in_post_enabled":true,"rweb_tipjar_consumption_enabled":true,"verified_phone_label_enabled":false,"creator_subscriptions_tweet_preview_api_enabled":true,"responsive_web_graphql_timeline_navigation_enabled":true,"responsive_web_graphql_skip_user_profile_image_extensions_enabled":false,"premium_content_api_read_enabled":false,"communities_web_enable_tweet_community_results_fetch":true,"c9s_tweet_anatomy_moderator_badge_enabled":true,"responsive_web_grok_analyze_button_fetch_trends_enabled":false,"responsive_web_grok_analyze_post_followups_enabled":false,"responsive_web_jetfuel_frame":false,"responsive_web_grok_share_attachment_enabled":true,"articles_preview_enabled":true,"responsive_web_edit_tweet_api_enabled":true,"graphql_is_translatable_rweb_tweet_is_translatable_enabled":true,"view_counts_everywhere_api_enabled":true,"longform_notetweets_consumption_enabled":true,"responsive_web_twitter_article_tweet_consumption_enabled":true,"tweet_awards_web_tipping_enabled":false,"responsive_web_grok_show_grok_translated_post":false,"responsive_web_grok_analysis_button_from_backend":true,"creator_subscriptions_quote_tweet_preview_enabled":false,"freedom_of_speech_not_reach_fetch_enabled":true,"standardized_nudges_misinfo":true,"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled":true,"longform_notetweets_rich_text_read_enabled":true,"longform_notetweets_inline_media_enabled":true,"responsive_web_grok_image_annotation_enabled":true,"responsive_web_enhance_cards_enabled":false}'
|
||||
tweetFeedGraphql_api="Li2XXGESVev94TzFtntrgA"
|
||||
|
||||
twitterUrl = "x.com" # doubt this will change but just in case
|
||||
class TwExtractError(Exception):
|
||||
@ -99,7 +101,7 @@ def twitterApiGet(url,btoken=None,authToken=None,guestToken=None):
|
||||
|
||||
def getAuthHeaders(btoken,authToken=None,guestToken=None):
|
||||
csrfToken=str(uuid.uuid4()).replace('-', '')
|
||||
headers = {"x-twitter-active-user":"yes","x-twitter-client-language":"en","x-csrf-token":csrfToken,"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/116.0"}
|
||||
headers = {"x-twitter-active-user":"yes","x-twitter-client-language":"en","x-csrf-token":csrfToken,"User-Agent":requestUserAgent}
|
||||
headers['Authorization'] = btoken
|
||||
|
||||
if authToken is not None:
|
||||
@ -107,6 +109,7 @@ def getAuthHeaders(btoken,authToken=None,guestToken=None):
|
||||
headers["x-twitter-auth-type"] = "OAuth2Session"
|
||||
if guestToken is not None:
|
||||
headers["x-guest-token"] = guestToken
|
||||
headers["Cookie"] = f"gt={guestToken}; ct0={csrfToken}; guest_id=v1:174804309415864668;"
|
||||
|
||||
return headers
|
||||
|
||||
@ -114,7 +117,7 @@ def getGuestToken():
|
||||
global guestToken
|
||||
global guestTokenUses
|
||||
if guestToken is None:
|
||||
r = requests.get(f"https://{twitterUrl}",headers={"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/116.0","Cookie":"night_mode=2"},allow_redirects=False)
|
||||
r = requests.get(f"https://{twitterUrl}",headers={"User-Agent":requestUserAgent,"Cookie":"night_mode=2"},allow_redirects=False)
|
||||
m = re.search(gt_pattern, r.text)
|
||||
if m is None:
|
||||
r = requests.post(f"https://api.{twitterUrl}/1.1/guest/activate.json", headers={"Authorization":bearer})
|
||||
@ -313,7 +316,11 @@ def extractStatusV2Android(url,workaroundTokens):
|
||||
print(f"Error in output: {json.dumps(output['errors'])}")
|
||||
# try another token
|
||||
continue
|
||||
entries=output['data']['timeline_response']['instructions'][0]['entries']
|
||||
entries = None
|
||||
for instruction in output['data']['timeline_response']['instructions']:
|
||||
if instruction["__typename"] == "TimelineAddEntries":
|
||||
entries = instruction['entries']
|
||||
break
|
||||
tweetEntry=None
|
||||
for entry in entries:
|
||||
if 'content' not in entry:
|
||||
@ -370,7 +377,11 @@ def extractStatusV2TweetDetail(url,workaroundTokens):
|
||||
print(f"Error in output: {json.dumps(output['errors'])}")
|
||||
# try another token
|
||||
continue
|
||||
entries=output['data']['threaded_conversation_with_injections_v2']['instructions'][0]['entries']
|
||||
entries = None
|
||||
for instruction in output['data']['threaded_conversation_with_injections_v2']['instructions']:
|
||||
if instruction["type"] == "TimelineAddEntries":
|
||||
entries = instruction['entries']
|
||||
break
|
||||
tweetEntry=None
|
||||
for entry in entries:
|
||||
if 'content' not in entry:
|
||||
@ -499,6 +510,8 @@ def extractUser(url,workaroundTokens):
|
||||
raise TwExtractError(error["code"], error["message"])
|
||||
return output
|
||||
except Exception as e:
|
||||
if hasattr(e,"msg") and (e.msg == 'User has been suspended.' or e.msg == 'User not found.'):
|
||||
raise e
|
||||
continue
|
||||
raise TwExtractError(400, "Extract error")
|
||||
|
||||
@ -510,25 +523,41 @@ def extractUserFeedFromId(userId,workaroundTokens):
|
||||
# TODO: https://api.twitter.com/graphql/x31u1gdnjcqtiVZFc1zWnQ/UserWithProfileTweetsQueryV2?variables={"cursor":"?","includeTweetImpression":true,"includeHasBirdwatchNotes":false,"includeEditPerspective":false,"includeEditControl":true,"count":40,"rest_id":"12","includeTweetVisibilityNudge":true,"autoplay_enabled":true}&features={"longform_notetweets_inline_media_enabled":true,"super_follow_badge_privacy_enabled":true,"longform_notetweets_rich_text_read_enabled":true,"super_follow_user_api_enabled":true,"unified_cards_ad_metadata_container_dynamic_card_content_query_enabled":true,"super_follow_tweet_api_enabled":true,"articles_api_enabled":true,"android_graphql_skip_api_media_color_palette":true,"creator_subscriptions_tweet_preview_api_enabled":true,"freedom_of_speech_not_reach_fetch_enabled":true,"tweetypie_unmention_optimization_enabled":true,"longform_notetweets_consumption_enabled":true,"subscriptions_verification_info_enabled":true,"blue_business_profile_image_shape_enabled":true,"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled":true,"immersive_video_status_linkable_timestamps":false,"super_follow_exclusive_tweet_notifications_enabled":true}
|
||||
continue
|
||||
try:
|
||||
vars = json.loads('{"userId":"x","count":20,"includePromotedContent":true,"withQuickPromoteEligibilityTweetFields":true,"withVoice":true,"withV2Timeline":true}')
|
||||
vars = json.loads('{"userId":"0","count":20,"includePromotedContent":true,"withQuickPromoteEligibilityTweetFields":true,"withVoice":true}')
|
||||
vars['userId'] = str(userId)
|
||||
vars['includePromotedContent'] = False # idk if this works
|
||||
reqHeaders = getAuthHeaders(bearer,authToken=authToken)
|
||||
reqHeaders["x-client-transaction-id"] = twUtils.generate_transaction_id("GET","/i/api/graphql/x31u1gdnjcqtiVZFc1zWnQ/UserWithProfileTweetsQueryV2")
|
||||
feed = requests.get(f"https://{twitterUrl}/i/api/graphql/{tweetFeedGraphql_api}/UserTweets?variables={urllib.parse.quote(json.dumps(vars))}&features={urllib.parse.quote(tweetFeedGraphqlFeatures)}", reqHeaders)
|
||||
reqHeaders = getAuthHeaders(v2bearer,guestToken=getGuestToken())
|
||||
endpoint=f"/i/api/graphql/{tweetFeedGraphql_api}/UserTweets"
|
||||
reqHeaders["x-client-transaction-id"] = twUtils.generate_transaction_id("GET",endpoint)
|
||||
feed = requests.get(f"https://{twitterUrl}{endpoint}", {'variables':json.dumps(vars),'features':tweetFeedGraphqlFeatures,'fieldToggles':'{"withArticlePlainText":false}'},headers=reqHeaders)
|
||||
if feed.status_code == 403:
|
||||
raise TwExtractError(403, "Extract error")
|
||||
output = feed.json()
|
||||
if "errors" in output:
|
||||
# pick the first error and create a twExtractError
|
||||
error = output["errors"][0]
|
||||
raise TwExtractError(error["code"], error["message"])
|
||||
return output
|
||||
timelineInstructions = output['data']['user']['result']['timeline']['timeline']['instructions']
|
||||
#tweetIds=None
|
||||
tweets=None
|
||||
for instruction in timelineInstructions:
|
||||
if 'type' in instruction and instruction['type'] == 'TimelineAddEntries':
|
||||
entries = instruction['entries']
|
||||
#tweetIds = []
|
||||
tweets = []
|
||||
for entry in entries:
|
||||
if entry['entryId'].startswith("tweet-"):
|
||||
# get the tweet ID from the entryId
|
||||
#tweetId = entry['entryId'].split("-")[1]
|
||||
#tweetIds.append(tweetId)
|
||||
tweet = entry['content']['itemContent']['tweet_results']['result']
|
||||
tweets.append(tweet)
|
||||
return tweets
|
||||
except Exception as e:
|
||||
print(f"Exception in extractUserFeedFromId: {str(e)}")
|
||||
continue
|
||||
raise TwExtractError(400, "Extract error")
|
||||
|
||||
def extractUserFeed(username,workaroundTokens):
|
||||
pass
|
||||
|
||||
def lambda_handler(event, context):
|
||||
if ("queryStringParameters" not in event):
|
||||
return {
|
||||
|
@ -2,6 +2,9 @@ import math
|
||||
import hashlib
|
||||
import base64
|
||||
import uuid
|
||||
from x_client_transaction import ClientTransaction
|
||||
from x_client_transaction.utils import handle_x_migration
|
||||
import requests
|
||||
digits = "0123456789abcdefghijklmnopqrstuvwxyz"
|
||||
|
||||
def baseConversion(x, base):
|
||||
@ -31,5 +34,21 @@ def calcSyndicationToken(idStr):
|
||||
c = '0'
|
||||
return c
|
||||
|
||||
def generate_transaction_id(method: str, path: str) -> str:
|
||||
return "?" # not implemented
|
||||
def get_twitter_homepage(headers=None):
|
||||
if headers is None:
|
||||
headers = {"Authority": "x.com",
|
||||
"Accept-Language": "en-US,en;q=0.9",
|
||||
"Cache-Control": "no-cache",
|
||||
"Referer": "https://x.com",
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36",
|
||||
"X-Twitter-Active-User": "yes",
|
||||
"X-Twitter-Client-Language": "en"}
|
||||
if 'Authorization' in headers:
|
||||
del headers['Authorization']
|
||||
response = requests.get("https://x.com/home", headers=headers)
|
||||
return response
|
||||
|
||||
def generate_transaction_id(method: str, path: str,headers=None) -> str:
|
||||
ct = ClientTransaction(get_twitter_homepage(headers=headers))
|
||||
transaction_id = ct.generate_transaction_id(method=method, path=path)
|
||||
return transaction_id
|
259
twitfix.py
259
twitfix.py
@ -1,5 +1,4 @@
|
||||
from weakref import finalize
|
||||
from flask import Flask, render_template, request, redirect, abort, Response, send_from_directory, url_for, send_file, make_response, jsonify
|
||||
from flask import Flask, render_template, request, redirect, abort, Response, send_from_directory, send_file
|
||||
|
||||
from configHandler import config
|
||||
remoteCombine='combination_method' in config['config'] and config['config']['combination_method'] != "local"
|
||||
@ -15,14 +14,16 @@ import msgs
|
||||
import twExtract as twExtract
|
||||
from cache import addVnfToLinkCache,getVnfFromLinkCache
|
||||
import vxlogging as log
|
||||
from utils import getTweetIdFromUrl, pathregex
|
||||
from utils import getTweetIdFromUrl, pathregex, determineMediaToEmbed, determineEmbedTweet, BytesIOWrapper, fixMedia
|
||||
from vxApi import getApiResponse, getApiUserResponse
|
||||
from urllib.parse import urlparse
|
||||
from PyRTF.Elements import Document
|
||||
from PyRTF.document.section import Section
|
||||
from PyRTF.document.paragraph import Paragraph
|
||||
from utils import BytesIOWrapper
|
||||
from copy import deepcopy
|
||||
import json
|
||||
import datetime
|
||||
import activity as activitymg
|
||||
app = Flask(__name__)
|
||||
CORS(app)
|
||||
user_agent=""
|
||||
@ -34,6 +35,10 @@ staticFiles = { # TODO: Use flask static files instead of this
|
||||
"preferences": {"mime": "text/html","path": "preferences.html"},
|
||||
"style.css": {"mime": "text/css","path": "style.css"},
|
||||
"Roboto-Regular.ttf": {"mime": "font/ttf","path": "Roboto-Regular.ttf"},
|
||||
"gif.png": {"mime": "image/png","path": "richEmbed/gif.png"},
|
||||
"video.png": {"mime": "image/png","path": "richEmbed/video.png"},
|
||||
"image.png": {"mime": "image/png","path": "richEmbed/image.png"},
|
||||
"text.png": {"mime": "image/png","path": "richEmbed/text.png"},
|
||||
}
|
||||
|
||||
generate_embed_user_agents = [
|
||||
@ -61,13 +66,6 @@ def isValidUserAgent(user_agent):
|
||||
return True
|
||||
return False
|
||||
|
||||
def fixMedia(mediaInfo):
|
||||
# This is for the iOS Discord app, which has issues when serving URLs ending in .mp4 (https://github.com/dylanpdx/BetterTwitFix/issues/210)
|
||||
if 'video.twimg.com' not in mediaInfo['url'] or 'convert?url=' in mediaInfo['url']:
|
||||
return mediaInfo
|
||||
mediaInfo['url'] = mediaInfo['url'].replace("https://video.twimg.com",f"{config['config']['url']}/tvid").replace(".mp4","")
|
||||
return mediaInfo
|
||||
|
||||
def message(text):
|
||||
return render_template(
|
||||
'default.html',
|
||||
@ -77,7 +75,24 @@ def message(text):
|
||||
repo = config['config']['repo'],
|
||||
url = config['config']['url'] )
|
||||
|
||||
def renderImageTweetEmbed(tweetData,image,appnameSuffix=""):
|
||||
def generateActivityLink(tweetData,media=None,mediatype=None,embedIndex=-1):
|
||||
global user_agent
|
||||
if 'LegacyEmbed' in user_agent: # TODO: Clean up; This is a hacky fix to make the new activity embed not trigger
|
||||
return None
|
||||
try:
|
||||
embedIndex = embedIndex+1
|
||||
return f"{config['config']['url']}/users/{tweetData['user_screen_name']}/statuses/{str(embedIndex)}{tweetData['tweetID']}"
|
||||
except Exception as e:
|
||||
log.error("Error generating activity link: "+str(e))
|
||||
return None
|
||||
|
||||
def getAppName(tweetData,appnameSuffix=""):
|
||||
appName = config['config']['appname']+appnameSuffix
|
||||
if 'Discord' not in user_agent:
|
||||
appName = msgs.formatProvider(config['config']['appname']+appnameSuffix,tweetData)
|
||||
return appName
|
||||
|
||||
def renderImageTweetEmbed(tweetData,image,appnameSuffix="",embedIndex=-1):
|
||||
qrt = tweetData['qrt']
|
||||
embedDesc = msgs.formatEmbedDesc("Image",tweetData['text'],qrt,tweetData['pollData'])
|
||||
|
||||
@ -91,15 +106,22 @@ def renderImageTweetEmbed(tweetData,image,appnameSuffix=""):
|
||||
desc=embedDesc,
|
||||
urlEncodedDesc=urllib.parse.quote(embedDesc),
|
||||
tweetLink=f'https://twitter.com/{tweetData["user_screen_name"]}/status/{tweetData["tweetID"]}',
|
||||
appname=msgs.formatProvider(config['config']['appname']+appnameSuffix,tweetData),
|
||||
color=config['config']['color']
|
||||
appname=getAppName(tweetData,appnameSuffix),
|
||||
color=config['config']['color'],
|
||||
sicon="image",
|
||||
activityLink=generateActivityLink(tweetData,image,"image/png",embedIndex)
|
||||
)
|
||||
|
||||
def renderVideoTweetEmbed(tweetData,mediaInfo,appnameSuffix=""):
|
||||
def renderVideoTweetEmbed(tweetData,mediaInfo,appnameSuffix="",embedIndex=-1):
|
||||
qrt = tweetData['qrt']
|
||||
embedDesc = msgs.formatEmbedDesc("Video",tweetData['text'],qrt,tweetData['pollData'])
|
||||
|
||||
mediaInfo=fixMedia(mediaInfo)
|
||||
|
||||
appName = config['config']['appname']+appnameSuffix
|
||||
if 'Discord' not in user_agent:
|
||||
appName = msgs.formatProvider(config['config']['appname']+appnameSuffix,tweetData)
|
||||
|
||||
return render_template("video.html",
|
||||
tweet=tweetData,
|
||||
media=mediaInfo,
|
||||
@ -107,26 +129,32 @@ def renderVideoTweetEmbed(tweetData,mediaInfo,appnameSuffix=""):
|
||||
desc=embedDesc,
|
||||
urlEncodedDesc=urllib.parse.quote(embedDesc),
|
||||
tweetLink=f'https://twitter.com/{tweetData["user_screen_name"]}/status/{tweetData["tweetID"]}',
|
||||
appname=msgs.formatProvider(config['config']['appname']+appnameSuffix,tweetData),
|
||||
color=config['config']['color']
|
||||
appname=appName,
|
||||
color=config['config']['color'],
|
||||
sicon="video",
|
||||
activityLink=generateActivityLink(tweetData,mediaInfo['url'],"video/mp4",embedIndex)
|
||||
)
|
||||
|
||||
def renderTextTweetEmbed(tweetData,appnameSuffix=""):
|
||||
qrt = tweetData['qrt']
|
||||
embedDesc = msgs.formatEmbedDesc("Text",tweetData['text'],qrt,tweetData['pollData'])
|
||||
|
||||
return render_template("text.html",
|
||||
tweet=tweetData,
|
||||
host=config['config']['url'],
|
||||
desc=embedDesc,
|
||||
urlEncodedDesc=urllib.parse.quote(embedDesc),
|
||||
tweetLink=f'https://twitter.com/{tweetData["user_screen_name"]}/status/{tweetData["tweetID"]}',
|
||||
appname=msgs.formatProvider(config['config']['appname']+appnameSuffix,tweetData),
|
||||
color=config['config']['color']
|
||||
appname=getAppName(tweetData,appnameSuffix),
|
||||
color=config['config']['color'],
|
||||
activityLink=generateActivityLink(tweetData),
|
||||
sicon="text"
|
||||
)
|
||||
|
||||
def renderArticleTweetEmbed(tweetData,appnameSuffix=""):
|
||||
articlePreview=tweetData['article']["title"]+"\n\n"+tweetData['article']["preview_text"]+"…"
|
||||
embedDesc = msgs.formatEmbedDesc("Image",articlePreview,None,None)
|
||||
|
||||
return render_template("image.html",
|
||||
tweet=tweetData,
|
||||
pic=[tweetData['article']["image"]],
|
||||
@ -134,8 +162,9 @@ def renderArticleTweetEmbed(tweetData,appnameSuffix=""):
|
||||
desc=embedDesc,
|
||||
urlEncodedDesc=urllib.parse.quote(embedDesc),
|
||||
tweetLink=f'https://twitter.com/{tweetData["user_screen_name"]}/status/{tweetData["tweetID"]}',
|
||||
appname=msgs.formatProvider(config['config']['appname']+appnameSuffix,tweetData),
|
||||
color=config['config']['color']
|
||||
appname=getAppName(tweetData,appnameSuffix),
|
||||
color=config['config']['color'],
|
||||
sicon="image"
|
||||
)
|
||||
|
||||
def renderUserEmbed(userData,appnameSuffix=""):
|
||||
@ -166,6 +195,70 @@ def oembedend():
|
||||
provName = request.args.get("provider",None)
|
||||
return oEmbedGen(desc, user, link, ttype,providerName=provName)
|
||||
|
||||
@app.route('/activity.json')
|
||||
def activity():
|
||||
tweetId = request.args.get("id", None)
|
||||
publishedDate = request.args.get("published", None)
|
||||
likes = request.args.get("likes", None)
|
||||
retweets = request.args.get("retweets", None)
|
||||
userAttrTo = request.args.get("user", None)
|
||||
content = request.args.get("content", None)
|
||||
attachments = json.loads(request.args.get("attachments", "[]"))
|
||||
|
||||
##
|
||||
|
||||
attachmentsRaw = []
|
||||
for attachment in attachments:
|
||||
attachmentsRaw.append({
|
||||
"type": "Document",
|
||||
"mediaType": attachment["type"],
|
||||
"url": attachment["url"],
|
||||
"preview_url": "https://pbs.twimg.com/ext_tw_video_thumb/1906073839441735680/pu/img/2xqg6tlK9mK0mSOR.jpg",
|
||||
})
|
||||
|
||||
return {
|
||||
"id": "https://x.com/i/status/"+tweetId,
|
||||
"type": "Note",
|
||||
"summary": None,
|
||||
"inReplyTo": None,
|
||||
"published": publishedDate,
|
||||
"url": "https://x.com/i/status/"+tweetId,
|
||||
"attributedTo": userAttrTo,
|
||||
"content": content,
|
||||
"attachment": attachmentsRaw,
|
||||
"likes": {
|
||||
"type": "Collection",
|
||||
"totalItems": int(likes)
|
||||
},
|
||||
"shares": {
|
||||
"type": "Collection",
|
||||
"totalItems": int(retweets)
|
||||
},
|
||||
}
|
||||
|
||||
@app.route('/user.json')
|
||||
def userJson():
|
||||
|
||||
screen_name = request.args.get("screen_name", None)
|
||||
name = request.args.get("name", None)
|
||||
pfp = request.args.get("pfp", None)
|
||||
|
||||
return {
|
||||
"id": screen_name,
|
||||
"type": "Person",
|
||||
"preferredUsername": screen_name,
|
||||
"name": name,
|
||||
"summary": "",
|
||||
"url": "https://x.com/"+screen_name,
|
||||
"tag": [],
|
||||
"attachment": [],
|
||||
"icon": {
|
||||
"type": "Image",
|
||||
"mediaType": "image/jpeg",
|
||||
"url": pfp
|
||||
},
|
||||
}
|
||||
|
||||
def getTweetData(twitter_url,include_txt="false",include_rtf="false"):
|
||||
cachedVNF = getVnfFromLinkCache(twitter_url)
|
||||
if cachedVNF is not None and include_txt == "false" and include_rtf == "false":
|
||||
@ -197,27 +290,32 @@ def getTweetData(twitter_url,include_txt="false",include_rtf="false"):
|
||||
addVnfToLinkCache(twitter_url,tweetData)
|
||||
return tweetData
|
||||
|
||||
def getUserData(twitter_url):
|
||||
def getUserData(twitter_url,includeFeed=False):
|
||||
rawUserData = twExtract.extractUser(twitter_url,workaroundTokens=config['config']['workaroundTokens'].split(','))
|
||||
userData = getApiUserResponse(rawUserData)
|
||||
return userData
|
||||
|
||||
def determineEmbedTweet(tweetData):
|
||||
# Determine which tweet, i.e main or QRT, to embed the media from.
|
||||
# if there is no QRT, return the main tweet => default behavior
|
||||
# if both don't have media, return the main tweet => embedding qrt text will be handled in the embed description
|
||||
# if both have media, return the main tweet => priority is given to the main tweet's media
|
||||
# if only the QRT has media, return the QRT => show the QRT's media, not the main tweet's
|
||||
# if only the main tweet has media, return the main tweet => show the main tweet's media, embedding QRT text will be handled in the embed description
|
||||
if tweetData['qrt'] is None:
|
||||
return tweetData
|
||||
if tweetData['qrt']['hasMedia'] and not tweetData['hasMedia']:
|
||||
return tweetData['qrt']
|
||||
return tweetData
|
||||
if includeFeed:
|
||||
if userData['protected']:
|
||||
userData['latest_tweets']=[]
|
||||
else:
|
||||
feed = twExtract.extractUserFeedFromId(userData['id'],workaroundTokens=config['config']['workaroundTokens'].split(','))
|
||||
apiFeed = []
|
||||
for tweet in feed:
|
||||
apiFeed.append(getApiResponse(tweet))
|
||||
userData['latest_tweets'] = apiFeed
|
||||
|
||||
return userData
|
||||
|
||||
@app.route('/<path:sub_path>') # Default endpoint used by everything
|
||||
def twitfix(sub_path):
|
||||
global user_agent
|
||||
user_agent = request.headers.get('User-Agent', None)
|
||||
if user_agent is None:
|
||||
user_agent = "unknown"
|
||||
|
||||
isApiRequest=request.url.startswith("https://api.vx") or request.url.startswith("http://api.vx")
|
||||
if not isApiRequest and (request.url.startswith("https://l.vx") or request.url.startswith("https://old.vx")) and "Discord" in user_agent:
|
||||
user_agent = user_agent.replace("Discord","LegacyEmbed") # TODO: Clean up; This is a hacky fix to make the new activity embed not trigger
|
||||
if sub_path in staticFiles:
|
||||
if 'path' not in staticFiles[sub_path] or staticFiles[sub_path]["path"] == None:
|
||||
staticFiles[sub_path]["path"] = sub_path
|
||||
@ -235,7 +333,15 @@ def twitfix(sub_path):
|
||||
username=sub_path.split("/")[0]
|
||||
extra = sub_path.split("/")[1]
|
||||
if extra in [None,"with_replies","media","likes","highlights","superfollows","media",''] and username != "" and username != None:
|
||||
userData = getUserData(f"https://twitter.com/{username}")
|
||||
try:
|
||||
userData = getUserData(f"https://twitter.com/{username}","with_tweets" in request.args)
|
||||
except twExtract.TwExtractError as e:
|
||||
if isApiRequest:
|
||||
status=500
|
||||
if 'not found' in e.msg:
|
||||
status=404
|
||||
return Response(json.dumps({"error": e.msg}), status=status,mimetype='application/json')
|
||||
return message("Error getting user data: "+str(e.msg))
|
||||
if isApiRequest:
|
||||
if userData is None:
|
||||
abort(404)
|
||||
@ -264,6 +370,12 @@ def twitfix(sub_path):
|
||||
if 'qrtURL' in tweetData and tweetData['qrtURL'] is not None:
|
||||
qrt = getTweetData(tweetData['qrtURL'])
|
||||
tweetData['qrt'] = qrt
|
||||
|
||||
retweet = None
|
||||
if 'retweetURL' in tweetData and tweetData['retweetURL'] is not None:
|
||||
retweet = getTweetData(tweetData['retweetURL'])
|
||||
tweetData['retweet'] = retweet
|
||||
|
||||
tweetData = deepcopy(tweetData)
|
||||
log.success("Tweet Data Get success")
|
||||
if '?' in request.url:
|
||||
@ -300,48 +412,36 @@ def twitfix(sub_path):
|
||||
if isApiRequest: # Directly return the API response if the request is from the API
|
||||
return tweetData
|
||||
elif directEmbed: # direct embed
|
||||
embeddingMedia = tweetData['hasMedia']
|
||||
renderMedia = None
|
||||
if embeddingMedia:
|
||||
renderMedia = determineMediaToEmbed(tweetData,embedIndex)
|
||||
# direct embeds should always prioritize the main tweet, so don't check for qrt
|
||||
# determine what type of media we're dealing with
|
||||
if not tweetData['hasMedia'] and qrt is None:
|
||||
if not embeddingMedia and qrt is None:
|
||||
return renderTextTweetEmbed(tweetData)
|
||||
elif tweetData['allSameType'] and tweetData['media_extended'][0]['type'] == "image" and embedIndex == -1 and tweetData['combinedMediaUrl'] != None:
|
||||
return render_template("rawimage.html",media={"url":tweetData['combinedMediaUrl']})
|
||||
else:
|
||||
# this means we have mixed media or video, and we're only going to embed one
|
||||
if embedIndex == -1: # if the user didn't specify an index, we'll just use the first one
|
||||
embedIndex = 0
|
||||
media = tweetData['media_extended'][embedIndex]
|
||||
media=fixMedia(media)
|
||||
if media['type'] == "image":
|
||||
return render_template("rawimage.html",media=media)
|
||||
elif media['type'] == "video" or media['type'] == "gif":
|
||||
return render_template("rawvideo.html",media=media)
|
||||
if renderMedia['type'] == "image":
|
||||
return render_template("rawimage.html",media=renderMedia)
|
||||
elif renderMedia['type'] == "video" or renderMedia['type'] == "gif":
|
||||
return render_template("rawvideo.html",media=renderMedia)
|
||||
else: # full embed
|
||||
embedTweetData = determineEmbedTweet(tweetData)
|
||||
embeddingMedia = embedTweetData['hasMedia']
|
||||
|
||||
if "article" in embedTweetData and embedTweetData["article"] is not None:
|
||||
return renderArticleTweetEmbed(tweetData," - See original tweet for full article")
|
||||
elif not embedTweetData['hasMedia']:
|
||||
return renderArticleTweetEmbed(tweetData," • See original tweet for full article")
|
||||
elif not embeddingMedia:
|
||||
return renderTextTweetEmbed(tweetData)
|
||||
elif embedTweetData['allSameType'] and embedTweetData['media_extended'][0]['type'] == "image" and embedIndex == -1 and embedTweetData['combinedMediaUrl'] != None:
|
||||
return renderImageTweetEmbed(tweetData,embedTweetData['combinedMediaUrl'],appnameSuffix=" - See original tweet for full quality")
|
||||
else:
|
||||
# this means we have mixed media or video, and we're only going to embed one
|
||||
if embedIndex == -1: # if the user didn't specify an index, we'll just use the first one
|
||||
embedIndex = 0
|
||||
media = embedTweetData['media_extended'][embedIndex]
|
||||
if len(embedTweetData["media_extended"]) > 1:
|
||||
suffix = f' - Media {embedIndex+1}/{len(embedTweetData["media_extended"])}'
|
||||
else:
|
||||
suffix = ''
|
||||
media = determineMediaToEmbed(embedTweetData,embedIndex)
|
||||
suffix=""
|
||||
if "suffix" in media:
|
||||
suffix = media["suffix"]
|
||||
if media['type'] == "image":
|
||||
return renderImageTweetEmbed(tweetData,media['url'] , appnameSuffix=suffix)
|
||||
return renderImageTweetEmbed(tweetData,media['url'] , appnameSuffix=suffix,embedIndex=embedIndex)
|
||||
elif media['type'] == "video" or media['type'] == "gif":
|
||||
if media['type'] == "gif":
|
||||
if config['config']['gifConvertAPI'] != "" and config['config']['gifConvertAPI'] != "none":
|
||||
vurl=media['originalUrl'] if 'originalUrl' in media else media['url']
|
||||
media['url'] = config['config']['gifConvertAPI'] + "/convert?url=" + vurl
|
||||
suffix += " - GIF"
|
||||
return renderVideoTweetEmbed(tweetData,media,appnameSuffix=suffix)
|
||||
return renderVideoTweetEmbed(tweetData,media,appnameSuffix=suffix,embedIndex=embedIndex)
|
||||
|
||||
return message(msgs.failedToScan)
|
||||
|
||||
@ -377,6 +477,29 @@ def rendercombined():
|
||||
imgIo.seek(0)
|
||||
return send_file(imgIo, mimetype='image/jpeg',max_age=86400)
|
||||
|
||||
@app.route("/api/v1/statuses/<string:tweet_id>")
|
||||
def api_v1_status(tweet_id):
|
||||
embedIndex = int(tweet_id[0])-1
|
||||
tweet_id = int(tweet_id[1:])
|
||||
twitter_url=f"https://twitter.com/i/status/{tweet_id}"
|
||||
tweetData = getTweetData(twitter_url)
|
||||
if tweetData is None:
|
||||
log.error("Tweet Data Get failed for "+twitter_url)
|
||||
return message(msgs.failedToScan)
|
||||
qrt = None
|
||||
if 'qrtURL' in tweetData and tweetData['qrtURL'] is not None:
|
||||
qrt = getTweetData(tweetData['qrtURL'])
|
||||
tweetData['qrt'] = qrt
|
||||
|
||||
retweet = None
|
||||
if 'retweetURL' in tweetData and tweetData['retweetURL'] is not None:
|
||||
retweet = getTweetData(tweetData['retweetURL'])
|
||||
tweetData['retweet'] = retweet
|
||||
|
||||
if tweetData is None:
|
||||
abort(500) # this should cause Discord to fall back to the default embed
|
||||
return activitymg.tweetDataToActivity(tweetData,embedIndex)
|
||||
|
||||
def oEmbedGen(description, user, video_link, ttype,providerName=None):
|
||||
if providerName == None:
|
||||
providerName = config['config']['appname']
|
||||
@ -394,4 +517,4 @@ def oEmbedGen(description, user, video_link, ttype,providerName=None):
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.config['SERVER_NAME']='localhost:80'
|
||||
app.run(host='0.0.0.0')
|
||||
app.run(host='0.0.0.0', port=8080)
|
||||
|
@ -3,11 +3,11 @@ Description=Init file for twitfix uwsgi instance
|
||||
After=network.target
|
||||
|
||||
[Service]
|
||||
User=dylan
|
||||
Group=dylan
|
||||
WorkingDirectory=/home/dylan/BetterTwitFix
|
||||
Environment="PATH=/home/dylan/BetterTwitFix/venv/bin"
|
||||
ExecStart=/home/dylan/BetterTwitFix/venv/bin/uwsgi --ini twitfix.ini
|
||||
User=carbonara
|
||||
Group=carbonara
|
||||
WorkingDirectory=/home/carbonara/twitter/BetterTwitFix
|
||||
Environment="PATH=/home/carbonara/twitter/BetterTwitFix/venv/bin"
|
||||
ExecStart=/home/carbonara/twitter/BetterTwitFix/venv/bin/uwsgi --ini twitfix.ini
|
||||
Restart=always
|
||||
RestartSec=3
|
||||
|
||||
|
48
utils.py
48
utils.py
@ -1,5 +1,6 @@
|
||||
import re
|
||||
import io
|
||||
from configHandler import config
|
||||
|
||||
pathregex = re.compile("\\w{1,15}\\/(status|statuses)\\/(\\d{2,20})")
|
||||
endTCOregex = re.compile("(^.*?) +https:\/\/t.co\/.*?$")
|
||||
@ -41,3 +42,50 @@ class BytesIOWrapper(io.BufferedReader):
|
||||
|
||||
def peek(self, size=-1):
|
||||
return self._encoding_call('peek', size)
|
||||
|
||||
def fixMedia(mediaInfo):
|
||||
# This is for the iOS Discord app, which has issues when serving URLs ending in .mp4 (https://github.com/dylanpdx/BetterTwitFix/issues/210)
|
||||
if 'video.twimg.com' not in mediaInfo['url'] or 'convert?url=' in mediaInfo['url'] or 'originalUrl' in mediaInfo:
|
||||
return mediaInfo
|
||||
mediaInfo["originalUrl"] = mediaInfo['url']
|
||||
mediaInfo['url'] = mediaInfo['url'].replace("https://video.twimg.com",f"{config['config']['url']}/tvid").replace(".mp4","")
|
||||
return mediaInfo
|
||||
|
||||
def determineEmbedTweet(tweetData):
|
||||
# Determine which tweet, i.e main or QRT, to embed the media from.
|
||||
# if there is no QRT, return the main tweet => default behavior
|
||||
# if both don't have media, return the main tweet => embedding qrt text will be handled in the embed description
|
||||
# if both have media, return the main tweet => priority is given to the main tweet's media
|
||||
# if only the QRT has media, return the QRT => show the QRT's media, not the main tweet's
|
||||
# if only the main tweet has media, return the main tweet => show the main tweet's media, embedding QRT text will be handled in the embed description
|
||||
if tweetData['qrt'] is None:
|
||||
return tweetData
|
||||
if tweetData['qrt']['hasMedia'] and not tweetData['hasMedia']:
|
||||
return tweetData['qrt']
|
||||
return tweetData
|
||||
|
||||
def determineMediaToEmbed(tweetData,embedIndex = -1):
|
||||
if tweetData['allSameType'] and tweetData['media_extended'][0]['type'] == "image" and embedIndex == -1 and tweetData['combinedMediaUrl'] != None:
|
||||
return {"url":tweetData['combinedMediaUrl'],"type":"image"}
|
||||
else:
|
||||
# this means we have mixed media or video, and we're only going to embed one
|
||||
if embedIndex == -1: # if the user didn't specify an index, we'll just use the first one
|
||||
embedIndex = 0
|
||||
media = tweetData['media_extended'][embedIndex]
|
||||
media=fixMedia(media)
|
||||
suffix=""
|
||||
if len(tweetData["media_extended"]) > 1:
|
||||
suffix = f' • Media {embedIndex+1}/{len(tweetData["media_extended"])}'
|
||||
else:
|
||||
suffix = ''
|
||||
media["suffix"] = suffix
|
||||
if media['type'] == "image":
|
||||
return media
|
||||
elif media['type'] == "video" or media['type'] == "gif":
|
||||
if media['type'] == "gif":
|
||||
if config['config']['gifConvertAPI'] != "" and config['config']['gifConvertAPI'] != "none":
|
||||
vurl=media['originalUrl'] if 'originalUrl' in media else media['url']
|
||||
media['url'] = config['config']['gifConvertAPI'] + "/convert?url=" + vurl
|
||||
suffix += " • GIF"
|
||||
media["suffix"] = suffix
|
||||
return media
|
26
vxApi.py
26
vxApi.py
@ -16,6 +16,7 @@ def getApiUserResponse(user):
|
||||
"tweet_count": user["statuses_count"],
|
||||
"created_at": user["created_at"],
|
||||
"protected": user["protected"],
|
||||
"fetched_on": int(datetime.now().timestamp()),
|
||||
}
|
||||
|
||||
def getApiResponse(tweet,include_txt=False,include_rtf=False):
|
||||
@ -98,7 +99,7 @@ def getApiResponse(tweet,include_txt=False,include_rtf=False):
|
||||
if "hashtags" in tweetL["entities"]:
|
||||
for i in tweetL["entities"]["hashtags"]:
|
||||
hashtags.append(i["text"])
|
||||
elif "card" in tweet and tweet['card']['name'] == "player":
|
||||
elif "card" in tweet and 'name' in tweet['card'] and tweet['card']['name'] == "player":
|
||||
width = None
|
||||
height = None
|
||||
vidUrl = None
|
||||
@ -143,6 +144,10 @@ def getApiResponse(tweet,include_txt=False,include_rtf=False):
|
||||
if 'quoted_status_id_str' in tweetL:
|
||||
qrtURL = "https://twitter.com/i/status/" + tweetL['quoted_status_id_str']
|
||||
|
||||
retweetURL = None
|
||||
if 'retweeted_status_result' in tweetL:
|
||||
retweetURL = "https://twitter.com/i/status/" + tweetL['retweeted_status_result']['result']['rest_id']
|
||||
|
||||
if 'possibly_sensitive' not in tweetL:
|
||||
tweetL['possibly_sensitive'] = False
|
||||
|
||||
@ -155,6 +160,8 @@ def getApiResponse(tweet,include_txt=False,include_rtf=False):
|
||||
|
||||
if 'entities' in tweetL and 'urls' in tweetL['entities']:
|
||||
for eurl in tweetL['entities']['urls']:
|
||||
if 'expanded_url' not in eurl:
|
||||
continue
|
||||
if "/status/" in eurl["expanded_url"] and eurl["expanded_url"].startswith("https://twitter.com/"):
|
||||
twText = twText.replace(eurl["url"], "")
|
||||
else:
|
||||
@ -210,7 +217,7 @@ def getApiResponse(tweet,include_txt=False,include_rtf=False):
|
||||
totalVotes += option["votes"]
|
||||
pollData["options"].append(option)
|
||||
for i in pollData["options"]:
|
||||
i["percent"] = round((i["votes"]/totalVotes)*100,2)
|
||||
i["percent"] = round((i["votes"]/totalVotes)*100,2) if totalVotes > 0 else 0
|
||||
|
||||
if 'lang' in tweetL:
|
||||
lang = tweetL['lang']
|
||||
@ -219,6 +226,18 @@ def getApiResponse(tweet,include_txt=False,include_rtf=False):
|
||||
if 'in_reply_to_screen_name' in tweetL and tweetL['in_reply_to_screen_name'] != None:
|
||||
replyingTo = tweetL['in_reply_to_screen_name']
|
||||
|
||||
replyingToID = None
|
||||
if 'in_reply_to_status_id_str' in tweetL and tweetL['in_reply_to_status_id_str'] != None:
|
||||
replyingToID = tweetL['in_reply_to_status_id_str']
|
||||
|
||||
if 'screen_name' not in userL and 'core' in tweet["core"]["user_results"]["result"]:
|
||||
userL['screen_name'] = tweet["core"]["user_results"]["result"]["core"]["screen_name"]
|
||||
if 'name' not in userL:
|
||||
userL['name'] = tweet["core"]["user_results"]["result"]["core"]["name"]
|
||||
|
||||
if 'profile_image_url_https' not in userL and 'avatar' in tweet["core"]["user_results"]["result"]:
|
||||
userL['profile_image_url_https'] = tweet["core"]["user_results"]["result"]["avatar"]["image_url"]
|
||||
|
||||
apiObject = {
|
||||
"text": twText,
|
||||
"likes": tweetL["favorite_count"],
|
||||
@ -244,6 +263,9 @@ def getApiResponse(tweet,include_txt=False,include_rtf=False):
|
||||
"article": tweetArticle,
|
||||
"lang": lang,
|
||||
"replyingTo": replyingTo,
|
||||
"replyingToID": replyingToID,
|
||||
"fetched_on": int(datetime.now().timestamp()),
|
||||
"retweetURL":retweetURL,
|
||||
}
|
||||
try:
|
||||
apiObject["date_epoch"] = int(datetime.strptime(tweetL["created_at"], "%a %b %d %H:%M:%S %z %Y").timestamp())
|
||||
|
@ -12,6 +12,7 @@ testNSFWTweet="https://twitter.com/kuyacoy/status/1581185279376838657"
|
||||
testPollTweet="https://twitter.com/norm/status/651169346518056960"
|
||||
testMixedMediaTweet="https://twitter.com/bigbeerfest/status/1760638922084741177"
|
||||
testVinePlayerTweet="https://twitter.com/Roblox/status/583302104342638592"
|
||||
testRetweetTweet="https://twitter.com/pdxdylan/status/1828570470222045294"
|
||||
|
||||
testTextTweet_compare={'text': 'just setting up my twttr', 'date': 'Tue Mar 21 20:50:14 +0000 2006', 'tweetURL': 'https://twitter.com/jack/status/20', 'tweetID': '20', 'conversationID': '20', 'mediaURLs': [], 'media_extended': [], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': None, 'allSameType': True, 'hasMedia': False, 'combinedMediaUrl': None, 'pollData': None, 'article': None, 'date_epoch': 1142974214}
|
||||
testVideoTweet_compare={'text': 'TikTok embeds on Discord/Telegram bait you with a fake play button, but to see the actual video you have to go to their website.\nAs a request from a friend, I made it so that if you add "vx" before "tiktok" on any link, it fixes that. https://t.co/QYpiVXUIrW', 'date': 'Fri Jun 24 18:17:31 +0000 2022', 'tweetURL': 'https://twitter.com/pdxdylan/status/1540398733669666818', 'tweetID': '1540398733669666818', 'conversationID': '1540398733669666818', 'mediaURLs': ['https://video.twimg.com/ext_tw_video/1540396699037929472/pu/vid/762x528/YxbXbT3X7vq4LWfC.mp4'], 'media_extended': [{'url': 'https://video.twimg.com/ext_tw_video/1540396699037929472/pu/vid/762x528/YxbXbT3X7vq4LWfC.mp4', 'type': 'video', 'size': {'width': 762, 'height': 528}, 'duration_millis': 13650, 'thumbnail_url': 'https://pbs.twimg.com/ext_tw_video_thumb/1540396699037929472/pu/img/l187Z6B9AHHxUKPV.jpg', 'altText': None}], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': None, 'allSameType': True, 'hasMedia': True, 'combinedMediaUrl': None, 'pollData': None, 'article': None, 'date_epoch': 1656094651}
|
||||
@ -26,6 +27,9 @@ testMixedMediaTweet_compare={'text': 'Some of us here are definitely big nerds a
|
||||
testVinePlayerTweet_compare={'text': 'You wanted old ROBLOX back, you got it. Check out our sweet "new" look! #BringBackOldROBLOX https://vine.co/v/OL9VqvM6wJh', 'date': 'Wed Apr 01 16:17:13 +0000 2015', 'tweetURL': 'https://twitter.com/Roblox/status/583302104342638592', 'tweetID': '583302104342638592', 'conversationID': '583302104342638592', 'mediaURLs': ['https://v.cdn.vine.co/r/videos/20A1BE53011195086166081318912_3fe3b526b1a.1.5.3156516531034157495.mp4?versionId=DI1mMu7EI6zcLbvgucyp3GHebdz8.9cQ'], 'media_extended': [{'url': 'https://v.cdn.vine.co/r/videos/20A1BE53011195086166081318912_3fe3b526b1a.1.5.3156516531034157495.mp4?versionId=DI1mMu7EI6zcLbvgucyp3GHebdz8.9cQ', 'type': 'video', 'size': {'width': 435, 'height': 435}}], 'possibly_sensitive': False, 'hashtags': [], 'qrtURL': None, 'allSameType': True, 'hasMedia': True, 'combinedMediaUrl': None, 'pollData': {'options': []}, 'article': None, 'date_epoch': 1427905033}
|
||||
|
||||
testUser="https://twitter.com/jack"
|
||||
testUserSuspended="https://twitter.com/twitter"
|
||||
testUserPrivate="https://twitter.com/PrestigeIsKey"
|
||||
testUserInvalid="https://twitter.com/.a"
|
||||
testUserID=12 # could also be 170824883
|
||||
testUserIDUrl = "https://twitter.com/i/user/"+str(testUserID)
|
||||
testUserWeirdURLs=["https://twitter.com/jack?lang=en","https://twitter.com/jack/with_replies","https://twitter.com/jack/media","https://twitter.com/jack/likes","https://twitter.com/jack/with_replies?lang=en","https://twitter.com/jack/media?lang=en","https://twitter.com/jack/likes?lang=en","https://twitter.com/jack/"]
|
||||
|
Reference in New Issue
Block a user