Añadiendo todos los archivos del proyecto (incluidos secretos y venv)
This commit is contained in:
0
bottelegram/__init__.py
Normal file
0
bottelegram/__init__.py
Normal file
BIN
bottelegram/__pycache__/__init__.cpython-312.pyc
Normal file
BIN
bottelegram/__pycache__/__init__.cpython-312.pyc
Normal file
Binary file not shown.
BIN
bottelegram/__pycache__/telegram_bot.cpython-312.pyc
Normal file
BIN
bottelegram/__pycache__/telegram_bot.cpython-312.pyc
Normal file
Binary file not shown.
304
bottelegram/telegram_bot.py
Normal file
304
bottelegram/telegram_bot.py
Normal file
@@ -0,0 +1,304 @@
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import threading
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import asyncio
|
||||
import aiohttp
|
||||
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
|
||||
from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, ContextTypes, filters
|
||||
|
||||
from botdiscord.config import load_config, get_telegram_token, get_libretranslate_url
|
||||
from botdiscord.database import init_db as init_shared_db
|
||||
from botdiscord.translate import load_lang_mappings, get_lang_mapping, get_flag_mapping, get_name_to_code
|
||||
|
||||
load_config()
|
||||
from botdiscord.database import init_db
|
||||
init_db()
|
||||
load_lang_mappings("telegram")
|
||||
|
||||
PENDING_FILE = "/app/data/pending_translations.json"
|
||||
|
||||
def load_pending_translations():
|
||||
try:
|
||||
if os.path.exists(PENDING_FILE):
|
||||
with open(PENDING_FILE, "r") as f:
|
||||
return json.load(f)
|
||||
except:
|
||||
pass
|
||||
return {}
|
||||
|
||||
def save_pending_translations(data):
|
||||
try:
|
||||
with open(PENDING_FILE, "w") as f:
|
||||
json.dump(data, f)
|
||||
except:
|
||||
pass
|
||||
|
||||
user_languages = {}
|
||||
pending_translations = load_pending_translations()
|
||||
|
||||
# Limitar el tamaño del diccionario para evitar fugas de memoria (mantener últimas 1000)
|
||||
def cleanup_pending_translations():
|
||||
if len(pending_translations) > 1000:
|
||||
try:
|
||||
sorted_keys = sorted(pending_translations.keys(), key=lambda x: int(x))
|
||||
for k in sorted_keys[:-1000]:
|
||||
del pending_translations[k]
|
||||
except:
|
||||
keys = list(pending_translations.keys())
|
||||
for k in keys[:200]:
|
||||
del pending_translations[k]
|
||||
|
||||
lock = threading.Lock()
|
||||
|
||||
def get_lang_keyboard(bot_type="telegram"):
|
||||
lang_mapping = get_lang_mapping(bot_type)
|
||||
flag_mapping = get_flag_mapping(bot_type)
|
||||
|
||||
buttons = []
|
||||
for code, name in lang_mapping.items():
|
||||
flag = flag_mapping.get(code, "")
|
||||
label = flag if flag else name
|
||||
buttons.append(InlineKeyboardButton(label, callback_data=f"trans_{code}"))
|
||||
|
||||
# Agrupar en filas de 4 botones (columnas)
|
||||
keyboard = [buttons[i:i + 4] for i in range(0, len(buttons), 4)]
|
||||
return InlineKeyboardMarkup(keyboard)
|
||||
|
||||
async def translate_text_telegram(text: str, target_lang: str) -> str:
|
||||
url = get_libretranslate_url()
|
||||
payload = {
|
||||
"q": text,
|
||||
"source": "auto",
|
||||
"target": target_lang,
|
||||
"format": "text"
|
||||
}
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
try:
|
||||
async with session.post(url, json=payload, timeout=10) as resp:
|
||||
if resp.status == 200:
|
||||
data = await resp.json()
|
||||
return data.get("translatedText", "Error en la traducción.")
|
||||
else:
|
||||
return f"Error de API: {resp.status}"
|
||||
except Exception as e:
|
||||
return f"Error de conexión: {str(e)}"
|
||||
|
||||
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
await update.message.reply_text(
|
||||
"¡Hola! Soy un bot de traducción.\n\n"
|
||||
"Envíame cualquier texto y lo traduciré al idioma que elijas.\n"
|
||||
"Usa /idiomas para seleccionar el idioma de traducción."
|
||||
)
|
||||
|
||||
async def languages_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
lang_mapping = get_lang_mapping("telegram")
|
||||
flag_mapping = get_flag_mapping("telegram")
|
||||
|
||||
buttons = []
|
||||
for code, name in lang_mapping.items():
|
||||
flag = flag_mapping.get(code, "")
|
||||
label = flag if flag else name
|
||||
buttons.append(InlineKeyboardButton(label, callback_data=f"lang_{code}"))
|
||||
|
||||
keyboard = [buttons[i:i + 4] for i in range(0, len(buttons), 4)]
|
||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||
|
||||
await update.message.reply_text(
|
||||
"Selecciona el idioma de traducción:",
|
||||
reply_markup=reply_markup
|
||||
)
|
||||
|
||||
async def language_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
query = update.callback_query
|
||||
await query.answer()
|
||||
|
||||
lang_code = query.data.replace("lang_", "")
|
||||
lang_mapping = get_lang_mapping("telegram")
|
||||
flag_mapping = get_flag_mapping("telegram")
|
||||
|
||||
lang_name = "Desconocido"
|
||||
for name, code in lang_mapping.items():
|
||||
if code == lang_code:
|
||||
lang_name = name
|
||||
break
|
||||
|
||||
lang_flag = flag_mapping.get(lang_code, "")
|
||||
user_languages[query.from_user.id] = lang_code
|
||||
|
||||
display_name = f"{lang_flag} {lang_name}" if lang_flag else lang_name
|
||||
await query.edit_message_text(f"Idioma predeterminado configurado: {display_name}")
|
||||
|
||||
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
text = update.message.text.strip()
|
||||
if len(text) < 2: return
|
||||
|
||||
reply_markup = get_lang_keyboard()
|
||||
sent_message = await update.message.reply_text(
|
||||
f"🌍 *Original:*\n{text}\n\n¿A qué idioma quieres traducir?",
|
||||
parse_mode="Markdown",
|
||||
reply_markup=reply_markup
|
||||
)
|
||||
|
||||
with lock:
|
||||
cleanup_pending_translations()
|
||||
pending_translations[str(sent_message.message_id)] = text
|
||||
|
||||
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
caption = update.message.caption
|
||||
if not caption: return
|
||||
|
||||
reply_markup = get_lang_keyboard()
|
||||
sent_message = await update.message.reply_photo(
|
||||
update.message.photo[-1].file_id,
|
||||
caption=f"🌍 *Original:*\n{caption}\n\n¿A qué idioma quieres traducir?",
|
||||
parse_mode="Markdown",
|
||||
reply_markup=reply_markup
|
||||
)
|
||||
with lock:
|
||||
cleanup_pending_translations()
|
||||
pending_translations[str(sent_message.message_id)] = caption
|
||||
|
||||
async def translation_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
query = update.callback_query
|
||||
await query.answer()
|
||||
|
||||
lang_code = query.data.replace("trans_", "")
|
||||
message_id = str(query.message.message_id)
|
||||
|
||||
text = pending_translations.get(message_id)
|
||||
if not text:
|
||||
await query.message.reply_text("⚠️ No se encontró el texto original. Es posible que el bot se haya reiniciado recientemente.")
|
||||
return
|
||||
|
||||
translated = await translate_text_telegram(text, lang_code)
|
||||
|
||||
lang_mapping = get_lang_mapping("telegram")
|
||||
flag_mapping = get_flag_mapping("telegram")
|
||||
|
||||
lang_name = "Desconocido"
|
||||
for name, code in lang_mapping.items():
|
||||
if code == lang_code:
|
||||
lang_name = name
|
||||
break
|
||||
|
||||
lang_flag = flag_mapping.get(lang_code, "")
|
||||
display_name = f"{lang_flag} {lang_name}" if lang_flag else lang_name
|
||||
|
||||
reply_markup = get_lang_keyboard()
|
||||
new_text = f"{translated}\n\n📝 *Original:*\n{text}"
|
||||
|
||||
try:
|
||||
if query.message.caption:
|
||||
await query.edit_message_caption(
|
||||
caption=new_text,
|
||||
parse_mode="Markdown",
|
||||
reply_markup=reply_markup
|
||||
)
|
||||
else:
|
||||
await query.edit_message_text(
|
||||
text=new_text,
|
||||
parse_mode="Markdown",
|
||||
reply_markup=reply_markup
|
||||
)
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
caption = update.message.caption
|
||||
if not caption: return
|
||||
reply_markup = get_lang_keyboard()
|
||||
sent_message = await update.message.reply_document(
|
||||
update.message.document.file_id,
|
||||
caption=f"🌍 *Original:*\n{caption}\n\n¿A qué idioma quieres traducir?",
|
||||
parse_mode="Markdown",
|
||||
reply_markup=reply_markup
|
||||
)
|
||||
pending_translations[str(sent_message.message_id)] = caption
|
||||
|
||||
async def handle_video(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
caption = update.message.caption
|
||||
if not caption: return
|
||||
reply_markup = get_lang_keyboard()
|
||||
sent_message = await update.message.reply_video(
|
||||
update.message.video.file_id,
|
||||
caption=f"🌍 *Original:*\n{caption}\n\n¿A qué idioma quieres traducir?",
|
||||
parse_mode="Markdown",
|
||||
reply_markup=reply_markup
|
||||
)
|
||||
pending_translations[str(sent_message.message_id)] = caption
|
||||
|
||||
async def handle_animation(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
caption = update.message.caption
|
||||
if not caption: return
|
||||
reply_markup = get_lang_keyboard()
|
||||
sent_message = await update.message.reply_animation(
|
||||
update.message.animation.file_id,
|
||||
caption=f"🌍 *Original:*\n{caption}\n\n¿A qué idioma quieres traducir?",
|
||||
parse_mode="Markdown",
|
||||
reply_markup=reply_markup
|
||||
)
|
||||
pending_translations[str(sent_message.message_id)] = caption
|
||||
|
||||
async def handle_audio(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
caption = update.message.caption
|
||||
if not caption: return
|
||||
reply_markup = get_lang_keyboard()
|
||||
sent_message = await update.message.reply_audio(
|
||||
update.message.audio.file_id,
|
||||
caption=f"🌍 *Original:*\n{caption}\n\n¿A qué idioma quieres traducir?",
|
||||
parse_mode="Markdown",
|
||||
reply_markup=reply_markup
|
||||
)
|
||||
pending_translations[str(sent_message.message_id)] = caption
|
||||
|
||||
async def handle_voice(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
caption = update.message.caption
|
||||
if not caption: return
|
||||
reply_markup = get_lang_keyboard()
|
||||
sent_message = await update.message.reply_voice(
|
||||
update.message.voice.file_id,
|
||||
caption=f"🌍 *Original:*\n{caption}\n\n¿A qué idioma quieres traducir?",
|
||||
parse_mode="Markdown",
|
||||
reply_markup=reply_markup
|
||||
)
|
||||
pending_translations[str(sent_message.message_id)] = caption
|
||||
|
||||
|
||||
async def handle_sticker(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
return
|
||||
|
||||
async def handle_video_note(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
return
|
||||
|
||||
def run_telegram_bot():
|
||||
from botdiscord.database import init_db
|
||||
init_db()
|
||||
|
||||
token = get_telegram_token()
|
||||
if not token or token == "TU_TELEGRAM_BOT_TOKEN":
|
||||
print("ERROR: Configura el token de Telegram en config.yaml")
|
||||
return
|
||||
|
||||
application = Application.builder().token(token).build()
|
||||
|
||||
application.add_handler(CommandHandler("start", start))
|
||||
application.add_handler(CommandHandler("idiomas", languages_command))
|
||||
application.add_handler(CallbackQueryHandler(translation_callback, pattern="^trans_"))
|
||||
application.add_handler(MessageHandler(filters.PHOTO, handle_photo))
|
||||
application.add_handler(MessageHandler(filters.Document.ALL, handle_document))
|
||||
application.add_handler(MessageHandler(filters.VIDEO, handle_video))
|
||||
application.add_handler(MessageHandler(filters.ANIMATION, handle_animation))
|
||||
application.add_handler(MessageHandler(filters.AUDIO, handle_audio))
|
||||
application.add_handler(MessageHandler(filters.VOICE, handle_voice))
|
||||
application.add_handler(MessageHandler(filters.Sticker.ALL, handle_sticker))
|
||||
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
|
||||
|
||||
print("Bot Telegram iniciado. Presiona Ctrl+C para detener.")
|
||||
application.run_polling(allowed_updates=Update.ALL_TYPES)
|
||||
|
||||
if __name__ == "__main__":
|
||||
run_telegram_bot()
|
||||
Reference in New Issue
Block a user