mirror of
https://github.com/alterware/aw-bot.git
synced 2025-10-26 14:15:54 +00:00
feat: detect sales!
This commit is contained in:
76
bot/tasks.py
76
bot/tasks.py
@@ -1,13 +1,85 @@
|
||||
from datetime import datetime, timezone
|
||||
import requests
|
||||
|
||||
import discord
|
||||
from discord.ext import tasks
|
||||
from discord.ext import tasks, commands
|
||||
|
||||
from bot.utils import aware_utcnow, fetch_api_data
|
||||
|
||||
TARGET_DATE = datetime(2036, 8, 12, tzinfo=timezone.utc)
|
||||
OFFTOPIC_CHANNEL = 1112048063448617142
|
||||
|
||||
COD_GAMES = {
|
||||
10180: {"name": "Modern Warfare 2 (2009)", "channel": 1145458108190163014},
|
||||
42680: {"name": "Modern Warfare 3 (2011)", "channel": 1145459504436220014},
|
||||
209160: {"name": "Call of Duty: Ghosts", "channel": 1145469106133401682},
|
||||
209650: {"name": "Call of Duty: Advanced Warfare", "channel": 1145469136919613551},
|
||||
311210: {"name": "Call of Duty: Black Ops 3", "channel": 1180796251529293844},
|
||||
}
|
||||
|
||||
|
||||
class SteamSaleChecker(commands.Cog):
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
self.check_steam_sale.start() # Start the task when the cog is loaded
|
||||
|
||||
def cog_unload(self):
|
||||
self.check_steam_sale.cancel() # Stop the task when the cog is unloaded
|
||||
|
||||
@tasks.loop(hours=24)
|
||||
async def check_steam_sale(self):
|
||||
|
||||
for app_id, game_data in COD_GAMES.items():
|
||||
game_name = game_data["name"]
|
||||
channel_id = game_data["channel"]
|
||||
|
||||
channel = self.bot.get_channel(channel_id)
|
||||
if channel is None:
|
||||
print(f"Error: Channel ID {channel_id} for {game_name} not found.")
|
||||
return
|
||||
|
||||
steam_api_url = (
|
||||
f"https://store.steampowered.com/api/appdetails?appids={app_id}"
|
||||
)
|
||||
|
||||
try:
|
||||
response = requests.get(steam_api_url)
|
||||
data = response.json().get(str(app_id), {}).get("data", {})
|
||||
|
||||
if not data:
|
||||
print(f"Warning: No data returned for {game_name}. Skipping...")
|
||||
return
|
||||
|
||||
price_info = data.get("price_overview", {})
|
||||
|
||||
if not price_info:
|
||||
await channel.send(
|
||||
f"{game_name} is currently unavailable for purchase."
|
||||
)
|
||||
return
|
||||
|
||||
original_price = price_info.get("initial", 0) / 100
|
||||
discounted_price = price_info.get("final", 0) / 100
|
||||
discount_percent = price_info.get("discount_percent", 0)
|
||||
store_url = f"https://store.steampowered.com/app/{app_id}/"
|
||||
|
||||
if discount_percent > 0:
|
||||
message = (
|
||||
f"**{game_name} is on sale!**\n"
|
||||
f"Original Price: **${original_price:.2f}**\n"
|
||||
f"Discounted Price: **${discounted_price:.2f}** (**-{discount_percent}%**)\n"
|
||||
f"[View on Steam]({store_url})\n"
|
||||
)
|
||||
|
||||
await channel.send(message)
|
||||
|
||||
except requests.RequestException as e:
|
||||
print(f"Error fetching Steam sale data for {game_name}: {e}")
|
||||
|
||||
@check_steam_sale.before_loop
|
||||
async def before_check_steam_sale(self):
|
||||
await self.bot.wait_until_ready()
|
||||
|
||||
|
||||
async def setup(bot):
|
||||
@tasks.loop(minutes=10)
|
||||
@@ -42,4 +114,6 @@ async def setup(bot):
|
||||
update_status.start()
|
||||
heat_death.start()
|
||||
|
||||
await bot.add_cog(SteamSaleChecker(bot))
|
||||
|
||||
print("Tasks extension loaded!")
|
||||
|
||||
Reference in New Issue
Block a user