- Nuevo módulo groq_agent.py para consultas a la API de Groq - Panel de administración en /groq para configurar API key, modelo y prompt - Comando /rag en Discord y Telegram para consultar el RAG - Sistema de prompt personalizable guardado en base de datos - Soporte para variables de entorno en Docker - Fix: starlette version para evitar bug con Jinja2
222 lines
8.0 KiB
Python
222 lines
8.0 KiB
Python
import os
|
|
import sys
|
|
import signal
|
|
import time
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
import discord
|
|
from discord.ext import commands, tasks
|
|
from discord import app_commands
|
|
import re
|
|
from botdiscord.reload_marker import check_reload_marker
|
|
import html
|
|
|
|
from botdiscord.config import get_discord_token, load_config, get_languages
|
|
from botdiscord.database import init_db, get_active_languages, get_bot_languages, save_message, get_welcome_message, is_channel_enabled
|
|
from botdiscord.ui import PersistentTranslationView, ConfigView, WelcomeTranslationView, TranslationButton
|
|
from botdiscord.translate import get_reverse_mapping, load_lang_mappings, get_name_to_code, get_flag_mapping
|
|
|
|
# Load the project configuration at import time, before the bot is built.
load_config()

# Start from discord.py's default intents and switch on the two extra ones
# this module relies on: member events (on_member_join) and message text
# (on_message reads message.content).
intents = discord.Intents.default()
intents.members = True
intents.message_content = True

# Single shared bot instance; the handlers and slash commands defined in this
# module attach themselves to it.
bot = commands.Bot(command_prefix="!", intents=intents)
|
|
|
|
@tasks.loop(seconds=3)
async def restart_monitor():
    """Poll the reload marker every 3 seconds and refresh cached settings.

    When ``check_reload_marker()`` reports a change, the channel cache is
    cleared and both the configuration and the Discord language mappings are
    loaded again, without restarting the process.
    """
    if not check_reload_marker():
        return

    print("[Bot] Cambios desde panel detectados. Recargando configuraciones sin reiniciar...")

    # Function-scope imports: the helpers are resolved each time a reload
    # is actually triggered.
    from botdiscord.config import load_config
    from botdiscord.database import clear_channel_cache
    from botdiscord.translate import load_lang_mappings

    clear_channel_cache()
    load_config()
    load_lang_mappings("discord")
|
|
|
|
@bot.event
async def on_ready():
    """Finish start-up once the connection to Discord is ready.

    Initialises the database, loads the Discord language mappings, registers
    the persistent views on the bot and starts ``restart_monitor`` unless it
    is already running.
    """
    init_db()
    load_lang_mappings("discord")

    # Register the views so their buttons can be handled by this process.
    for view_class in (PersistentTranslationView, WelcomeTranslationView):
        bot.add_view(view_class())

    # Only start the polling task if it is not already running.
    monitor_running = restart_monitor.is_running()
    if not monitor_running:
        restart_monitor.start()

    print(f"Bot Discord conectado como {bot.user}")

    # NOTE(review): the application-command sync is disabled in this handler.
    # Confirm the command tree is synced somewhere else; the disabled code was:
    #     try:
    #         synced = await bot.tree.sync()
    #         print(f"Sincronizados {len(synced)} comandos.")
    #     except Exception as e:
    #         print(f"Error sync: {e}")
|
|
|
|
@bot.event
async def on_member_join(member):
    """Send the guild's configured welcome message when a human member joins.

    Steps:
      1. Ignore bot accounts.
      2. Read the welcome configuration with ``get_welcome_message``; stop if
         it is missing, disabled, or lacks a channel id or a template.
      3. Fill the template placeholders ``user_mention``, ``username``,
         ``server_name`` and ``member_count`` and send the result.
      4. Store the message with ``save_message`` (mentions replaced by
         placeholders, remaining text HTML-escaped) and then attach a
         ``WelcomeTranslationView`` to the sent message.

    Any exception is caught, printed with its traceback and swallowed so a
    failing welcome never propagates out of the event handler.

    Args:
        member: The member that joined (object passed by discord.py).
    """
    print(f"[WELCOME] Nuevo miembro: {member.name} ({member.id}) en guild: {member.guild.id}")

    if member.bot:
        print("[WELCOME] Ignorado: es bot")
        return

    try:
        welcome_config = get_welcome_message(member.guild.id)
        print(f"[WELCOME] Config obtained: {welcome_config}")

        if not welcome_config:
            print(f"[WELCOME] No hay configuración para guild {member.guild.id}")
            return

        # The configuration is accepted either as a mapping (has ``.get``) or
        # as a positional row laid out as (channel_id, message_content, enabled).
        if hasattr(welcome_config, 'get'):
            enabled = welcome_config.get('enabled', False)
            channel_id = welcome_config.get('channel_id')
            message_template = welcome_config.get('message_content')
        else:
            enabled = welcome_config[2]
            channel_id = welcome_config[0]
            message_template = welcome_config[1]

        if not enabled:
            print(f"[WELCOME] Bienvenida deshabilitada para guild {member.guild.id}")
            return

        print(f"[WELCOME] Channel ID: {channel_id}, Template: {message_template}")

        if not channel_id or not message_template:
            return

        channel = member.guild.get_channel(channel_id)
        if not channel:
            print(f"[WELCOME] Canal {channel_id} no encontrado")
            return

        formatted_message = message_template.format(
            user_mention=member.mention,
            username=member.name,
            server_name=member.guild.name,
            member_count=member.guild.member_count,
        )

        mention_pattern = re.compile(r'<@!?(\d+)>|<@&(\d+)>|<#(\d+)>')
        welcome_mentions_map = {}

        def replace_mention(match):
            # Number placeholders in order of appearance and remember the
            # original mention so it can be restored later.
            placeholder = f"【M{len(welcome_mentions_map)}】"
            welcome_mentions_map[placeholder] = match.group(0)
            return placeholder

        # Mentions must be extracted from the RAW text. html.escape() turns
        # "<" and ">" into "&lt;" / "&gt;", so running the pattern on escaped
        # text (as was done before) could never match and the map stayed
        # empty. The placeholders contain no characters that html.escape()
        # rewrites, so escaping afterwards leaves them intact.
        text_to_translate = html.escape(
            mention_pattern.sub(replace_mention, formatted_message)
        )

        welcome_msg = await channel.send(formatted_message)
        print(f"[WELCOME] Mensaje enviado: {welcome_msg.id}")

        # Persist the message before the translation view is attached.
        save_message(
            welcome_msg.id,
            member.guild.id,
            member.id,
            text_to_translate,
            welcome_mentions_map,
            'discord',
        )

        view = WelcomeTranslationView(member.guild.id)
        await welcome_msg.edit(content=formatted_message, view=view)
        print("[WELCOME] Vista de traducción añadida")

    except Exception as e:
        # Top-level boundary of the event handler: report and carry on.
        import traceback

        print(f"[WELCOME] Error: {e}")
        traceback.print_exc()
|
|
|
|
def get_active_langs_for_guild(guild_id):
    """Return the language codes to offer as translation buttons for a guild.

    The sources are tried in order and the first non-empty one wins:

      1. ``get_active_languages(guild_id)`` - the guild's own selection.
      2. ``get_bot_languages("discord")`` - the Discord-wide selection.
      3. The ``"code"`` of every entry returned by ``get_languages()``.

    The previous implementation also fetched the reverse mapping but never
    used it; that unused lookup (and its local import) has been removed.

    Args:
        guild_id: Id of the guild whose languages are requested.

    Returns:
        The first non-empty result of the sources above; if all are empty,
        the (empty) list built from ``get_languages()``.
    """
    return (
        get_active_languages(guild_id)
        or get_bot_languages("discord")
        or [lang["code"] for lang in get_languages()]
    )
|
|
|
|
@bot.event
async def on_message(message):
    """Reply to eligible guild messages with a row of translation buttons.

    A message is skipped when it comes from a bot, is not in a guild, is in a
    channel where ``is_channel_enabled`` is false, is empty or shorter than
    two characters, carries stickers, is a Tenor link, or consists of a single
    custom emoji. Otherwise the text is stored with ``save_message`` (mentions
    replaced by placeholders, remaining text HTML-escaped) and the bot replies
    with one ``TranslationButton`` per active language.

    NOTE(review): this handler does not call ``bot.process_commands``, so
    prefix ("!") commands would not be dispatched; confirm none are expected.

    Args:
        message: The received message (object passed by discord.py).
    """
    if message.author.bot:
        return

    # Direct messages have no guild, and DM channels have no ``name``; the
    # guild-scoped logic below (and its log line) would raise AttributeError.
    if message.guild is None:
        return

    # Check whether translation is enabled for this channel.
    channel_enabled = is_channel_enabled(message.channel.id)
    print(f"[Bot] Mensaje en canal {message.channel.id} ({message.channel.name}) - Habilitado: {channel_enabled} - Timestamp: {time.strftime('%H:%M:%S')}")

    if not channel_enabled:
        print(f"[Bot] Ignorando mensaje en canal desactivado: {message.channel.name}")
        return

    text_content = message.content.strip()
    if not text_content:
        return

    # Skip content with nothing to translate: stickers, Tenor GIF links and
    # messages that are exactly one custom emoji.
    if (
        message.stickers
        or text_content.startswith('https://tenor.com/')
        or re.fullmatch(r'<(a?):[a-zA-Z0-9_]+:[0-9]+>', text_content)
    ):
        return

    if len(text_content) < 2:
        return

    active_codes = get_active_langs_for_guild(message.guild.id)
    if not active_codes:
        return

    mention_pattern = re.compile(r'<@!?(\d+)>|<@&(\d+)>|<#(\d+)>')
    mentions_map = {}

    def replace_mention(match):
        # Number placeholders in order of appearance and remember the
        # original mention so it can be restored later.
        placeholder = f"【M{len(mentions_map)}】"
        mentions_map[placeholder] = match.group(0)
        return placeholder

    # Mentions must be extracted from the RAW text. html.escape() turns "<"
    # and ">" into "&lt;" / "&gt;", so running the pattern on escaped text (as
    # was done before) could never match and the map stayed empty. The
    # placeholders contain no characters that html.escape() rewrites.
    text_to_translate = html.escape(
        mention_pattern.sub(replace_mention, message.content)
    )
    save_message(message.id, message.guild.id, message.author.id, text_to_translate, mentions_map, 'discord')

    view = discord.ui.View(timeout=None)
    name_to_code = get_name_to_code("discord")
    flag_mapping = get_flag_mapping("discord")
    # Invert name -> code so each active code can be labelled with its name.
    code_to_name = {v: k for k, v in name_to_code.items()}

    for code in active_codes:
        name = code_to_name.get(code, code)  # fall back to the bare code
        flag = flag_mapping.get(code, "")
        view.add_item(TranslationButton(name, code, flag))

    try:
        await message.reply(content=None, view=view, mention_author=False)
    except Exception as e:
        # Best effort: a failed reply must not break message handling.
        print(f"Error enviando reply: {e}")
|
|
|
|
@bot.tree.command(name="configurar", description="Configura los idiomas de traducción")
@app_commands.checks.has_permissions(administrator=True)
async def configurar(interaction: discord.Interaction):
    """Slash command ``/configurar``: show the language-selection view.

    Guarded by an administrator permission check. The reply is ephemeral and
    carries a ``ConfigView`` built for the invoking guild and the "discord"
    platform.

    Args:
        interaction: The interaction that invoked the command.
    """
    language_picker = ConfigView(interaction.guild_id, "discord")
    await interaction.response.send_message(
        "Selecciona idiomas habilitados:",
        view=language_picker,
        ephemeral=True,
    )
|
|
|
|
@bot.tree.command(name="rag", description="Busca información sobre Last War en la base de conocimientos")
async def rag_command(interaction: discord.Interaction, *, pregunta: str):
    """Slash command ``/rag``: answer a question through ``chat_with_rag``.

    The interaction is deferred (ephemeral) first, then the answer is sent as
    an embed whose description is the ``"response"`` value and whose optional
    "Sources" field lists the titles of up to three ``"sources"`` entries.

    Compared with the previous version:
      * a failure while importing or calling ``chat_with_rag`` is reported to
        the user instead of leaving the deferred interaction unanswered;
      * a ``None`` result or an empty/missing response falls back to
        "Sin respuesta";
      * the description and the field value are clipped to Discord's embed
        limits (4096 and 1024 characters) so a long answer cannot make the
        send fail.

    Args:
        interaction: The interaction that invoked the command.
        pregunta: The user's question.
    """
    description_limit = 4096  # Discord limit for an embed description
    field_value_limit = 1024  # Discord limit for an embed field value

    def clip(text, limit):
        # Shorten text to at most `limit` characters, marking the cut.
        text = str(text)
        return text if len(text) <= limit else text[: limit - 1] + "…"

    await interaction.response.defer(ephemeral=True)

    try:
        from botdiscord.groq_agent import chat_with_rag

        result = await chat_with_rag(pregunta)
    except Exception as e:
        # The interaction is already deferred: always send a follow-up so the
        # user is not left waiting.
        print(f"[RAG] Error: {e}")
        await interaction.followup.send(
            "❌ No se pudo consultar la base de conocimientos. Inténtalo de nuevo más tarde.",
            ephemeral=True,
        )
        return

    result = result or {}
    response = result.get("response") or "Sin respuesta"
    sources = result.get("sources") or []

    embed = discord.Embed(
        title="🔍 Last War Knowledge",
        description=clip(response, description_limit),
        color=discord.Color.blue(),
    )

    if sources:
        source_text = "\n".join(f"• {s.get('title', 'N/A')}" for s in sources[:3])
        embed.add_field(name="Sources", value=clip(source_text, field_value_limit), inline=False)

    await interaction.followup.send(embed=embed, ephemeral=True)
|
|
|
|
def run_discord_bot():
    """Start the Discord bot using the token from ``get_discord_token()``."""
    bot.run(get_discord_token())


# Allow the module to be launched directly as a script.
if __name__ == "__main__":
    run_discord_bot()
|