210 lines
6.6 KiB
Python
210 lines
6.6 KiB
Python
import asyncio
|
|
import time
|
|
from telegram import Update
|
|
from telegram.ext import (
|
|
ApplicationBuilder,
|
|
MessageHandler,
|
|
filters,
|
|
ContextTypes,
|
|
)
|
|
from telegram.request import HTTPXRequest
|
|
from dotenv import load_dotenv
|
|
from web_utils import LinkHandler
|
|
from file_utils import FileHandler
|
|
from download_utils.instagram_utils import InstagramHandler
|
|
from download_utils.tiktok_utils import fallback_download
|
|
from download_utils.youtube_utils import YouTubeHandler
|
|
from stats_utils import load_stats, save_stats, hash_id
|
|
import os
|
|
|
|
|
|
# Usage statistics, loaded once when the module is imported.
stats = load_stats()  # Just to show off bot's usage B)

# Load environment variables (from a .env file when one is present).
load_dotenv()
api_key = os.getenv("telegram_token")

# Every error report is sent to the admin chat, so this setting is mandatory.
# Fail fast with a readable message instead of the opaque TypeError that
# int(None) would raise.
_admin_id_raw = os.getenv("admin_id")
if _admin_id_raw is None or not _admin_id_raw.strip():
    raise RuntimeError(
        "Environment variable 'admin_id' must be set to the admin's numeric chat id."
    )
admin_id = int(_admin_id_raw)

MAX_CONCURRENT_JOBS = int(os.getenv("max_concurrent_jobs", "1"))
QUEUE_MAXSIZE = int(os.getenv("queue_maxsize", "30"))
PER_CHAT_COOLDOWN = int(os.getenv("per_chat_cooldown_sec", "10"))

# Parse allowed user IDs: a comma-separated list; entries that are not plain
# decimal numbers are ignored. An empty set means "no restriction" (see
# handle_links). isdecimal() is used rather than isdigit() because isdigit()
# also accepts characters such as superscript digits that int() rejects.
allowed_users_str = os.getenv("allowed_user_ids", "")
ALLOWED_USER_IDS = {
    int(uid.strip())
    for uid in allowed_users_str.split(",")
    if uid.strip().isdecimal()
}

# Initialize handlers
instagram_handler = InstagramHandler()
youtube_handler = YouTubeHandler()
link_handler = LinkHandler()
file_handler = FileHandler()

# Shared runtime state used by the handlers and workers below.
download_queue = asyncio.Queue(maxsize=QUEUE_MAXSIZE)  # pending download jobs
worker_tasks = []  # worker tasks created in on_startup, cancelled in on_shutdown
stats_lock = asyncio.Lock()  # serialises updates to `stats`
last_chat_request = {}  # chat_id -> time.monotonic() of the last accepted request
|
|
|
|
|
|
async def process_download_job(app, job):
    """Download the media behind one queued link and send it to the chat.

    Args:
        app: The running application; only ``app.bot`` is used.
        job: Dict with the keys ``"match"`` (the link text), ``"chat_id"``,
            ``"message_id"`` and ``"user_id"``.

    Processing errors are not re-raised: they are printed, the requesting
    chat gets a generic failure reply and the admin gets the details.
    """
    match = job["match"]
    chat_id = job["chat_id"]
    message_id = job["message_id"]
    user_id = job["user_id"]

    try:
        shortcode = link_handler.extract_shortcode(match)
        if not shortcode:
            raise ValueError("Shortcode could not be extracted")

        # The downloaders are called through asyncio.to_thread so the event
        # loop keeps serving other updates while they run.
        if "tiktok.com" in match:
            tiktok_type = await asyncio.to_thread(
                link_handler.extract_tiktok_type, match
            )
            await asyncio.to_thread(
                fallback_download, match, shortcode, tiktok_type
            )
        elif "instagram.com" in match:
            await asyncio.to_thread(instagram_handler.download_post, shortcode)
        elif "youtube.com" in match or "youtu.be" in match:
            await asyncio.to_thread(youtube_handler.download_video, shortcode)

        media = file_handler.get_files(shortcode)
        if media:
            try:
                await app.bot.send_media_group(
                    chat_id=chat_id, media=media, reply_to_message_id=message_id
                )
            finally:
                # Clean up even when sending fails; otherwise every failed
                # upload leaves its downloaded files behind on disk.
                try:
                    file_handler.delete_files(shortcode)
                except Exception as e:
                    await send_error_message(
                        app.bot, [match], f"Cleanup failed: {e}"
                    )
        else:
            await app.bot.send_message(
                chat_id=chat_id,
                text="Something went wrong while retrieving media for this link.",
                reply_to_message_id=message_id,
            )
            await send_error_message(app.bot, [match], "No media files found.")

        # Counted for every job that gets this far, including the
        # "no media found" case above.
        async with stats_lock:
            user_hash = hash_id(user_id)
            chat_hash = hash_id(chat_id)
            if user_hash not in stats["unique_users"]:
                stats["unique_users"].add(user_hash)
            if chat_hash not in stats["unique_chats"]:
                stats["unique_chats"].add(chat_hash)
            stats["total_links"] += 1
            save_stats(stats)
    except Exception as e:
        print(f"Error processing {match}: {e}")
        # The two notifications are attempted independently and must never
        # raise themselves: a failing notification should neither hide the
        # other one nor escape from this function.
        try:
            await app.bot.send_message(
                chat_id=chat_id,
                text="Something went wrong while processing the link.",
                reply_to_message_id=message_id,
            )
        except Exception as notify_error:
            print(f"Could not notify chat {chat_id} about {match}: {notify_error}")
        try:
            await send_error_message(app.bot, [match], str(e))
        except Exception as notify_error:
            print(f"Could not notify admin about {match}: {notify_error}")
|
|
|
|
|
|
async def worker(app):
    """Consume jobs from ``download_queue`` forever, one at a time.

    Args:
        app: The running application, passed through to
            ``process_download_job``.

    The loop only ends when the task is cancelled (see ``on_shutdown``).
    """
    while True:
        job = await download_queue.get()
        try:
            await process_download_job(app, job)
        except Exception as e:
            # A single bad job must not end the loop: with one worker that
            # would silently stop all further processing. Cancellation is a
            # BaseException and still propagates.
            print(f"Worker failed on job {job!r}: {e}")
        finally:
            # Always balance get() so download_queue.join() cannot hang.
            download_queue.task_done()
|
|
|
|
|
|
async def handle_links(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Queue a download job for every supported link in a text message.

    Flow: ignore non-text updates, enforce the optional user allow-list,
    extract links with ``link_handler.link_pattern``, apply the per-chat
    cooldown, then enqueue one job per link and tell the user their queue
    position.

    Args:
        update: Incoming Telegram update.
        context: Handler context (unused).
    """
    message = update.message
    if not message or not message.text:
        return

    # from_user is optional on a Message; without a sender there is nothing
    # to authorise or attribute the job to.
    sender = message.from_user
    if sender is None:
        return

    # Check if user is allowed (an empty allow-list means everyone is).
    user_id = sender.id
    if ALLOWED_USER_IDS and user_id not in ALLOWED_USER_IDS:
        await message.reply_text(
            "شما اجازه استفاده از این بات را ندارید."
        )
        print(f"Unauthorized user {user_id} tried to use the bot")
        return

    text = message.text
    print(f"Received message: {text}")
    matches = link_handler.link_pattern.findall(text)
    print(f"Found matches: {matches}")

    # Messages without links must not touch the cooldown: otherwise ordinary
    # chat messages get a "bot is busy" reply and reset the timer.
    if not matches:
        return

    chat_id = message.chat_id
    now = time.monotonic()
    last_seen = last_chat_request.get(chat_id)
    # Compare against None explicitly; a timestamp is a float, not a flag.
    if last_seen is not None and (now - last_seen) < PER_CHAT_COOLDOWN:
        # Never tell the user to wait "0 seconds".
        wait_for = max(1, int(PER_CHAT_COOLDOWN - (now - last_seen)))
        await message.reply_text(
            f"بات در حال کار است، لطفا {wait_for} ثانیه دیگر دوباره امتحان کنید."
        )
        return
    last_chat_request[chat_id] = now

    for match in matches:
        job = {
            "match": match,
            "chat_id": chat_id,
            "message_id": message.message_id,
            "user_id": user_id,
        }

        try:
            download_queue.put_nowait(job)
        except asyncio.QueueFull:
            await message.reply_text(
                "The bot is busy right now. Please try again in a few minutes."
            )
            continue

        queue_position = download_queue.qsize()
        await message.reply_text(
            f"در صف دانلود هستید. جایگاه شما: {queue_position}."
        )
|
|
|
|
|
|
# Telegram limits a text message to 4096 characters; stay a little below it
# so an oversized report is shortened instead of being rejected.
_ADMIN_MESSAGE_LIMIT = 4000


# Function to send error messages to the admin
async def send_error_message(bot, matches, error_msg=""):
    """Report links that could not be processed to the admin chat.

    Args:
        bot: Bot object used to send the message.
        matches: Iterable of link strings the report is about.
        error_msg: Optional detail text appended after the links.

    The report is truncated when it would exceed the message size limit,
    because the detail text is often an arbitrary exception string.
    """
    error_message = (
        "The following links could not be processed:\n"
        + "\n".join(matches)
        + "\n"
        + error_msg
    )
    if len(error_message) > _ADMIN_MESSAGE_LIMIT:
        error_message = error_message[: _ADMIN_MESSAGE_LIMIT - 1] + "…"
    await bot.send_message(chat_id=admin_id, text=error_message)
|
|
|
|
|
|
async def on_startup(app):
    """Start the download workers.

    Creates ``MAX_CONCURRENT_JOBS`` tasks running ``worker(app)`` and records
    them in ``worker_tasks`` so ``on_shutdown`` can cancel them later.
    """
    worker_tasks.extend(
        asyncio.create_task(worker(app)) for _ in range(MAX_CONCURRENT_JOBS)
    )
|
|
|
|
|
|
async def on_shutdown(app):
    """Cancel every worker task and wait until all of them have finished.

    Args:
        app: The application being shut down (unused).
    """
    for running in worker_tasks:
        running.cancel()
    # return_exceptions=True collects each task's CancelledError as a result
    # instead of raising it out of gather().
    await asyncio.gather(*worker_tasks, return_exceptions=True)
|
|
|
|
|
|
if __name__ == "__main__":
    # Bot setup and start polling.
    # Generous HTTP timeouts: 60 s for connect/read/write (including media
    # uploads) and 10 s to obtain a connection from the pool.
    request = HTTPXRequest(
        connect_timeout=60.0,
        read_timeout=60.0,
        write_timeout=60.0,
        pool_timeout=10.0,
        media_write_timeout=60.0,
    )

    # Configure the application step by step: credentials and transport
    # first, then the lifecycle hooks that start and stop the workers.
    builder = ApplicationBuilder().token(api_key).request(request)
    builder = builder.post_init(on_startup).post_shutdown(on_shutdown)
    app = builder.build()

    # Every update goes to handle_links, which filters out non-text messages.
    app.add_handler(MessageHandler(filters.ALL, handle_links))

    app.run_polling()
|