Files
traduccion_bots/bottelegram/telegram_bot.py

305 lines
11 KiB
Python

import os
import sys
import json
import threading
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import asyncio
import aiohttp
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, ContextTypes, filters
from botdiscord.config import load_config, get_telegram_token, get_libretranslate_url
from botdiscord.database import init_db as init_shared_db
from botdiscord.translate import load_lang_mappings, get_lang_mapping, get_flag_mapping, get_name_to_code
load_config()
from botdiscord.database import init_db
init_db()
load_lang_mappings("telegram")
PENDING_FILE = "/app/data/pending_translations.json"
def load_pending_translations():
try:
if os.path.exists(PENDING_FILE):
with open(PENDING_FILE, "r") as f:
return json.load(f)
except:
pass
return {}
def save_pending_translations(data):
try:
with open(PENDING_FILE, "w") as f:
json.dump(data, f)
except:
pass
user_languages = {}
pending_translations = load_pending_translations()
# Limitar el tamaño del diccionario para evitar fugas de memoria (mantener últimas 1000)
def cleanup_pending_translations():
if len(pending_translations) > 1000:
try:
sorted_keys = sorted(pending_translations.keys(), key=lambda x: int(x))
for k in sorted_keys[:-1000]:
del pending_translations[k]
except:
keys = list(pending_translations.keys())
for k in keys[:200]:
del pending_translations[k]
lock = threading.Lock()
def get_lang_keyboard(bot_type="telegram"):
lang_mapping = get_lang_mapping(bot_type)
flag_mapping = get_flag_mapping(bot_type)
buttons = []
for code, name in lang_mapping.items():
flag = flag_mapping.get(code, "")
label = flag if flag else name
buttons.append(InlineKeyboardButton(label, callback_data=f"trans_{code}"))
# Agrupar en filas de 4 botones (columnas)
keyboard = [buttons[i:i + 4] for i in range(0, len(buttons), 4)]
return InlineKeyboardMarkup(keyboard)
async def translate_text_telegram(text: str, target_lang: str) -> str:
url = get_libretranslate_url()
payload = {
"q": text,
"source": "auto",
"target": target_lang,
"format": "text"
}
async with aiohttp.ClientSession() as session:
try:
async with session.post(url, json=payload, timeout=10) as resp:
if resp.status == 200:
data = await resp.json()
return data.get("translatedText", "Error en la traducción.")
else:
return f"Error de API: {resp.status}"
except Exception as e:
return f"Error de conexión: {str(e)}"
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text(
"¡Hola! Soy un bot de traducción.\n\n"
"Envíame cualquier texto y lo traduciré al idioma que elijas.\n"
"Usa /idiomas para seleccionar el idioma de traducción."
)
async def languages_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
lang_mapping = get_lang_mapping("telegram")
flag_mapping = get_flag_mapping("telegram")
buttons = []
for code, name in lang_mapping.items():
flag = flag_mapping.get(code, "")
label = flag if flag else name
buttons.append(InlineKeyboardButton(label, callback_data=f"lang_{code}"))
keyboard = [buttons[i:i + 4] for i in range(0, len(buttons), 4)]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"Selecciona el idioma de traducción:",
reply_markup=reply_markup
)
async def language_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()
lang_code = query.data.replace("lang_", "")
lang_mapping = get_lang_mapping("telegram")
flag_mapping = get_flag_mapping("telegram")
lang_name = "Desconocido"
for name, code in lang_mapping.items():
if code == lang_code:
lang_name = name
break
lang_flag = flag_mapping.get(lang_code, "")
user_languages[query.from_user.id] = lang_code
display_name = f"{lang_flag} {lang_name}" if lang_flag else lang_name
await query.edit_message_text(f"Idioma predeterminado configurado: {display_name}")
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
text = update.message.text.strip()
if len(text) < 2: return
reply_markup = get_lang_keyboard()
sent_message = await update.message.reply_text(
f"🌍 *Original:*\n{text}\n\n¿A qué idioma quieres traducir?",
parse_mode="Markdown",
reply_markup=reply_markup
)
with lock:
cleanup_pending_translations()
pending_translations[str(sent_message.message_id)] = text
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
caption = update.message.caption
if not caption: return
reply_markup = get_lang_keyboard()
sent_message = await update.message.reply_photo(
update.message.photo[-1].file_id,
caption=f"🌍 *Original:*\n{caption}\n\n¿A qué idioma quieres traducir?",
parse_mode="Markdown",
reply_markup=reply_markup
)
with lock:
cleanup_pending_translations()
pending_translations[str(sent_message.message_id)] = caption
async def translation_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()
lang_code = query.data.replace("trans_", "")
message_id = str(query.message.message_id)
text = pending_translations.get(message_id)
if not text:
await query.message.reply_text("⚠️ No se encontró el texto original. Es posible que el bot se haya reiniciado recientemente.")
return
translated = await translate_text_telegram(text, lang_code)
lang_mapping = get_lang_mapping("telegram")
flag_mapping = get_flag_mapping("telegram")
lang_name = "Desconocido"
for name, code in lang_mapping.items():
if code == lang_code:
lang_name = name
break
lang_flag = flag_mapping.get(lang_code, "")
display_name = f"{lang_flag} {lang_name}" if lang_flag else lang_name
reply_markup = get_lang_keyboard()
new_text = f"{translated}\n\n📝 *Original:*\n{text}"
try:
if query.message.caption:
await query.edit_message_caption(
caption=new_text,
parse_mode="Markdown",
reply_markup=reply_markup
)
else:
await query.edit_message_text(
text=new_text,
parse_mode="Markdown",
reply_markup=reply_markup
)
except Exception as e:
pass
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
caption = update.message.caption
if not caption: return
reply_markup = get_lang_keyboard()
sent_message = await update.message.reply_document(
update.message.document.file_id,
caption=f"🌍 *Original:*\n{caption}\n\n¿A qué idioma quieres traducir?",
parse_mode="Markdown",
reply_markup=reply_markup
)
pending_translations[str(sent_message.message_id)] = caption
async def handle_video(update: Update, context: ContextTypes.DEFAULT_TYPE):
caption = update.message.caption
if not caption: return
reply_markup = get_lang_keyboard()
sent_message = await update.message.reply_video(
update.message.video.file_id,
caption=f"🌍 *Original:*\n{caption}\n\n¿A qué idioma quieres traducir?",
parse_mode="Markdown",
reply_markup=reply_markup
)
pending_translations[str(sent_message.message_id)] = caption
async def handle_animation(update: Update, context: ContextTypes.DEFAULT_TYPE):
caption = update.message.caption
if not caption: return
reply_markup = get_lang_keyboard()
sent_message = await update.message.reply_animation(
update.message.animation.file_id,
caption=f"🌍 *Original:*\n{caption}\n\n¿A qué idioma quieres traducir?",
parse_mode="Markdown",
reply_markup=reply_markup
)
pending_translations[str(sent_message.message_id)] = caption
async def handle_audio(update: Update, context: ContextTypes.DEFAULT_TYPE):
caption = update.message.caption
if not caption: return
reply_markup = get_lang_keyboard()
sent_message = await update.message.reply_audio(
update.message.audio.file_id,
caption=f"🌍 *Original:*\n{caption}\n\n¿A qué idioma quieres traducir?",
parse_mode="Markdown",
reply_markup=reply_markup
)
pending_translations[str(sent_message.message_id)] = caption
async def handle_voice(update: Update, context: ContextTypes.DEFAULT_TYPE):
caption = update.message.caption
if not caption: return
reply_markup = get_lang_keyboard()
sent_message = await update.message.reply_voice(
update.message.voice.file_id,
caption=f"🌍 *Original:*\n{caption}\n\n¿A qué idioma quieres traducir?",
parse_mode="Markdown",
reply_markup=reply_markup
)
pending_translations[str(sent_message.message_id)] = caption
async def handle_sticker(update: Update, context: ContextTypes.DEFAULT_TYPE):
return
async def handle_video_note(update: Update, context: ContextTypes.DEFAULT_TYPE):
return
def run_telegram_bot():
from botdiscord.database import init_db
init_db()
token = get_telegram_token()
if not token or token == "TU_TELEGRAM_BOT_TOKEN":
print("ERROR: Configura el token de Telegram en config.yaml")
return
application = Application.builder().token(token).build()
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("idiomas", languages_command))
application.add_handler(CallbackQueryHandler(translation_callback, pattern="^trans_"))
application.add_handler(MessageHandler(filters.PHOTO, handle_photo))
application.add_handler(MessageHandler(filters.Document.ALL, handle_document))
application.add_handler(MessageHandler(filters.VIDEO, handle_video))
application.add_handler(MessageHandler(filters.ANIMATION, handle_animation))
application.add_handler(MessageHandler(filters.AUDIO, handle_audio))
application.add_handler(MessageHandler(filters.VOICE, handle_voice))
application.add_handler(MessageHandler(filters.Sticker.ALL, handle_sticker))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
print("Bot Telegram iniciado. Presiona Ctrl+C para detener.")
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
run_telegram_bot()