Files
traduccion_bots/bottelegram/telegram_bot.py
nickpons666 8398e988b0 Feat: Agregar agente Groq con integración RAG
- Nuevo módulo groq_agent.py para consultas a la API de Groq
- Panel de administración en /groq para configurar API key, modelo y prompt
- Comando /rag en Discord y Telegram para consultar el RAG
- Sistema de prompt personalizable guardado en base de datos
- Soporte para variables de entorno en Docker
- Fix: starlette version para evitar bug con Jinja2
2026-03-26 21:23:19 -06:00

369 lines
13 KiB
Python

import os
import sys
import json
import threading
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import asyncio
import aiohttp
import socket
# Force IPv4 globally to avoid connectivity problems in Docker/ZimaOS.
orig_getaddrinfo = socket.getaddrinfo


def patched_getaddrinfo(*args, **kwargs):
    """Resolve like ``socket.getaddrinfo`` but keep only IPv4 results.

    All arguments are forwarded unchanged to the original resolver; every
    entry whose address family is not ``socket.AF_INET`` is dropped.
    """
    ipv4_only = []
    for entry in orig_getaddrinfo(*args, **kwargs):
        family = entry[0]
        if family == socket.AF_INET:
            ipv4_only.append(entry)
    return ipv4_only


# Install the filter process-wide: every later getaddrinfo lookup goes through it.
socket.getaddrinfo = patched_getaddrinfo
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, ContextTypes, filters
from botdiscord.config import load_config, get_telegram_token, get_libretranslate_url
from botdiscord.database import (
init_db as init_shared_db,
save_message, get_message, save_translation, get_cached_translation
)
from botdiscord.translate import load_lang_mappings, get_lang_mapping, get_flag_mapping, get_name_to_code
# Import-time setup: these calls run as soon as the module is imported.
load_config()
# init_db is the same function already imported above under the alias init_shared_db.
from botdiscord.database import init_db
init_db()
# Load the language mappings for the "telegram" bot type.
load_lang_mappings("telegram")
# JSON file read by load_pending_translations() and written by save_pending_translations().
PENDING_FILE = "/app/data/pending_translations.json"
def load_pending_translations(path=None):
    """Load the persisted pending-translation map from disk.

    Args:
        path: JSON file to read. Defaults to ``PENDING_FILE``.

    Returns:
        The decoded mapping, or an empty dict when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object.
    """
    if path is None:
        path = PENDING_FILE
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # A missing file is the normal first-run state; stay quiet.
        return {}
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"⚠️ No se pudo leer {path}: {e}")
        return {}
    # Callers use the result as a dict; reject any other JSON top-level type.
    return data if isinstance(data, dict) else {}
def save_pending_translations(data, path=None):
    """Persist the pending-translation map as JSON (best effort).

    Failures are reported on stdout and never raised, so a read-only or
    missing data directory cannot crash the bot.

    Args:
        data: JSON-serialisable mapping to store.
        path: Destination file. Defaults to ``PENDING_FILE``.
    """
    if path is None:
        path = PENDING_FILE
    try:
        # Serialise before opening the file: opening with "w" truncates it,
        # so a serialisation error must not be able to wipe the old contents.
        payload = json.dumps(data)
    except (TypeError, ValueError) as e:
        print(f"⚠️ No se pudieron serializar las traducciones pendientes: {e}")
        return
    try:
        with open(path, "w", encoding="utf-8") as f:
            f.write(payload)
    except OSError as e:
        print(f"⚠️ No se pudo escribir {path}: {e}")
# Default language per user, keyed by Telegram user id (written by language_callback); in memory only.
user_languages = {}
# Text awaiting translation, keyed by message id as a string; loaded from the JSON file at import time.
pending_translations = load_pending_translations()
# Bound the size of the dict to avoid memory leaks (keep the newest 1000 by default).
def cleanup_pending_translations(max_entries=1000):
    """Trim the module-level ``pending_translations`` dict in place.

    Keys are message ids stored as strings, so the entries kept are the ones
    with the highest numeric id. If any key is not numeric, the dict's
    insertion order is used instead and the oldest insertions are removed.

    Args:
        max_entries: Maximum number of entries left after the call.
    """
    excess = len(pending_translations) - max_entries
    if excess <= 0:
        return
    try:
        ordered = sorted(pending_translations, key=int)
    except (TypeError, ValueError):
        # Non-numeric key: fall back to insertion order, still trimming
        # down to the limit rather than removing a fixed number of entries.
        ordered = list(pending_translations)
    for key in ordered[:excess]:
        del pending_translations[key]
# Module-level lock. NOTE(review): nothing in the visible part of this file acquires it — confirm it is still needed.
lock = threading.Lock()
def get_lang_keyboard(bot_type="telegram"):
    """Build the inline keyboard offering every configured target language.

    Each button carries ``trans_<code>`` as callback data and is labelled
    with the flag configured for that code, or with the language name when
    no flag is available. Buttons are laid out four per row.
    """
    names_by_code = get_lang_mapping(bot_type)
    flags_by_code = get_flag_mapping(bot_type)
    row_width = 4

    buttons = [
        InlineKeyboardButton(
            flags_by_code.get(code, "") or name,
            callback_data=f"trans_{code}",
        )
        for code, name in names_by_code.items()
    ]

    # Chunk the flat button list into rows of `row_width` columns.
    rows = []
    for first in range(0, len(buttons), row_width):
        rows.append(buttons[first:first + row_width])
    return InlineKeyboardMarkup(rows)
async def translate_text_telegram(text: str, target_lang: str) -> str:
    """Translate *text* into *target_lang* and return it ready for Telegram.

    Delegates to the shared ``translate_text`` from ``botdiscord.translate``
    (segmentation and multilingual support live there), then decodes HTML
    entities such as ``&quot;`` in the result.
    """
    import html
    from botdiscord.translate import translate_text

    raw_translation = await translate_text(text, target_lang)
    return html.unescape(raw_translation)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to /start with a short description of the bot.

    Updates without a new message (``update.message`` is None for edited
    messages, which command handlers also receive) are ignored instead of
    raising AttributeError.
    """
    message = update.message
    if message is None:
        return
    await message.reply_text(
        "¡Hola! Soy un bot de traducción.\n\n"
        "Envíame cualquier texto y lo traduciré al idioma que elijas.\n"
        "Usa /idiomas para seleccionar el idioma de traducción."
    )
async def languages_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to /idiomas with a keyboard for choosing the default language.

    Each button carries ``lang_<code>`` as callback data and shows the flag
    for that code, or the language name when no flag is configured. Updates
    without a new message (e.g. an edited command) are ignored instead of
    raising AttributeError.
    """
    message = update.message
    if message is None:
        return

    lang_mapping = get_lang_mapping("telegram")
    flag_mapping = get_flag_mapping("telegram")
    buttons = [
        InlineKeyboardButton(
            flag_mapping.get(code, "") or name,
            callback_data=f"lang_{code}",
        )
        for code, name in lang_mapping.items()
    ]
    # Group the buttons into rows of 4 columns.
    keyboard = [buttons[i:i + 4] for i in range(0, len(buttons), 4)]

    await message.reply_text(
        "Selecciona el idioma de traducción:",
        reply_markup=InlineKeyboardMarkup(keyboard),
    )
async def language_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Store the language chosen from the /idiomas keyboard as the user's default.

    The callback data has the form ``lang_<code>``, where ``<code>`` is a key
    of the language mapping (that is how languages_command builds it), so the
    display name is looked up by key. The choice is kept in the in-memory
    ``user_languages`` dict and the keyboard message is replaced by a
    confirmation.
    """
    query = update.callback_query
    await query.answer()

    data = query.data or ""
    prefix = "lang_"
    # Strip only the leading prefix; str.replace would also remove any later
    # occurrence inside the code itself.
    lang_code = data[len(prefix):] if data.startswith(prefix) else data

    lang_mapping = get_lang_mapping("telegram")
    flag_mapping = get_flag_mapping("telegram")
    lang_name = lang_mapping.get(lang_code, "Desconocido")
    lang_flag = flag_mapping.get(lang_code, "")

    user_languages[query.from_user.id] = lang_code

    display_name = f"{lang_flag} {lang_name}" if lang_flag else lang_name
    await query.edit_message_text(f"Idioma predeterminado configurado: {display_name}")
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Echo a text message with the language keyboard and store the original.

    The bot replies with the original text plus the translation keyboard and
    records that reply in the shared database, keyed by the reply's message
    id, so translation_callback can find the text later.

    Ignored: updates without a new message (``update.message`` is None for
    edited messages and channel posts) and texts shorter than 2 characters.
    """
    from telegram.helpers import escape_markdown

    message = update.message
    if message is None or not message.text:
        return
    text = message.text.strip()
    if len(text) < 2:
        return

    reply_markup = get_lang_keyboard()
    # Escape the user's text: with parse_mode="Markdown" a stray "_", "*",
    # "`" or "[" would otherwise make Telegram reject the whole reply.
    sent_message = await message.reply_text(
        f"🌍 *Original:*\n{escape_markdown(text, version=1)}",
        parse_mode="Markdown",
        reply_markup=reply_markup
    )
    # Store the unescaped text in the database rather than the local JSON file.
    save_message(sent_message.message_id, update.effective_chat.id, update.effective_user.id, text, {}, 'telegram')
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Re-send a captioned photo with the language keyboard and store the caption.

    The last entry of ``message.photo`` is re-sent with the original caption
    and the translation keyboard; the caption is stored in the shared
    database under the reply's message id.

    Ignored: updates without a new message (e.g. edits) and photos without
    a caption.
    """
    from telegram.helpers import escape_markdown

    message = update.message
    if message is None or not message.caption:
        return
    caption = message.caption

    reply_markup = get_lang_keyboard()
    # Escape the caption so Markdown control characters in user text cannot
    # make Telegram reject the reply.
    sent_message = await message.reply_photo(
        message.photo[-1].file_id,
        caption=f"🌍 *Original:*\n{escape_markdown(caption, version=1)}",
        parse_mode="Markdown",
        reply_markup=reply_markup
    )
    # Store the unescaped caption in the database.
    save_message(sent_message.message_id, update.effective_chat.id, update.effective_user.id, caption, {}, 'telegram')
async def translation_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Translate the text behind a ``trans_<code>`` button and edit the message.

    Steps:
      1. Find the original text for the pressed message, first in the
         database and then in ``pending_translations``.
      2. Reuse a cached translation when there is one; otherwise translate
         and cache the result.
      3. Edit the message (caption or text) to show the translation above
         the original, keeping the language keyboard.

    Edit failures are logged, not raised. Telegram's "message is not
    modified" error (same language pressed twice) is expected and ignored.

    NOTE(review): Telegram message ids are only unique within a chat, while
    get_message/get_cached_translation are called with the id alone —
    confirm the database layer cannot mix up chats.
    """
    import html
    from telegram.helpers import escape_markdown
    from botdiscord.translate import translate_text

    query = update.callback_query
    await query.answer()
    if query.message is None:
        # Nothing to look up or edit without the originating message.
        return

    data = query.data or ""
    prefix = "trans_"
    # Strip only the leading prefix (str.replace removes every occurrence).
    lang_code = data[len(prefix):] if data.startswith(prefix) else data
    message_id = query.message.message_id

    # Original text: database first, then the in-memory/JSON-backed map,
    # which may hold captions that were never written to the database.
    db_msg = get_message(message_id)
    if db_msg:
        text = db_msg['content']
    else:
        text = pending_translations.get(str(message_id))
    if text is None:
        await query.message.reply_text("⚠️ No se encontró el texto original en la base de datos.")
        return

    translated = get_cached_translation(message_id, lang_code)
    if not translated:
        # Translate with the shared logic and cache the raw result.
        translated = await translate_text(text, lang_code)
        save_translation(message_id, lang_code, translated)

    # Decode HTML entities for Telegram.
    translated = html.unescape(translated)

    reply_markup = get_lang_keyboard()
    # Escape both parts: stray Markdown characters in either the translation
    # or the original would make Telegram reject the edit.
    new_text = (
        f"{escape_markdown(translated, version=1)}"
        f"\n\n📝 *Original:*\n{escape_markdown(text, version=1)}"
    )

    try:
        if query.message.caption:
            await query.edit_message_caption(
                caption=new_text,
                parse_mode="Markdown",
                reply_markup=reply_markup
            )
        else:
            await query.edit_message_text(
                text=new_text,
                parse_mode="Markdown",
                reply_markup=reply_markup
            )
    except Exception as e:
        # Best effort, but log anything except the expected no-op edit.
        if "not modified" not in str(e).lower():
            print(f"⚠️ Error al editar el mensaje {message_id}: {e}")
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Re-send a captioned document with the language keyboard and store the caption.

    Ignored: updates without a new message (e.g. edits) and documents
    without a caption.
    """
    from telegram.helpers import escape_markdown

    message = update.message
    if message is None or not message.caption:
        return
    caption = message.caption

    reply_markup = get_lang_keyboard()
    # Escape the caption so Markdown control characters in user text cannot
    # make Telegram reject the reply.
    sent_message = await message.reply_document(
        message.document.file_id,
        caption=f"🌍 *Original:*\n{escape_markdown(caption, version=1)}",
        parse_mode="Markdown",
        reply_markup=reply_markup
    )
    # Store in the shared database, as handle_photo does: translation_callback
    # looks the original text up with get_message().
    save_message(sent_message.message_id, update.effective_chat.id, update.effective_user.id, caption, {}, 'telegram')
async def handle_video(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Re-send a captioned video with the language keyboard and store the caption.

    Ignored: updates without a new message (e.g. edits) and videos without
    a caption.
    """
    from telegram.helpers import escape_markdown

    message = update.message
    if message is None or not message.caption:
        return
    caption = message.caption

    reply_markup = get_lang_keyboard()
    # Escape the caption so Markdown control characters in user text cannot
    # make Telegram reject the reply.
    sent_message = await message.reply_video(
        message.video.file_id,
        caption=f"🌍 *Original:*\n{escape_markdown(caption, version=1)}",
        parse_mode="Markdown",
        reply_markup=reply_markup
    )
    # Store in the shared database, as handle_photo does: translation_callback
    # looks the original text up with get_message().
    save_message(sent_message.message_id, update.effective_chat.id, update.effective_user.id, caption, {}, 'telegram')
async def handle_animation(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Re-send a captioned animation with the language keyboard and store the caption.

    Ignored: updates without a new message (e.g. edits) and animations
    without a caption.
    """
    from telegram.helpers import escape_markdown

    message = update.message
    if message is None or not message.caption:
        return
    caption = message.caption

    reply_markup = get_lang_keyboard()
    # Escape the caption so Markdown control characters in user text cannot
    # make Telegram reject the reply.
    sent_message = await message.reply_animation(
        message.animation.file_id,
        caption=f"🌍 *Original:*\n{escape_markdown(caption, version=1)}",
        parse_mode="Markdown",
        reply_markup=reply_markup
    )
    # Store in the shared database, as handle_photo does: translation_callback
    # looks the original text up with get_message().
    save_message(sent_message.message_id, update.effective_chat.id, update.effective_user.id, caption, {}, 'telegram')
async def handle_audio(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Re-send a captioned audio file with the language keyboard and store the caption.

    Ignored: updates without a new message (e.g. edits) and audio files
    without a caption.
    """
    from telegram.helpers import escape_markdown

    message = update.message
    if message is None or not message.caption:
        return
    caption = message.caption

    reply_markup = get_lang_keyboard()
    # Escape the caption so Markdown control characters in user text cannot
    # make Telegram reject the reply.
    sent_message = await message.reply_audio(
        message.audio.file_id,
        caption=f"🌍 *Original:*\n{escape_markdown(caption, version=1)}",
        parse_mode="Markdown",
        reply_markup=reply_markup
    )
    # Store in the shared database, as handle_photo does: translation_callback
    # looks the original text up with get_message().
    save_message(sent_message.message_id, update.effective_chat.id, update.effective_user.id, caption, {}, 'telegram')
async def handle_voice(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Re-send a captioned voice message with the language keyboard and store the caption.

    Ignored: updates without a new message (e.g. edits) and voice messages
    without a caption.
    """
    from telegram.helpers import escape_markdown

    message = update.message
    if message is None or not message.caption:
        return
    caption = message.caption

    reply_markup = get_lang_keyboard()
    # Escape the caption so Markdown control characters in user text cannot
    # make Telegram reject the reply.
    sent_message = await message.reply_voice(
        message.voice.file_id,
        caption=f"🌍 *Original:*\n{escape_markdown(caption, version=1)}",
        parse_mode="Markdown",
        reply_markup=reply_markup
    )
    # Store in the shared database, as handle_photo does: translation_callback
    # looks the original text up with get_message().
    save_message(sent_message.message_id, update.effective_chat.id, update.effective_user.id, caption, {}, 'telegram')
async def handle_sticker(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Deliberately do nothing for sticker messages."""
    return None
async def handle_video_note(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Deliberately do nothing for video-note messages."""
    return None
async def rag_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Answer ``/rag <question>`` using the Groq RAG agent.

    The question is passed to ``chat_with_rag``. The answer is sent with a
    header and, when present, the titles of up to three sources. Without
    arguments a usage hint is sent instead.

    Failure handling:
      * Updates without a new message (e.g. an edited command) are ignored.
      * If the agent call raises, the error is logged and the user is told.
      * If Telegram rejects the Markdown message (unparseable model output
        or text over the length limit), the answer is re-sent as plain text
        in chunks.
    """
    message = update.message
    if message is None:
        return

    if not context.args:
        await message.reply_text(
            "Uso: /rag <pregunta>\n\n"
            "Ejemplo: /rag How to get heroes?"
        )
        return

    from telegram.error import BadRequest
    from telegram.helpers import escape_markdown
    from botdiscord.groq_agent import chat_with_rag

    pregunta = " ".join(context.args)
    await message.reply_text("🔍 Buscando en la base de conocimientos...")

    try:
        result = await chat_with_rag(pregunta)
    except Exception as e:
        # Handler boundary: report the failure instead of leaving the user
        # with only the "searching" message.
        print(f"⚠️ Error en /rag: {e}")
        await message.reply_text("⚠️ No se pudo consultar la base de conocimientos.")
        return

    response = result.get("response", "Sin respuesta")
    sources = result.get("sources", [])
    titles = [str(s.get("title", "N/A")) for s in sources[:3]] if sources else []

    def _render(format_title):
        # Build the reply; format_title decides whether titles are escaped.
        body = f"🔍 *Last War Knowledge*\n\n{response}"
        if titles:
            body += "\n\n*Fuentes:*\n" + "".join(f"{format_title(t)}\n" for t in titles)
        return body

    try:
        await message.reply_text(
            _render(lambda t: escape_markdown(t, version=1)),
            parse_mode="Markdown",
        )
    except BadRequest as e:
        print(f"⚠️ /rag: Telegram rechazó el mensaje Markdown, reenviando como texto plano: {e}")
        plain = _render(str)
        max_len = 4096  # Telegram's maximum text message length
        for first in range(0, len(plain), max_len):
            await message.reply_text(plain[first:first + max_len])
def run_telegram_bot():
    """Configure the Telegram application, register handlers and start polling.

    Database initialisation is attempted first; a failure there is only
    logged. If no real token is configured, an error is printed and the
    function returns without starting. Otherwise this call blocks in
    ``run_polling`` until the bot is stopped.
    """
    try:
        from botdiscord.database import init_db
        init_db()
    except Exception as e:
        print(f"⚠️ Advertencia: Error al inicializar DB (reintentando luego): {e}")

    token = get_telegram_token()
    if not token or token == "TU_TELEGRAM_BOT_TOKEN":
        print("ERROR: Configura el token de Telegram en config.yaml o .env")
        return

    # Maximum-compatibility network settings: generous timeouts and HTTP/1.1.
    # (IPv4 is forced separately by the getaddrinfo patch at module level.)
    from telegram.request import HTTPXRequest

    request = HTTPXRequest(
        connect_timeout=30.0,
        read_timeout=30.0,
        write_timeout=30.0,
        pool_timeout=30.0,
        http_version="1.1"
    )
    application = Application.builder().token(token).request(request).build()

    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("idiomas", languages_command))
    application.add_handler(CommandHandler("rag", rag_command))

    application.add_handler(CallbackQueryHandler(translation_callback, pattern="^trans_"))
    # The /idiomas keyboard emits "lang_<code>" callbacks; without this
    # handler its buttons would never be answered.
    application.add_handler(CallbackQueryHandler(language_callback, pattern="^lang_"))

    application.add_handler(MessageHandler(filters.PHOTO, handle_photo))
    # Animations must come before documents: Telegram also fills the
    # `document` field on animation messages, so Document.ALL would
    # otherwise win and handle_animation would never run.
    application.add_handler(MessageHandler(filters.ANIMATION, handle_animation))
    application.add_handler(MessageHandler(filters.Document.ALL, handle_document))
    application.add_handler(MessageHandler(filters.VIDEO, handle_video))
    application.add_handler(MessageHandler(filters.AUDIO, handle_audio))
    application.add_handler(MessageHandler(filters.VOICE, handle_voice))
    application.add_handler(MessageHandler(filters.Sticker.ALL, handle_sticker))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    print("Bot Telegram iniciado. Presiona Ctrl+C para detener.")
    application.run_polling(allowed_updates=Update.ALL_TYPES)


# Start the bot only when this file is run as a script.
if __name__ == "__main__":
    run_telegram_bot()