#!/usr/bin/env python from telegram import Update, ForceReply from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters import os import re import requests import sys def convert_link(link: str) -> str: if "spotify" in link: to_service = "youtube_music" elif "youtube" in link: to_service = "spotify" link = re.sub(r"//(www\.)?youtube.com", "//music.youtube.com", link) else: raise ValueError("Invalid streaming service: " + link) print(f"Trying to convert {link}", file=sys.stderr) r = requests.get(f'https://ytm2spotify.com/convert?url={link}&to_service={to_service}') print(r.text, file=sys.stderr) json = r.json() print(json, file=sys.stderr) return json["results"][0]["url"] async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user = update.effective_user await update.message.reply_html( rf"Hi {user.mention_html()}! You can send me links to Spotify songs and I'll give you the link on YouTube Music—and vice versa.", reply_markup=ForceReply(selective=True), ) async def streaming_link(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: try: converted = convert_link(update.message.text) await update.message.reply_text(converted) except Exception as e: print(e, file=sys.stderr) await update.message.reply_text("Cannot convert this.") def main(token: str) -> None: application = Application.builder().token(token).build() application.add_handler(CommandHandler("start", start)) application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, streaming_link)) application.run_polling(allowed_updates=Update.ALL_TYPES) if __name__ == "__main__": token = os.getenv("TELEGRAM_TOKEN") if token: main(token) else: print("Missing TELEGRAM_TOKEN environment variable", file=sys.stderr)