Files
to-hen/streaming-link-bot/bot.py

52 lines
1.9 KiB
Python
Raw Normal View History

2026-03-14 07:30:46 +01:00
#!/usr/bin/env python
from telegram import Update, ForceReply
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
import os
import re
import requests
import sys
def convert_link(link: str) -> str:
if "spotify" in link:
to_service = "youtube_music"
elif "youtube" in link:
to_service = "spotify"
link = re.sub(r"//(www\.)?youtube.com", "//music.youtube.com", link)
else:
raise ValueError("Invalid streaming service: " + link)
print(f"Trying to convert {link}", file=sys.stderr)
r = requests.get(f'https://ytm2spotify.com/convert?url={link}&to_service={to_service}')
print(r.text, file=sys.stderr)
json = r.json()
print(json, file=sys.stderr)
return json["results"][0]["url"]
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user = update.effective_user
await update.message.reply_html(
rf"Hi {user.mention_html()}! You can send me links to Spotify songs and I'll give you the link on YouTube Music—and vice versa.",
reply_markup=ForceReply(selective=True),
)
async def streaming_link(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
try:
converted = convert_link(update.message.text)
await update.message.reply_text(converted)
except Exception as e:
print(e, file=sys.stderr)
await update.message.reply_text("Cannot convert this.")
def main(token: str) -> None:
application = Application.builder().token(token).build()
application.add_handler(CommandHandler("start", start))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, streaming_link))
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
token = os.getenv("TELEGRAM_TOKEN")
if token:
main(token)
else:
print("Missing TELEGRAM_TOKEN environment variable", file=sys.stderr)