Files
2026-01-04 20:57:34 +00:00

210 lines
6.6 KiB
Python

import asyncio
import time
from telegram import Update
from telegram.ext import (
ApplicationBuilder,
MessageHandler,
filters,
ContextTypes,
)
from telegram.request import HTTPXRequest
from dotenv import load_dotenv
from web_utils import LinkHandler
from file_utils import FileHandler
from download_utils.instagram_utils import InstagramHandler
from download_utils.tiktok_utils import fallback_download
from download_utils.youtube_utils import YouTubeHandler
from stats_utils import load_stats, save_stats, hash_id
import os
# Usage statistics are loaded before the .env file is read.
stats = load_stats()  # Just to show off bot's usage B)

# Load environment variables
load_dotenv()
api_key = os.getenv("telegram_token")
admin_id = int(os.getenv("admin_id"))
MAX_CONCURRENT_JOBS = int(os.getenv("max_concurrent_jobs", "1"))
QUEUE_MAXSIZE = int(os.getenv("queue_maxsize", "30"))
PER_CHAT_COOLDOWN = int(os.getenv("per_chat_cooldown_sec", "10"))

# Parse allowed user IDs: a comma-separated list in which every entry that is
# not purely digits (after trimming whitespace) is skipped. An unset or empty
# variable therefore yields an empty set.
allowed_users_str = os.getenv("allowed_user_ids", "")
ALLOWED_USER_IDS = {
    int(token)
    for token in map(str.strip, allowed_users_str.split(","))
    if token.isdigit()
}

# Initialize handlers
instagram_handler = InstagramHandler()
youtube_handler = YouTubeHandler()
link_handler = LinkHandler()
file_handler = FileHandler()

# Shared runtime state
download_queue = asyncio.Queue(maxsize=QUEUE_MAXSIZE)  # pending download jobs
worker_tasks = []  # asyncio tasks running the queue consumers
stats_lock = asyncio.Lock()  # serializes updates to `stats`
last_chat_request = {}  # chat_id -> time.monotonic() of the last accepted request
async def process_download_job(app, job):
    """Download the media behind one queued link and send it to the chat.

    Args:
        app: Application object; only ``app.bot`` is used, to send messages.
        job: Dict with the keys ``"match"`` (the link text), ``"chat_id"``,
            ``"message_id"`` and ``"user_id"``.

    Any ``Exception`` raised while processing is handled here: it is printed,
    the requesting chat gets a generic failure reply and the admin receives
    the details. Usage statistics are updated and saved after every job that
    did not raise, including jobs for which no media files were found.
    """
    match = job["match"]
    chat_id = job["chat_id"]
    message_id = job["message_id"]
    user_id = job["user_id"]

    try:
        shortcode = link_handler.extract_shortcode(match)
        if not shortcode:
            raise ValueError("Shortcode could not be extracted")

        # The downloaders are called through asyncio.to_thread so the event
        # loop keeps running while they work.
        if "tiktok.com" in match:
            tiktok_type = await asyncio.to_thread(
                link_handler.extract_tiktok_type, match
            )
            await asyncio.to_thread(
                fallback_download, match, shortcode, tiktok_type
            )
        elif "instagram.com" in match:
            await asyncio.to_thread(instagram_handler.download_post, shortcode)
        elif "youtube.com" in match or "youtu.be" in match:
            await asyncio.to_thread(youtube_handler.download_video, shortcode)

        media = file_handler.get_files(shortcode)
        if media:
            try:
                await app.bot.send_media_group(
                    chat_id=chat_id, media=media, reply_to_message_id=message_id
                )
            finally:
                # Clean up even when sending failed; otherwise the downloaded
                # files of every failed job would stay behind.
                try:
                    file_handler.delete_files(shortcode)
                except Exception as e:
                    await send_error_message(
                        app.bot, [match], f"Cleanup failed: {e}"
                    )
        else:
            await app.bot.send_message(
                chat_id=chat_id,
                text="Something went wrong while retrieving media for this link.",
                reply_to_message_id=message_id,
            )
            await send_error_message(app.bot, [match], "No media files found.")

        async with stats_lock:
            user_hash = hash_id(user_id)
            chat_hash = hash_id(chat_id)
            if user_hash not in stats["unique_users"]:
                stats["unique_users"].add(user_hash)
            if chat_hash not in stats["unique_chats"]:
                stats["unique_chats"].add(chat_hash)
            stats["total_links"] += 1
            save_stats(stats)
    except Exception as e:
        print(f"Error processing {match}: {e}")
        # Reporting is best-effort. Each notification is guarded on its own so
        # that a failed reply to the user (e.g. the bot was blocked) neither
        # suppresses the admin report nor propagates out of this function.
        try:
            await app.bot.send_message(
                chat_id=chat_id,
                text="Something went wrong while processing the link.",
                reply_to_message_id=message_id,
            )
        except Exception as notify_error:
            print(f"Could not notify chat about failed link {match}: {notify_error}")
        try:
            await send_error_message(app.bot, [match], str(e))
        except Exception as notify_error:
            print(f"Could not notify admin about failed link {match}: {notify_error}")
async def worker(app):
    """Consume jobs from ``download_queue`` forever, one at a time.

    Args:
        app: Application object handed through to ``process_download_job``.

    The coroutine only ends when its task is cancelled.
    """
    while True:
        job = await download_queue.get()
        try:
            await process_download_job(app, job)
        except Exception as e:
            # One failing job must not end the loop: without this handler the
            # task would finish and nothing would drain the queue any more.
            # asyncio.CancelledError derives from BaseException, so task
            # cancellation still passes through and stops the worker.
            print(f"Unhandled error in download worker: {e!r}")
        finally:
            # Always balance the get() so download_queue.join() can complete.
            download_queue.task_done()
async def handle_links(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Queue one download job for every supported link in a text message.

    Args:
        update: Incoming update; ignored unless it carries a text message.
        context: Handler context (unused).

    Flow: reject users outside ``ALLOWED_USER_IDS`` (when that set is not
    empty), ignore messages without links, enforce the per-chat cooldown,
    then enqueue each link and tell the sender its queue position.
    """
    message = update.message
    if not message or not message.text:
        return

    # Check if user is allowed
    user_id = message.from_user.id
    if ALLOWED_USER_IDS and user_id not in ALLOWED_USER_IDS:
        await message.reply_text(
            "شما اجازه استفاده از این بات را ندارید."
        )
        print(f"Unauthorized user {user_id} tried to use the bot")
        return

    text = message.text
    print(f"Received message: {text}")
    matches = link_handler.link_pattern.findall(text)
    print(f"Found matches: {matches}")
    if not matches:
        # Nothing to download: do not start or enforce the cooldown, so plain
        # chat messages never trigger a "please wait" reply.
        return

    chat_id = message.chat_id
    now = time.monotonic()
    last_seen = last_chat_request.get(chat_id)
    # Compare against None explicitly; a timestamp is a float, not a flag.
    if last_seen is not None:
        remaining = PER_CHAT_COOLDOWN - (now - last_seen)
        if remaining > 0:
            # Never tell the user to wait "0 seconds".
            wait_for = max(1, int(remaining))
            await message.reply_text(
                f"بات در حال کار است، لطفا {wait_for} ثانیه دیگر دوباره امتحان کنید."
            )
            return
    last_chat_request[chat_id] = now

    for match in matches:
        job = {
            "match": match,
            "chat_id": chat_id,
            "message_id": message.message_id,
            "user_id": user_id,
        }
        try:
            # put_nowait never suspends, so the handler cannot stall on a full
            # queue and qsize() below still includes the job just added.
            download_queue.put_nowait(job)
        except asyncio.QueueFull:
            await message.reply_text(
                "The bot is busy right now. Please try again in a few minutes."
            )
            continue
        queue_position = download_queue.qsize()
        await message.reply_text(
            f"در صف دانلود هستید. جایگاه شما: {queue_position}."
        )
# Function to send error messages to the admin
async def send_error_message(bot, matches, error_msg=""):
    """Send a report about links that could not be processed to the admin.

    Args:
        bot: Object whose ``send_message(chat_id=..., text=...)`` coroutine
            delivers the report.
        matches: Iterable of link strings, listed one per line.
        error_msg: Optional detail text appended after the links.

    The report is cut to 4096 characters, the maximum length Telegram accepts
    for a text message, so an oversized error text cannot make the send fail.
    """
    max_length = 4096
    error_message = (
        "The following links could not be processed:\n"
        + "\n".join(matches)
        + "\n"
        + error_msg
    )
    await bot.send_message(chat_id=admin_id, text=error_message[:max_length])
async def on_startup(app):
    """Start ``MAX_CONCURRENT_JOBS`` worker tasks and record them in ``worker_tasks``.

    Args:
        app: Application object passed on to every ``worker``.
    """
    worker_tasks.extend(
        asyncio.create_task(worker(app)) for _ in range(MAX_CONCURRENT_JOBS)
    )
async def on_shutdown(app):
    """Cancel every task in ``worker_tasks`` and wait until all have finished.

    Args:
        app: Application object (unused).
    """
    for running in worker_tasks:
        running.cancel()
    # return_exceptions=True collects each task's CancelledError as a result
    # instead of raising it here.
    await asyncio.gather(*worker_tasks, return_exceptions=True)
if __name__ == "__main__":
    # Bot setup and start polling
    # HTTP client settings for the bot: 60 s for connect/read/write and media
    # uploads, 10 s to obtain a connection from the pool.
    request = HTTPXRequest(
        connect_timeout=60.0,
        read_timeout=60.0,
        write_timeout=60.0,
        pool_timeout=10.0,
        media_write_timeout=60.0,
    )

    builder = ApplicationBuilder().token(api_key).request(request)
    # on_startup / on_shutdown are registered as the post-init and
    # post-shutdown hooks; they start and cancel the download workers.
    builder = builder.post_init(on_startup).post_shutdown(on_shutdown)
    app = builder.build()

    # Every update is routed to handle_links, which itself ignores anything
    # that is not a text message.
    app.add_handler(MessageHandler(filters.ALL, handle_links))
    app.run_polling()