"""Telegram front-end of the translation bot.

Every text message (or media caption) is echoed back with an inline keyboard
of target languages.  The original text is remembered under the id of the
bot's reply so that pressing a language button can translate it in place.
"""
import os
import sys
import json
import html
import logging
import threading

# Make the parent of this file's directory importable so the ``botdiscord``
# package imports below resolve when the file is run as a script.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import asyncio
import aiohttp
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, ContextTypes, filters
from telegram.helpers import escape_markdown

from botdiscord.config import load_config, get_telegram_token, get_libretranslate_url
from botdiscord.database import init_db as init_shared_db
from botdiscord.translate import load_lang_mappings, get_lang_mapping, get_flag_mapping, get_name_to_code

load_config()
from botdiscord.database import init_db
init_db()
load_lang_mappings("telegram")

logger = logging.getLogger(__name__)

PENDING_FILE = "/app/data/pending_translations.json"

# Retention policy for the in-memory map of pending translations.
_MAX_PENDING = 1000
_FALLBACK_EVICT = 200
# Layout of the inline language keyboard.
_BUTTONS_PER_ROW = 4
# callback_data prefixes; they must match the handler patterns registered in
# run_telegram_bot().
_TRANS_PREFIX = "trans_"
_LANG_PREFIX = "lang_"


def load_pending_translations():
    """Return the pending-translation map stored in PENDING_FILE.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON object.
    """
    try:
        if os.path.exists(PENDING_FILE):
            with open(PENDING_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, dict):
                return data
            logger.warning("Ignoring %s: expected a JSON object", PENDING_FILE)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError is a ValueError subclass.
        logger.warning("Could not load %s: %s", PENDING_FILE, exc)
    return {}


def save_pending_translations(data):
    """Write *data* to PENDING_FILE as JSON (best effort, never raises on I/O errors)."""
    try:
        with open(PENDING_FILE, "w", encoding="utf-8") as f:
            json.dump(data, f)
    except (OSError, TypeError, ValueError) as exc:
        logger.warning("Could not save %s: %s", PENDING_FILE, exc)


user_languages = {}
pending_translations = load_pending_translations()


def cleanup_pending_translations():
    """Bound the size of ``pending_translations`` to avoid unbounded growth.

    When more than 1000 entries are stored, only the 1000 entries with the
    highest numeric keys are kept.  If some key is not numeric, the first 200
    entries in insertion order are dropped instead.
    """
    if len(pending_translations) <= _MAX_PENDING:
        return
    try:
        ordered = sorted(pending_translations, key=int)
    except (TypeError, ValueError):
        for key in list(pending_translations)[:_FALLBACK_EVICT]:
            del pending_translations[key]
        return
    for key in ordered[:-_MAX_PENDING]:
        del pending_translations[key]


lock = threading.Lock()


def _remember_pending(message_id, text):
    """Store *text* under *message_id*, trimming and persisting the map.

    NOTE(review): the key is the bare message id without a chat id; confirm
    that ids cannot repeat across the chats this bot serves.
    """
    with lock:
        cleanup_pending_translations()
        pending_translations[str(message_id)] = text
        # Persist so that buttons on old messages keep working after a restart.
        save_pending_translations(pending_translations)


def _build_prompt(original):
    """Return the Markdown prompt shown under the echoed original text."""
    # The original is user input: escape it so stray '*', '_', '`' or '['
    # cannot break Telegram's legacy Markdown parser.
    safe = escape_markdown(original, version=1)
    return f"🌍 *Original:*\n{safe}\n\n¿A qué idioma quieres traducir?"


def _display_name(lang_code):
    """Return "<flag> <name>" (or just the name) for *lang_code*."""
    # The mapping is keyed by language code, as in get_lang_keyboard().
    name = get_lang_mapping("telegram").get(lang_code, "Desconocido")
    flag = get_flag_mapping("telegram").get(lang_code, "")
    return f"{flag} {name}" if flag else name


def get_lang_keyboard(bot_type="telegram", prefix=_TRANS_PREFIX):
    """Build the inline keyboard with one button per configured language.

    Args:
        bot_type: mapping set passed to get_lang_mapping/get_flag_mapping.
        prefix: string prepended to the language code in ``callback_data``.

    Returns:
        InlineKeyboardMarkup with the buttons grouped in rows of four.  A
        button shows the flag when one is configured, otherwise the name.
    """
    lang_mapping = get_lang_mapping(bot_type)
    flag_mapping = get_flag_mapping(bot_type)
    buttons = [
        InlineKeyboardButton(
            flag_mapping.get(code, "") or name,
            callback_data=f"{prefix}{code}",
        )
        for code, name in lang_mapping.items()
    ]
    # Group the buttons in rows of four columns.
    keyboard = [
        buttons[i:i + _BUTTONS_PER_ROW]
        for i in range(0, len(buttons), _BUTTONS_PER_ROW)
    ]
    return InlineKeyboardMarkup(keyboard)


async def translate_text_telegram(text: str, target_lang: str) -> str:
    """Translate *text* into *target_lang* and return it with HTML entities decoded."""
    # Use the shared function from translate.py to get segmentation and
    # multilingual support.
    from botdiscord.translate import translate_text

    translated = await translate_text(text, target_lang)
    # Unescape HTML entities (e.g. &quot;) before sending to Telegram.
    return html.unescape(translated)


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start: send the welcome message."""
    await update.message.reply_text(
        "¡Hola! Soy un bot de traducción.\n\n"
        "Envíame cualquier texto y lo traduciré al idioma que elijas.\n"
        "Usa /idiomas para seleccionar el idioma de traducción."
    )


async def languages_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /idiomas: show the keyboard used to pick a default language."""
    await update.message.reply_text(
        "Selecciona el idioma de traducción:",
        reply_markup=get_lang_keyboard("telegram", prefix=_LANG_PREFIX),
    )


async def language_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle a ``lang_<code>`` button: remember the user's default language."""
    query = update.callback_query
    await query.answer()

    lang_code = (query.data or "")[len(_LANG_PREFIX):]
    user_languages[query.from_user.id] = lang_code

    await query.edit_message_text(
        f"Idioma predeterminado configurado: {_display_name(lang_code)}"
    )


async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Echo a text message with the language keyboard and remember its text."""
    message = update.message
    # update.message is None for edited messages and channel posts.
    if message is None or not message.text:
        return

    text = message.text.strip()
    if len(text) < 2:
        return

    sent_message = await message.reply_text(
        _build_prompt(text),
        parse_mode="Markdown",
        reply_markup=get_lang_keyboard(),
    )
    _remember_pending(sent_message.message_id, text)


async def _offer_caption_translation(update, reply_attr, file_id_of):
    """Re-send a captioned media message with the language keyboard.

    Args:
        update: incoming update.
        reply_attr: name of the ``Message.reply_*`` method to call.
        file_id_of: callable returning the file id to re-send, given the message.

    Does nothing when the update has no message or the message has no caption.
    """
    message = update.message
    if message is None or not message.caption:
        return

    caption = message.caption
    reply = getattr(message, reply_attr)
    sent_message = await reply(
        file_id_of(message),
        caption=_build_prompt(caption),
        parse_mode="Markdown",
        reply_markup=get_lang_keyboard(),
    )
    _remember_pending(sent_message.message_id, caption)


async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Offer translation of a photo caption (re-sends the last photo size)."""
    await _offer_caption_translation(update, "reply_photo", lambda m: m.photo[-1].file_id)


async def translation_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle a ``trans_<code>`` button: translate the remembered text in place."""
    query = update.callback_query
    await query.answer()

    lang_code = (query.data or "")[len(_TRANS_PREFIX):]
    message_id = str(query.message.message_id)

    with lock:
        text = pending_translations.get(message_id)
    if not text:
        await query.message.reply_text("⚠️ No se encontró el texto original. Es posible que el bot se haya reiniciado recientemente.")
        return

    translated = await translate_text_telegram(text, lang_code)

    reply_markup = get_lang_keyboard()
    # Both parts are free text, so both are escaped for legacy Markdown.
    new_text = (
        f"{escape_markdown(translated, version=1)}"
        f"\n\n📝 *Original:*\n{escape_markdown(text, version=1)}"
    )

    try:
        if query.message.caption:
            await query.edit_message_caption(
                caption=new_text,
                parse_mode="Markdown",
                reply_markup=reply_markup,
            )
        else:
            await query.edit_message_text(
                text=new_text,
                parse_mode="Markdown",
                reply_markup=reply_markup,
            )
    except Exception:
        # Best effort: the edit may fail (for instance when the content is
        # unchanged); record it instead of silently discarding the error.
        logger.warning("Could not edit message %s", message_id, exc_info=True)


async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Offer translation of a document caption."""
    await _offer_caption_translation(update, "reply_document", lambda m: m.document.file_id)


async def handle_video(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Offer translation of a video caption."""
    await _offer_caption_translation(update, "reply_video", lambda m: m.video.file_id)


async def handle_animation(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Offer translation of an animation caption."""
    await _offer_caption_translation(update, "reply_animation", lambda m: m.animation.file_id)


async def handle_audio(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Offer translation of an audio caption."""
    await _offer_caption_translation(update, "reply_audio", lambda m: m.audio.file_id)


async def handle_voice(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Offer translation of a voice-message caption."""
    await _offer_caption_translation(update, "reply_voice", lambda m: m.voice.file_id)


async def handle_sticker(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Ignore stickers."""
    return


async def handle_video_note(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Ignore video notes."""
    return


def run_telegram_bot():
    """Initialise the database, register all handlers and start polling.

    Prints an error and returns without starting when the token is missing
    or still the placeholder value.
    """
    init_db()

    token = get_telegram_token()
    if not token or token == "TU_TELEGRAM_BOT_TOKEN":
        print("ERROR: Configura el token de Telegram en config.yaml")
        return

    application = Application.builder().token(token).build()

    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("idiomas", languages_command))
    application.add_handler(CallbackQueryHandler(translation_callback, pattern="^trans_"))
    # Without this handler the buttons sent by /idiomas are never processed.
    application.add_handler(CallbackQueryHandler(language_callback, pattern="^lang_"))
    application.add_handler(MessageHandler(filters.PHOTO, handle_photo))
    application.add_handler(MessageHandler(filters.Document.ALL, handle_document))
    application.add_handler(MessageHandler(filters.VIDEO, handle_video))
    application.add_handler(MessageHandler(filters.ANIMATION, handle_animation))
    application.add_handler(MessageHandler(filters.AUDIO, handle_audio))
    application.add_handler(MessageHandler(filters.VOICE, handle_voice))
    application.add_handler(MessageHandler(filters.Sticker.ALL, handle_sticker))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    print("Bot Telegram iniciado. Presiona Ctrl+C para detener.")
    application.run_polling(allowed_updates=Update.ALL_TYPES)


if __name__ == "__main__":
    run_telegram_bot()