2026-01-04 20:57:34 +00:00
commit 637968fcb1
11 changed files with 877 additions and 0 deletions

59
README.md Normal file
View File

@@ -0,0 +1,59 @@
# Telegram Downloader Bot
A Telegram bot that detects Instagram, TikTok, and YouTube links in messages, downloads the associated media, and sends it back to the chat as albums (media groups). Includes error reporting to the bot admin and automatic cleanup of downloaded files.
---
## Features
- Downloads posts and videos from Instagram, TikTok, and YouTube links.
- Sends downloaded media as Telegram media groups.
- Automatic cleanup of downloaded media files after sending.
---
## Setup
### Installation
1. Clone the repository:
```
git clone https://github.com/FriendlyOneDev/downloader-bot
cd downloader-bot
```
2. Create and activate a virtual environment:
```
python -m venv venv
source venv/bin/activate # Linux/macOS
venv\Scripts\activate # Windows
```
3. Install dependencies:
```
pip install -r requirements.txt
```
4. Create a .env file with your credentials:
```
telegram_token=YOUR_TELEGRAM_BOT_TOKEN
# The bot sends error reports directly to this user ID; tweak the code if you don't want that
admin_id=YOUR_TELEGRAM_USER_ID
```
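The code also reads several optional settings from the environment; none are required. A sketch of a fuller .env, using the variable names from the source with purely illustrative values:
```
# Optional: authenticated Instagram session (avoids muted reels with licensed audio)
instagram_sessionid=YOUR_INSTAGRAM_SESSIONID
# Optional: restrict usage to these Telegram user IDs (empty = everyone may use the bot)
allowed_user_ids=12345678,87654321
# Optional: tuning knobs, shown with their in-code defaults
max_file_bytes=104857600
max_concurrent_jobs=1
queue_maxsize=30
per_chat_cooldown_sec=10
```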
5. Run the bot:
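main.py is the entry point, so with the virtual environment active and the .env file in place, starting the bot should just be:
```
python main.py
```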
## Structure
```
.
├── bot_stats.json
├── download_utils
│   ├── instagram_utils.py
│   ├── tiktok_utils.py
│   └── youtube_utils.py
├── file_utils.py
├── main.py
├── README.md
├── requirements.txt
├── stats_utils.py
├── test.sh
├── test_web_utils.py
└── web_utils.py
```

100
download_utils/instagram_utils.py Normal file
View File

@@ -0,0 +1,100 @@
import os
from urllib.parse import urlparse
import instaloader
import requests
MAX_FILE_BYTES = int(os.getenv("max_file_bytes", str(100 * 1024 * 1024)))
class InstagramHandler:
def __init__(self):
self.instaloader = instaloader.Instaloader(
download_comments=False,
download_geotags=False,
download_video_thumbnails=False,
save_metadata=False,
post_metadata_txt_pattern="",
dirname_pattern=".",
)
# Requests session lets us reuse cookies (optional sessionid) and headers
self.session = requests.Session()
self.session.headers.update(
{
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
" AppleWebKit/537.36 (KHTML, like Gecko)"
" Chrome/120.0.0.0 Safari/537.36",
}
)
# Optional authenticated session to avoid muted reels with licensed audio
sessionid = os.getenv("instagram_sessionid")
if sessionid:
# Domain must be set for Instagram CDN access
self.session.cookies.set("sessionid", sessionid, domain=".instagram.com")
# Reuse the same requests session inside instaloader context
self.instaloader.context._session = self.session
def _download_binary(self, url, dest_path):
response = self.session.get(url, stream=True, timeout=30)
response.raise_for_status()
content_length = response.headers.get("Content-Length")
if content_length and int(content_length) > MAX_FILE_BYTES:
raise ValueError("File exceeds size limit")
downloaded = 0
try:
with open(dest_path, "wb") as fh:
for chunk in response.iter_content(chunk_size=1024 * 256):
if not chunk:
continue
downloaded += len(chunk)
if downloaded > MAX_FILE_BYTES:
raise ValueError("File exceeds size limit")
fh.write(chunk)
except Exception:
if os.path.exists(dest_path):
os.remove(dest_path)
raise
def _pick_ext(self, url, fallback):
parsed = urlparse(url)
ext = os.path.splitext(parsed.path)[1]
if ext:
return ext
return fallback
def download_post(self, shortcode):
# Fetch post metadata via instaloader; manual download to keep audio intact
post = instaloader.Post.from_shortcode(self.instaloader.context, shortcode)
if post.typename == "GraphSidecar":
for idx, node in enumerate(post.get_sidecar_nodes()):
if node.is_video:
ext = self._pick_ext(node.video_url, ".mp4")
dest = f"{shortcode}_{idx}{ext}"
self._download_binary(node.video_url, dest)
else:
ext = self._pick_ext(node.display_url, ".jpg")
dest = f"{shortcode}_{idx}{ext}"
self._download_binary(node.display_url, dest)
elif post.is_video:
ext = self._pick_ext(post.video_url, ".mp4")
dest = f"{shortcode}{ext}"
self._download_binary(post.video_url, dest)
else:
ext = self._pick_ext(post.url, ".jpg")
dest = f"{shortcode}{ext}"
self._download_binary(post.url, dest)
if __name__ == "__main__":
handler = InstagramHandler()
# Example usage
# shortcode = "DG_OqyBsqSC"
# handler.download_post(shortcode)
shortcode = "Br-7zlEBjPc"
handler.download_post(shortcode)

246
download_utils/tiktok_utils.py Normal file
View File

@@ -0,0 +1,246 @@
import os
import requests
from parsel import Selector
MAX_FILE_BYTES = int(os.getenv("max_file_bytes", str(100 * 1024 * 1024)))
# Huge thanks to financiallyruined for the TikTok-Multi-Downloader project;
# the majority of this code comes from there:
# https://github.com/financiallyruined/TikTok-Multi-Downloader
def downloader(file_name, response, extension):
    file_path = os.path.join(".", f"{file_name}.{extension}")
content_length = response.headers.get("Content-Length")
if content_length and int(content_length) > MAX_FILE_BYTES:
raise ValueError("File exceeds size limit")
downloaded = 0
try:
with open(file_path, "wb") as file:
for chunk in response.iter_content(chunk_size=1024 * 256):
if not chunk:
continue
downloaded += len(chunk)
if downloaded > MAX_FILE_BYTES:
raise ValueError("File exceeds size limit")
file.write(chunk)
except Exception:
if os.path.exists(file_path):
os.remove(file_path)
raise
def download_v1(link, file_name, content_type):
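    # tmate.cc flow: fetch the page to scrape its form token, POST the TikTok
    # URL to /action, then pull the media links out of the returned HTML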
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:107.0) Gecko/20100101 Firefox/107.4",
"Content-Type": "application/x-www-form-urlencoded",
"Origin": "https://tmate.cc",
"Connection": "keep-alive",
"Referer": "https://tmate.cc/",
"Sec-Fetch-Site": "same-origin",
}
with requests.Session() as s:
try:
response = s.get("https://tmate.cc/", headers=headers)
selector = Selector(response.text)
token = selector.css('input[name="token"]::attr(value)').get()
data = {"url": link, "token": token}
response = s.post(
"https://tmate.cc/action", headers=headers, data=data
).json()["data"]
selector = Selector(text=response)
if content_type == "video":
download_links = selector.css(
".downtmate-right.is-desktop-only.right a::attr(href)"
).getall()
print(f"Found {len(download_links)} download link(s):")
for i, link in enumerate(download_links):
print(f"[{i}] {link}")
for link in download_links:
print(f"Trying download link: {link}")
response = s.get(link, stream=True, headers=headers)
# Skip if it's not a real video
content_type = response.headers.get("Content-Type", "")
if "text/html" in content_type:
print("⚠️ Skipping link: got HTML instead of video.")
continue
# Valid video file, proceed to save
downloader(file_name, response, extension="mp4")
print("✅ Successfully downloaded video.")
break
else:
print("❌ No valid video links found.")
else:
download_links = selector.css(".card-img-top::attr(src)").getall()
for index, download_link in enumerate(download_links):
response = s.get(download_link, stream=True, headers=headers)
downloader(f"{file_name}_{index}", response, extension="jpeg")
        except Exception as e:
            print(f"\033[91merror\033[0m: {link} - {str(e)}")
            with open("errors.txt", "a") as error_file:
                error_file.write(link + "\n")
            raise  # Re-raise so fallback_download can try the next service
def download_v2(link, file_name, content_type):
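    # musicaldown.com flow: the form field names are generated per session, so
    # they are scraped from the page before posting the TikTok URL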
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
"Sec-Fetch-Site": "same-origin",
"Content-Type": "application/x-www-form-urlencoded",
"Origin": "https://musicaldown.com",
"Connection": "keep-alive",
"Referer": "https://musicaldown.com/en?ref=more",
}
with requests.Session() as s:
try:
r = s.get("https://musicaldown.com/en", headers=headers)
selector = Selector(text=r.text)
token_a = selector.xpath('//*[@id="link_url"]/@name').get()
token_b = selector.xpath(
'//*[@id="submit-form"]/div/div[1]/input[2]/@name'
).get()
token_b_value = selector.xpath(
'//*[@id="submit-form"]/div/div[1]/input[2]/@value'
).get()
data = {
token_a: link,
token_b: token_b_value,
"verify": "1",
}
response = s.post(
"https://musicaldown.com/download", headers=headers, data=data
)
selector = Selector(text=response.text)
if content_type == "video":
watermark = selector.xpath(
"/html/body/div[2]/div/div[2]/div[2]/a[3]/@href"
).get()
download_link = watermark
response = s.get(download_link, stream=True, headers=headers)
downloader(file_name, response, extension="mp4")
else:
download_links = selector.xpath(
'//div[@class="card-image"]/img/@src'
).getall()
for index, download_link in enumerate(download_links):
response = s.get(download_link, stream=True, headers=headers)
downloader(f"{file_name}_{index}", response, extension="jpeg")
        except Exception as e:
            print(f"\033[91merror\033[0m: {link} - {str(e)}")
            with open("errors.txt", "a") as error_file:
                error_file.write(link + "\n")
            raise  # Re-raise so fallback_download can try the next service
def download_v3(link, file_name, content_type):
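    # tiktokio.com flow: the page is htmx-driven, so the HX-* headers mimic the
    # site's own search request against its tk-htmx endpoint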
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0",
"Accept": "*/*",
"Accept-Language": "en-US,en;q=0.5",
"HX-Request": "true",
"HX-Trigger": "search-btn",
"HX-Target": "tiktok-parse-result",
"HX-Current-URL": "https://tiktokio.com/",
"Content-Type": "application/x-www-form-urlencoded",
"Origin": "https://tiktokio.com",
"Connection": "keep-alive",
"Referer": "https://tiktokio.com/",
}
with requests.Session() as s:
try:
r = s.get("https://tiktokio.com/", headers=headers)
selector = Selector(text=r.text)
prefix = selector.css('input[name="prefix"]::attr(value)').get()
data = {
"prefix": prefix,
"vid": link,
}
            # Reuse the session so cookies from the initial GET are sent too
            response = s.post(
                "https://tiktokio.com/api/v1/tk-htmx", headers=headers, data=data
            )
selector = Selector(text=response.text)
if content_type == "video":
download_link_index = 2
download_link = selector.css("div.tk-down-link a::attr(href)").getall()[
download_link_index
]
response = s.get(download_link, stream=True, headers=headers)
downloader(file_name, response, extension="mp4")
else:
download_links = selector.xpath(
'//div[@class="media-box"]/img/@src'
).getall()
for index, download_link in enumerate(download_links):
response = s.get(download_link, stream=True, headers=headers)
downloader(f"{file_name}_{index}", response, extension="jpeg")
        except Exception as e:
            print(f"\033[91merror\033[0m: {link} - {str(e)}")
            with open("errors.txt", "a") as error_file:
                error_file.write(link + "\n")
            raise  # Re-raise so fallback_download can try the next service
def fallback_download(link, file_name, content_type):
    # Try each downloader service in turn and stop at the first that succeeds;
    # the download_v* functions re-raise on failure so the next one gets a shot
    for func in [download_v1, download_v2, download_v3]:
        try:
            func(link, file_name, content_type)
            return
        except Exception as e:
            print(f"Failed with {func.__name__}: {str(e)}")
    raise RuntimeError(f"All downloaders failed for {link}")
if __name__ == "__main__":
img_link1 = "https://vm.tiktok.com/ZMSWakLUd/"
img_link2 = "https://vm.tiktok.com/ZMSWabGYL/"
img_link3 = "https://vm.tiktok.com/ZMSWashGU/"
vid_link1 = "https://vm.tiktok.com/ZMSnxPae1/"
vid_link2 = "https://vm.tiktok.com/ZMSnxfmEC/"
vid_link3 = "https://vm.tiktok.com/ZMSnx9kG1/"
print("Downloading using tmate.cc (v1)...")
download_v1(img_link1, "test_image_1", "photo")
download_v1(vid_link1, "test_video_1", "video")
print("Downloading using musicaldown.com (v2)...")
download_v2(img_link2, "test_image_2", "photo")
download_v2(vid_link2, "test_video_2", "video")
print("Downloading using tiktokio.com (v3)...")
download_v3(img_link3, "test_image_3", "photo")
download_v3(vid_link3, "test_video_3", "video")

30
download_utils/youtube_utils.py Normal file
View File

@@ -0,0 +1,30 @@
import yt_dlp
class YouTubeHandler:
def __init__(self):
self.ydl_opts = {
# Download video suitable for Telegram (max 720p for reasonable size)
"format": "best[height<=720][ext=mp4]/best[height<=720]/best[ext=mp4]/best",
"outtmpl": "%(id)s.%(ext)s",
"quiet": False,
"no_warnings": False,
# Add metadata for proper video player support
"writethumbnail": False,
"prefer_ffmpeg": False,
}
def download_video(self, video_id):
"""Download YouTube video by video ID with Telegram-compatible format"""
try:
with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
ydl.download([f"https://www.youtube.com/watch?v={video_id}"])
        except Exception as e:
            raise RuntimeError(f"Failed to download YouTube video {video_id}: {e}") from e
if __name__ == "__main__":
handler = YouTubeHandler()
# Example usage
# handler.download_video("dQw4w9WgXcQ")

53
file_utils.py Normal file
View File

@@ -0,0 +1,53 @@
import os
import mimetypes
import glob
from telegram import InputMediaPhoto, InputMediaVideo
class FileHandler:
def __init__(self):
self.file_path = os.getcwd()
def _get_file_paths(self, file_name):
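        # Match every file in the working directory whose name contains the
        # shortcode (the download handlers name files "<shortcode>*.<ext>")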
pattern = os.path.join(self.file_path, f"*{file_name}*.*")
return sorted(glob.glob(pattern))
    def get_files(self, file_name):
        files = self._get_file_paths(file_name)
        if not files:
            return []
        media_files = []
        for file in files:
            mime_type, _ = mimetypes.guess_type(file)
            if mime_type and mime_type.startswith("image/"):
                # Read the bytes up front so no handle stays open and blocks
                # delete_files() later (open handles break deletion on Windows)
                with open(file, "rb") as fh:
                    media_files.append(InputMediaPhoto(media=fh.read()))
            elif mime_type and mime_type.startswith("video/"):
                # For videos, enable streaming support for proper playback controls
                with open(file, "rb") as fh:
                    media_files.append(
                        InputMediaVideo(media=fh.read(), supports_streaming=True)
                    )
            else:
                print(f"Unsupported file type: {file}")
        return media_files
def delete_files(self, file_name):
file_paths = self._get_file_paths(file_name)
for path in file_paths:
try:
if os.path.exists(path):
os.remove(path)
print(f"Deleted file: {path}")
else:
print(f"File not found (skipped): {path}")
except Exception as e:
print(f"Error deleting file {path}: {e}")
if __name__ == "__main__":
file_handler = FileHandler()
print(file_handler._get_file_paths("Br-7zlEBjPc"))

209
main.py Normal file
View File

@@ -0,0 +1,209 @@
import asyncio
import time
from telegram import Update
from telegram.ext import (
ApplicationBuilder,
MessageHandler,
filters,
ContextTypes,
)
from telegram.request import HTTPXRequest
from dotenv import load_dotenv
from web_utils import LinkHandler
from file_utils import FileHandler
from download_utils.instagram_utils import InstagramHandler
from download_utils.tiktok_utils import fallback_download
from download_utils.youtube_utils import YouTubeHandler
from stats_utils import load_stats, save_stats, hash_id
import os
stats = load_stats()  # Just to show off bot's usage B)
# Load environment variables
load_dotenv()
api_key = os.getenv("telegram_token")
admin_id = int(os.getenv("admin_id"))
MAX_CONCURRENT_JOBS = int(os.getenv("max_concurrent_jobs", "1"))
QUEUE_MAXSIZE = int(os.getenv("queue_maxsize", "30"))
PER_CHAT_COOLDOWN = int(os.getenv("per_chat_cooldown_sec", "10"))
# Parse allowed user IDs
allowed_users_str = os.getenv("allowed_user_ids", "")
ALLOWED_USER_IDS = set(
int(uid.strip()) for uid in allowed_users_str.split(",")
if uid.strip().isdigit()
) if allowed_users_str else set()
# Initialize handlers
instagram_handler = InstagramHandler()
youtube_handler = YouTubeHandler()
link_handler = LinkHandler()
file_handler = FileHandler()
download_queue = asyncio.Queue(maxsize=QUEUE_MAXSIZE)
worker_tasks = []
stats_lock = asyncio.Lock()
last_chat_request = {}
async def process_download_job(app, job):
match = job["match"]
chat_id = job["chat_id"]
message_id = job["message_id"]
user_id = job["user_id"]
try:
shortcode = link_handler.extract_shortcode(match)
if not shortcode:
raise ValueError("Shortcode could not be extracted")
if "tiktok.com" in match:
tiktok_type = await asyncio.to_thread(link_handler.extract_tiktok_type, match)
await asyncio.to_thread(
fallback_download, match, shortcode, tiktok_type
)
elif "instagram.com" in match:
await asyncio.to_thread(instagram_handler.download_post, shortcode)
elif "youtube.com" in match or "youtu.be" in match:
await asyncio.to_thread(youtube_handler.download_video, shortcode)
media = file_handler.get_files(shortcode)
if media:
await app.bot.send_media_group(
chat_id=chat_id, media=media, reply_to_message_id=message_id
)
try:
file_handler.delete_files(shortcode)
except Exception as e:
await send_error_message(app.bot, [match], f"Cleanup failed: {e}")
else:
await app.bot.send_message(
chat_id=chat_id,
text="Something went wrong while retrieving media for this link.",
reply_to_message_id=message_id,
)
await send_error_message(app.bot, [match], "No media files found.")
async with stats_lock:
user_hash = hash_id(user_id)
chat_hash = hash_id(chat_id)
if user_hash not in stats["unique_users"]:
stats["unique_users"].add(user_hash)
if chat_hash not in stats["unique_chats"]:
stats["unique_chats"].add(chat_hash)
stats["total_links"] += 1
save_stats(stats)
except Exception as e:
print(f"Error processing {match}: {e}")
await app.bot.send_message(
chat_id=chat_id,
text="Something went wrong while processing the link.",
reply_to_message_id=message_id,
)
await send_error_message(app.bot, [match], str(e))
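# Worker coroutines pull jobs off the shared queue, so at most
# MAX_CONCURRENT_JOBS downloads run at the same time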
async def worker(app):
while True:
job = await download_queue.get()
try:
await process_download_job(app, job)
finally:
download_queue.task_done()
async def handle_links(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not update.message or not update.message.text:
return
# Check if user is allowed
user_id = update.message.from_user.id
if ALLOWED_USER_IDS and user_id not in ALLOWED_USER_IDS:
        await update.message.reply_text(
            "You are not allowed to use this bot."
        )
print(f"Unauthorized user {user_id} tried to use the bot")
return
text = update.message.text
print(f"Received message: {text}")
    matches = link_handler.link_pattern.findall(text)
    if not matches:
        # Don't touch the per-chat cooldown for messages without supported links
        return
    print(f"Found matches: {matches}")
chat_id = update.message.chat_id
now = time.monotonic()
last_seen = last_chat_request.get(chat_id)
if last_seen and (now - last_seen) < PER_CHAT_COOLDOWN:
wait_for = int(PER_CHAT_COOLDOWN - (now - last_seen))
        await update.message.reply_text(
            f"The bot is busy. Please try again in {wait_for} seconds."
        )
return
last_chat_request[chat_id] = now
for match in matches:
if download_queue.full():
await update.message.reply_text(
"The bot is busy right now. Please try again in a few minutes."
)
continue
job = {
"match": match,
"chat_id": chat_id,
"message_id": update.message.message_id,
"user_id": update.message.from_user.id,
}
await download_queue.put(job)
queue_position = download_queue.qsize()
        await update.message.reply_text(
            f"Added to the download queue. Your position: {queue_position}."
        )
# Function to send error messages to the admin
async def send_error_message(bot, matches, error_msg=""):
error_message = (
"The following links could not be processed:\n"
+ "\n".join(matches)
+ "\n"
+ error_msg
)
await bot.send_message(chat_id=admin_id, text=error_message)
async def on_startup(app):
for _ in range(MAX_CONCURRENT_JOBS):
task = asyncio.create_task(worker(app))
worker_tasks.append(task)
async def on_shutdown(app):
for task in worker_tasks:
task.cancel()
await asyncio.gather(*worker_tasks, return_exceptions=True)
if __name__ == "__main__":
# Bot setup and start polling
request = HTTPXRequest(
connect_timeout=60.0,
read_timeout=60.0,
write_timeout=60.0,
pool_timeout=10.0,
media_write_timeout=60.0,
)
app = (
ApplicationBuilder()
.token(api_key)
.request(request)
.post_init(on_startup)
.post_shutdown(on_shutdown)
.build()
)
app.add_handler(MessageHandler(filters.ALL, handle_links))
app.run_polling()

6
requirements.txt Normal file
View File

@@ -0,0 +1,6 @@
instaloader
requests
parsel
python-telegram-bot
python-dotenv
yt-dlp

57
stats_utils.py Normal file
View File

@@ -0,0 +1,57 @@
import json
import os
import hashlib
from datetime import datetime, timezone
STATS_FILE = "bot_stats.json"
def hash_id(id_value):
return hashlib.sha256(str(id_value).encode()).hexdigest()
def load_stats():
if not os.path.exists(STATS_FILE):
return {
"unique_users": set(),
"unique_chats": set(),
"total_links": 0,
"started_at": datetime.utcnow().isoformat() + "Z",
}
with open(STATS_FILE, "r") as f:
data = json.load(f)
# Convert lists to sets
data["unique_users"] = set(data.get("unique_users", []))
data["unique_chats"] = set(data.get("unique_chats", []))
return data
def save_stats(stats):
data_to_save = stats.copy()
data_to_save["unique_users"] = list(stats["unique_users"])
data_to_save["unique_chats"] = list(stats["unique_chats"])
with open(STATS_FILE, "w") as f:
json.dump(data_to_save, f, indent=2)
def print_stats():
stats = load_stats()
started_at_iso = stats.get("started_at", None)
if started_at_iso:
try:
dt = datetime.fromisoformat(started_at_iso.replace("Z", "+00:00"))
started_at = dt.strftime("%Y.%m.%d %H:%M")
except Exception:
started_at = started_at_iso
else:
started_at = "unknown"
print("Current Bot Stats:")
print(f"Bot started at (UTC): {started_at}")
print(f"Unique users: {len(stats['unique_users'])}")
print(f"Unique chats: {len(stats['unique_chats'])}")
print(f"Total links processed: {stats['total_links']}")
if __name__ == "__main__":
print_stats()

1
test.sh Executable file
View File

@@ -0,0 +1 @@
#!/usr/bin/env bash
python3 -m unittest discover -s .

53
test_web_utils.py Normal file
View File

@@ -0,0 +1,53 @@
import unittest
from web_utils import LinkHandler
class TestLinkHandler(unittest.TestCase):
def setUp(self):
self.handler = LinkHandler()
def test_extract_shortcode_instagram(self):
test_cases = [
("https://www.instagram.com/p/BsOGulcndj-/", "BsOGulcndj-"),
("https://instagram.com/reel/CuZ7YF8oQJZ/", "CuZ7YF8oQJZ"),
("http://www.instagram.com/p/ABC123/", "ABC123"),
("https://instagram.com/p/AbC-123_xYz/?param=value", "AbC-123_xYz"),
]
for url, expected in test_cases:
with self.subTest(url=url):
self.assertEqual(self.handler.extract_shortcode(url), expected)
def test_extract_shortcode_tiktok(self):
test_cases = [
("https://vm.tiktok.com/ZMebxCR7T/", "ZMebxCR7T"),
(
"http://www.tiktok.com/@user123/video/1234567890123456789",
"1234567890123456789",
),
(
"https://tiktok.com/@user.name/video/9876543210987654321/",
"9876543210987654321",
),
]
for url, expected in test_cases:
with self.subTest(url=url):
self.assertEqual(self.handler.extract_shortcode(url), expected)
def test_extract_shortcode_invalid(self):
invalid_urls = [
"https://www.google.com",
"not a url",
"https://www.instagram.com/username/",
"https://www.reddit.com/r/subreddit/",
"https://tiktok.com/@username/",
]
for url in invalid_urls:
with self.subTest(url=url):
self.assertIsNone(self.handler.extract_shortcode(url))
if __name__ == "__main__":
unittest.main()

63
web_utils.py Normal file
View File

@@ -0,0 +1,63 @@
import re
import requests
class LinkHandler:
def __init__(self):
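        # link_pattern finds every supported Instagram/TikTok/YouTube URL in a
        # message; shortcode_pattern then pulls the post/video ID out of one URL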
self.link_pattern = re.compile(
r"""
https?:\/\/
(?:www\.)?
(?:
instagram\.com\/(?:reel|p)\/[\w-]+\/?(?:\?[^\s]*)? |
tiktok\.com\/@[\w.-]+\/(?:video|photo)\/\d+ |
tiktok\.com\/embed(?:\/v2)?\/\d+ |
vm\.tiktok\.com\/[\w\/]+ |
vt\.tiktok\.com\/[\w\/]+ |
youtube\.com\/watch\?v=[\w-]+ |
youtu\.be\/[\w-]+ |
youtube\.com\/shorts\/[\w-]+
)
\/?
(?:\?[^\s]*)?
""",
re.VERBOSE,
)
        # Kept in sync with link_pattern above: vt.tiktok.com short links and
        # TikTok /photo/ URLs are accepted there, so extract their IDs too
        self.shortcode_pattern = re.compile(
            r"""
            (?:https?:\/\/(?:www\.)?)
            (?:
                instagram\.com\/(?:reel|p)\/(?P<instagram_shortcode>[\w-]+) |
                vm\.tiktok\.com\/(?P<tiktok_shortcode_short>[^\/\?\s]+) |
                vt\.tiktok\.com\/(?P<tiktok_shortcode_vt>[^\/\?\s]+) |
                tiktok\.com\/@[\w.-]+\/(?:video|photo)\/(?P<tiktok_shortcode_long>\d+) |
                youtube\.com\/watch\?v=(?P<youtube_video_id>[\w-]+) |
                youtu\.be\/(?P<youtube_short_id>[\w-]+) |
                youtube\.com\/shorts\/(?P<youtube_shorts_id>[\w-]+)
            )
            """,
            re.VERBOSE,
        )
self.tiktok_type_pattern = re.compile(r"tiktok\.com/@[\w.-]+/(video|photo)/\d+")
def extract_shortcode(self, url):
match = self.shortcode_pattern.search(url)
if not match:
return None
return next(
(group for group in match.groupdict().values() if group is not None), None
)
def _resolve_tiktok_link(self, url):
try:
response = requests.get(url, allow_redirects=True, timeout=5)
return response.url
except Exception as e:
raise ValueError(f"Failed to resolve TikTok link: {e}")
def extract_tiktok_type(self, url):
resolved = self._resolve_tiktok_link(url)
match = self.tiktok_type_pattern.search(resolved)
if match:
return match.group(1)
raise ValueError("Type not found in resolved TikTok URL")