Files
traduccion_bots/panel/main.py
nickpons666 8398e988b0 Feat: Agregar agente Groq con integración RAG
- Nuevo módulo groq_agent.py para consultas a la API de Groq
- Panel de administración en /groq para configurar API key, modelo y prompt
- Comando /rag en Discord y Telegram para consultar el RAG
- Sistema de prompt personalizable guardado en base de datos
- Soporte para variables de entorno en Docker
- Fix: starlette version para evitar bug con Jinja2
2026-03-26 21:23:19 -06:00

866 lines
32 KiB
Python

import os
import sys
import glob
import re
import asyncio
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from fastapi import FastAPI, HTTPException, status
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi import Request
from fastapi.responses import RedirectResponse
from pydantic import BaseModel
from dotenv import load_dotenv
from passlib.hash import pbkdf2_sha256 as hasher
from botdiscord.config import load_config, get_web_config, get_libretranslate_url, get_db_type, get_groq_config
# Asegurar que las variables de entorno se carguen correctamente
load_dotenv()
load_config() # Cargamos configuración inmediatamente
from botdiscord.database import (
init_db, get_ui_translation, save_ui_translation,
get_admins, get_admin_by_username, add_admin, delete_admin,
_normalize_text, get_translation_stats
)
init_db() # Aseguramos que las tablas existan antes de que FastAPI atienda peticiones
from botdiscord.translate import translate_text
app = FastAPI(title="Panel de Configuración - Bots de Traducción")
SCRIPT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
templates = Jinja2Templates(directory=os.path.join(SCRIPT_DIR, "panel", "templates"))
from utils.logger import panel_logger as log
from utils.cache import cache_get, cache_set, cache_increment
# Caché de memoria RAM como fallback si Redis no está disponible
_ui_memory_cache = {}
# Filtro de traducción para Jinja2 (usa Redis → RAM → DB → LibreTranslate en cascada)
def translate_filter(text, lang="es"):
if lang == "es" or not text:
return text
# NORMALIZACIÓN inmediata
text = _normalize_text(text)
if not text: return ""
cache_key = f"ui:{lang}:{text}"
_BAD = "mantenimiento" # Patron de respuesta de error a ignorar
# 1. Redis (compartido y persistente entre reinicios)
redis_val = cache_get(cache_key)
if redis_val and _BAD not in redis_val:
_ui_memory_cache[cache_key] = redis_val
return redis_val
elif redis_val and _BAD in redis_val:
# Valor contaminado: eliminarlo del caché
from utils.cache import cache_delete
cache_delete(cache_key)
# 2. RAM local (ultra rápida, temporal)
ram_val = _ui_memory_cache.get(cache_key)
if ram_val and _BAD not in ram_val:
return ram_val
elif cache_key in _ui_memory_cache:
del _ui_memory_cache[cache_key]
# 3. Base de datos MySQL
cached = get_ui_translation(text, lang)
if cached and _BAD not in cached:
_ui_memory_cache[cache_key] = cached
cache_set(cache_key, cached, ttl=604800) # 7 días
return cached
# 4. LibreTranslate (sincrónico, solo si no está en ningún caché)
from botdiscord.translate import translate_text_sync
translated = translate_text_sync(text, lang)
if translated and translated != text and _BAD not in translated:
save_ui_translation(text, lang, translated)
_ui_memory_cache[cache_key] = translated
cache_set(cache_key, translated, ttl=604800) # 7 días
return translated
return text # Fallback: devolver el texto original si todo falla
templates.env.filters["translate"] = translate_filter
_warmed_languages = set()
async def pre_warm_ui_translations(lang: str):
if lang == "es" or lang in _warmed_languages:
return
_warmed_languages.add(lang)
print(f"[Panel] Pre-calentando traducciones para idioma UI: {lang}...")
strings_to_translate = set()
for f in glob.glob(os.path.join(os.path.dirname(__file__), "templates", "*.html")):
try:
with open(f, "r", encoding="utf-8") as fd:
content = fd.read()
strings_to_translate.update(re.findall(r'\"([^\"]+)\"\s*\|\s*translate', content))
strings_to_translate.update(re.findall(r'\'([^\']+)\'\s*\|\s*translate', content))
except Exception:
pass
missing_strings = []
for text in strings_to_translate:
norm_text = _normalize_text(text)
if not norm_text: continue
cache_key = f"{lang}:{norm_text}"
if cache_key in _ui_memory_cache: continue
cached = get_ui_translation(norm_text, lang)
if cached:
_ui_memory_cache[cache_key] = cached
else:
missing_strings.append(norm_text)
if not missing_strings:
print(f"[Panel] Pre-calentamiento completo: 0 cadenas faltantes para {lang}")
return
print(f"[Panel] Falta traducir {len(missing_strings)} cadenas UI al {lang}. Empezando concurrencia...")
from botdiscord.translate import translate_text
async def _fetch_and_save(text):
translated = await translate_text(text, lang)
if translated and translated != text:
save_ui_translation(text, lang, translated)
_ui_memory_cache[f"{lang}:{text}"] = translated
tasks = [_fetch_and_save(t) for t in missing_strings]
await asyncio.gather(*tasks)
print(f"[Panel] Pre-calentamiento terminado para {lang}.")
@app.middleware("http")
async def translation_pre_warm_middleware(request: Request, call_next):
# Detectamos el idioma que va a usar la página para asegurarnos de que esté cargado
if request.url.path.startswith("/static") or request.url.path.startswith("/api"):
return await call_next(request)
lang = request.cookies.get("panel_lang")
if not lang:
accept_lang = request.headers.get("accept-language", "es")
lang = accept_lang.split(",")[0].split("-")[0]
if lang and len(lang) == 2:
await pre_warm_ui_translations(lang)
return await call_next(request)
@app.get("/set-lang/{lang}")
async def set_lang(lang: str, request: Request):
# Validamos que el idioma sea de 2 letras
if len(lang) != 2:
return RedirectResponse(url="/dashboard")
# Ejecutamos el calentamiento asíncrono para que la redirección sea instantánea
await pre_warm_ui_translations(lang)
response = RedirectResponse(url=request.headers.get("referer", "/dashboard"))
response.set_cookie(key="panel_lang", value=lang, max_age=31536000) # 1 año
return response
def get_panel_lang(request: Request):
lang = request.cookies.get("panel_lang")
if not lang:
accept_lang = request.headers.get("accept-language", "es")
lang = accept_lang.split(",")[0].split("-")[0]
return lang
class LoginForm(BaseModel):
username: str
password: str
def get_config():
cfg = load_config()
return {
"discord": {"token": cfg.get("discord", {}).get("token", "")},
"telegram": {"token": cfg.get("telegram", {}).get("token", "")},
"libretranslate": {"url": cfg.get("libretranslate", {}).get("url", "")},
"web": cfg.get("web", {}),
"database": cfg.get("database", {}),
"languages": cfg.get("languages", {})
}
def verify_admin(username: str, password: str) -> bool:
# 1. Primero intentamos con el SuperAdmin del .env (backup)
web_config = get_web_config()
if username == web_config.get("admin_username", "") and \
password == web_config.get("admin_password", ""):
return True
# 2. Si no es el SuperAdmin, buscamos en la base de datos MySQL
try:
admin = get_admin_by_username(username)
if admin and hasher.verify(password, admin['password_hash']):
return True
except Exception as e:
print(f"Error verifying admin: {e}")
return False
@app.get("/")
async def root(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/login")
async def login_page(request: Request):
return templates.TemplateResponse("login.html", {"request": request})
@app.post("/login")
async def login(request: Request):
form = await request.form()
username = form.get("username", "")
password = form.get("password", "")
if verify_admin(username, password):
response = RedirectResponse(url="/dashboard", status_code=status.HTTP_303_SEE_OTHER)
response.set_cookie(key="auth", value="ok", httponly=True)
response.set_cookie(key="username", value=username, httponly=True)
return response
return templates.TemplateResponse("login.html", {
"request": request,
"error": "Credenciales incorrectas"
})
@app.get("/dashboard")
async def dashboard(request: Request):
if request.cookies.get("auth") != "ok":
return RedirectResponse(url="/login")
username = request.cookies.get("username", "")
config = get_config()
return templates.TemplateResponse("dashboard.html", {
"request": request,
"config": config,
"username": username
})
@app.get("/config")
async def config_page(request: Request):
if request.cookies.get("auth") != "ok":
return RedirectResponse(url="/login")
username = request.cookies.get("username", "")
config = get_config()
return templates.TemplateResponse("config.html", {
"request": request,
"config": config,
"username": username
})
@app.get("/languages")
async def languages_page(request: Request):
if request.cookies.get("auth") != "ok":
return RedirectResponse(url="/login")
from botdiscord.database import get_available_languages, get_bot_languages
available = get_available_languages()
discord_langs = get_bot_languages("discord")
telegram_langs = get_bot_languages("telegram")
return templates.TemplateResponse("languages.html", {
"request": request,
"available_languages": available,
"discord_languages": discord_langs,
"telegram_languages": telegram_langs,
"libretranslate_url": get_libretranslate_url()
})
@app.post("/languages/sync")
async def sync_languages(request: Request):
if request.cookies.get("auth") != "ok":
raise HTTPException(status_code=401)
import aiohttp
url = get_libretranslate_url()
if not url:
return RedirectResponse(url="/languages?error=1", status_code=status.HTTP_303_SEE_OTHER)
base_url = url.rstrip("/translate").rstrip("/translate/").rstrip("/")
if base_url.endswith("/translate"):
base_url = base_url[:-10]
base_url = base_url.rstrip("/")
try:
async with aiohttp.ClientSession() as session:
async with session.get(f"{base_url}/languages", timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status == 200:
languages = await resp.json()
from botdiscord.database import get_available_languages, set_available_languages
existing = {lang["code"]: lang.get("flag", "") for lang in get_available_languages()}
for lang in languages:
lang["flag"] = existing.get(lang["code"], "")
set_available_languages(languages)
return RedirectResponse(url="/languages?synced=1", status_code=status.HTTP_303_SEE_OTHER)
else:
return RedirectResponse(url="/languages?error=1", status_code=status.HTTP_303_SEE_OTHER)
except Exception as e:
print(f"Error syncing languages: {e}")
return RedirectResponse(url="/languages?error=1", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/languages/bot")
async def update_bot_languages(request: Request):
if request.cookies.get("auth") != "ok":
raise HTTPException(status_code=401)
form = await request.form()
bot_type = form.get("bot_type", "discord")
lang_codes = form.getlist("lang_codes")
from botdiscord.database import set_bot_languages
set_bot_languages(bot_type, lang_codes)
return RedirectResponse(url="/languages?success=1", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/languages/flags")
async def update_language_flags(request: Request):
if request.cookies.get("auth") != "ok":
raise HTTPException(status_code=401)
from botdiscord.database import get_available_languages, set_available_languages
available = get_available_languages()
form = await request.form()
for lang in available:
flag_key = f"flag_{lang['code']}"
lang['flag'] = form.get(flag_key, "")
set_available_languages(available)
return RedirectResponse(url="/languages?success=1", status_code=status.HTTP_303_SEE_OTHER)
@app.get("/logout")
async def logout():
response = RedirectResponse(url="/login", status_code=status.HTTP_303_SEE_OTHER)
response.delete_cookie("auth")
response.delete_cookie("username")
return response
@app.get("/admins")
async def admins_page(request: Request):
if request.cookies.get("auth") != "ok":
return RedirectResponse(url="/login")
admins = get_admins()
return templates.TemplateResponse("admins.html", {
"request": request,
"admins": admins
})
@app.post("/admins/add")
async def add_admin_post(request: Request):
if request.cookies.get("auth") != "ok":
raise HTTPException(status_code=401)
form = await request.form()
username = form.get("username", "")
password = form.get("password", "")
if not username or not password:
return RedirectResponse(url="/admins?error=missing_fields", status_code=status.HTTP_303_SEE_OTHER)
try:
password_hash = hasher.hash(password)
add_admin(username, password_hash)
return RedirectResponse(url="/admins?success=1", status_code=status.HTTP_303_SEE_OTHER)
except Exception as e:
print(f"CRITICAL ERROR adding admin: {e}")
# Redirigimos con error a la misma página
return RedirectResponse(url="/admins?error=" + str(e), status_code=status.HTTP_303_SEE_OTHER)
@app.post("/admins/delete")
async def delete_admin_post(request: Request):
if request.cookies.get("auth") != "ok":
raise HTTPException(status_code=401)
form = await request.form()
admin_id = form.get("admin_id")
if admin_id:
try:
delete_admin(int(admin_id))
except Exception as e:
print(f"Error deleting admin: {e}")
return RedirectResponse(url="/admins", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/admins/update")
async def update_admin_post(request: Request):
if request.cookies.get("auth") != "ok":
raise HTTPException(status_code=401)
form = await request.form()
admin_id = form.get("admin_id")
new_password = form.get("new_password")
if admin_id and new_password:
from botdiscord.database import update_admin_password
password_hash = hasher.hash(new_password)
try:
update_admin_password(int(admin_id), password_hash)
except Exception as e:
print(f"Error updating admin password: {e}")
return RedirectResponse(url="/admins?success=1", status_code=status.HTTP_303_SEE_OTHER)
@app.get("/welcome")
async def welcome_page(request: Request):
if request.cookies.get("auth") != "ok":
return RedirectResponse(url="/login")
from botdiscord.database import get_all_welcome_configs
configs = get_all_welcome_configs()
config_dict = {c['guild_id']: c for c in configs} if configs else {}
selected_guild = request.query_params.get("guild")
selected_cfg = None
if selected_guild and selected_guild.isdigit():
selected_cfg = config_dict.get(int(selected_guild))
success = request.query_params.get("success") == "1"
error = request.query_params.get("error") == "1"
return templates.TemplateResponse("welcome.html", {
"request": request,
"configs": config_dict,
"config": config_dict,
"selected_guild": int(selected_guild) if selected_guild and selected_guild.isdigit() else None,
"selected_cfg": selected_cfg,
"success": success,
"error": error,
"new_form": request.query_params.get("new") == "1"
})
@app.post("/welcome/save")
async def save_welcome(request: Request):
if request.cookies.get("auth") != "ok":
raise HTTPException(status_code=401)
form = await request.form()
guild_id = form.get("guild_id")
channel_id = form.get("channel_id")
message_content = form.get("message_content")
enabled = form.get("enabled") == "1"
if not guild_id or not channel_id or not message_content:
return RedirectResponse(url="/welcome?error=1", status_code=status.HTTP_303_SEE_OTHER)
try:
from botdiscord.database import save_welcome_message
save_welcome_message(
int(guild_id),
int(channel_id),
message_content,
enabled
)
return RedirectResponse(url="/welcome?success=1", status_code=status.HTTP_303_SEE_OTHER)
except Exception as e:
print(f"Error saving welcome message: {e}")
return RedirectResponse(url="/welcome?error=1", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/welcome/delete")
async def delete_welcome(request: Request):
if request.cookies.get("auth") != "ok":
raise HTTPException(status_code=401)
form = await request.form()
guild_id = form.get("guild_id")
if guild_id:
try:
from botdiscord.database import delete_welcome_message
delete_welcome_message(int(guild_id))
return RedirectResponse(url="/welcome?success=1", status_code=status.HTTP_303_SEE_OTHER)
except Exception as e:
print(f"Error deleting welcome message: {e}")
return RedirectResponse(url="/welcome?error=1", status_code=status.HTTP_303_SEE_OTHER)
return RedirectResponse(url="/welcome", status_code=status.HTTP_303_SEE_OTHER)
@app.get("/discord-channels")
async def discord_channels_page(request: Request):
if request.cookies.get("auth") != "ok":
return RedirectResponse(url="/login")
from botdiscord.database import get_discord_servers, get_discord_channels
servers = get_discord_servers()
channels = get_discord_channels()
# Agrupar canales por servidor
servers_with_channels = {}
for server in servers:
server_id = server['server_id']
servers_with_channels[server_id] = {
'server_info': server,
'channels': [ch for ch in channels if ch['server_id'] == server_id]
}
success = request.query_params.get("success") == "1"
error = request.query_params.get("error") == "1"
synced = request.query_params.get("synced") == "1"
return templates.TemplateResponse("discord_channels.html", {
"request": request,
"servers_with_channels": servers_with_channels,
"success": success,
"error": error,
"synced": synced
})
@app.post("/discord-channels/sync")
async def sync_discord_servers(request: Request):
if request.cookies.get("auth") != "ok":
raise HTTPException(status_code=401)
try:
print("[Panel] Iniciando sincronización de Discord...")
# Diagnóstico de variables de entorno
print("[Panel] Variables de entorno disponibles:")
discord_env_vars = {k: v for k, v in os.environ.items() if 'DISCORD' in k or 'TOKEN' in k}
for key, value in discord_env_vars.items():
print(f" {key}: {'***' if 'TOKEN' in key else value}")
# Importar y usar el módulo de sincronización
from botdiscord.server_sync import sync_discord_servers_from_api
result = await sync_discord_servers_from_api()
print(f"[Panel] Resultado de sincronización: {result}")
if result:
return RedirectResponse(url="/discord-channels?synced=1", status_code=status.HTTP_303_SEE_OTHER)
else:
return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER)
except Exception as e:
print(f"[Panel] Error en sincronización: {e}")
import traceback
traceback.print_exc()
return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/discord-channels/toggle")
async def toggle_discord_channel(request: Request):
if request.cookies.get("auth") != "ok":
raise HTTPException(status_code=401)
form = await request.form()
channel_id = form.get("channel_id")
is_active = form.get("is_active") == "true"
if not channel_id:
return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER)
try:
from botdiscord.database import toggle_channel_status
from botdiscord.reload_marker import set_reload_marker
toggle_channel_status(int(channel_id), is_active)
set_reload_marker()
return RedirectResponse(url="/discord-channels?success=1", status_code=status.HTTP_303_SEE_OTHER)
except Exception as e:
print(f"Error toggling channel: {e}")
return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/discord-channels/delete-server")
async def delete_discord_server(request: Request):
if request.cookies.get("auth") != "ok":
raise HTTPException(status_code=401)
form = await request.form()
server_id = form.get("server_id")
if not server_id:
return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER)
try:
from botdiscord.database import delete_discord_server
from botdiscord.reload_marker import set_reload_marker
delete_discord_server(int(server_id))
set_reload_marker()
return RedirectResponse(url="/discord-channels?success=1", status_code=status.HTTP_303_SEE_OTHER)
except Exception as e:
print(f"Error deleting server: {e}")
return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/discord-channels/reload-bot")
async def reload_discord_bot(request: Request):
if request.cookies.get("auth") != "ok":
raise HTTPException(status_code=401)
try:
print("[Panel] Enviando señal de recarga al bot de Discord...")
# Importar y ejecutar la función de recarga
from botdiscord.bot_reload import reload_bot_config
result = await reload_bot_config()
if result:
return RedirectResponse(url="/discord-channels?success=1", status_code=status.HTTP_303_SEE_OTHER)
else:
return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER)
except Exception as e:
print(f"[Panel] Error recargando bot: {e}")
import traceback
traceback.print_exc()
return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/discord-channels/bulk-toggle")
async def bulk_toggle_discord_channels(request: Request):
if request.cookies.get("auth") != "ok":
raise HTTPException(status_code=401)
form = await request.form()
channel_ids = form.getlist("channel_ids")
bulk_action = form.get("bulk_action")
if not channel_ids or not bulk_action:
return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER)
try:
from botdiscord.database import toggle_channel_status
is_active = (bulk_action == "activate")
success_count = 0
for channel_id_str in channel_ids:
try:
channel_id = int(channel_id_str)
toggle_channel_status(channel_id, is_active)
success_count += 1
except ValueError:
continue
print(f"[Panel] Bulk toggle: {success_count} canales {'activados' if is_active else 'desactivados'}")
from botdiscord.reload_marker import set_reload_marker
set_reload_marker()
return RedirectResponse(url="/discord-channels?success=1", status_code=status.HTTP_303_SEE_OTHER)
except Exception as e:
print(f"[Panel] Error en bulk toggle: {e}")
import traceback
traceback.print_exc()
return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER)
@app.get("/diagnosis")
async def diagnosis_page(request: Request):
if request.cookies.get("auth") != "ok":
return RedirectResponse(url="/login")
# Obtener variables de entorno relevantes
env_vars = {
'DISCORD_TOKEN': os.getenv('DISCORD_TOKEN'),
'TELEGRAM_TOKEN': os.getenv('TELEGRAM_TOKEN'),
'LIBRETRANSLATE_URL': os.getenv('LIBRETRANSLATE_URL'),
'DB_TYPE': os.getenv('DB_TYPE'),
'DB_HOST': os.getenv('DB_HOST'),
'DB_PORT': os.getenv('DB_PORT'),
'DB_NAME': os.getenv('DB_NAME'),
'DB_USER': os.getenv('DB_USER')
}
# Obtener configuración cargada
config = get_config()
return templates.TemplateResponse("diagnosis.html", {
"request": request,
"env_vars": env_vars,
"config": config
})
@app.get("/api/stats")
async def get_stats(request: Request):
if request.cookies.get("auth") != "ok":
raise HTTPException(status_code=401)
stats = get_translation_stats()
return stats
@app.get("/metrics")
async def metrics_page(request: Request):
if request.cookies.get("auth") != "ok":
return RedirectResponse(url="/login")
username = request.cookies.get("username", "")
return templates.TemplateResponse("metrics.html", {
"request": request,
"username": username
})
@app.get("/groq")
async def groq_page(request: Request):
if request.cookies.get("auth") != "ok":
return RedirectResponse(url="/login")
from botdiscord.database import get_bot_config, set_bot_config
groq_config = get_groq_config()
# Default prompt
default_prompt = """Eres el General Reserves, comandante del ejército de Last War: Survival Game.
IDIOMA - IMPORTANTE:
1. Detecta el idioma de la PREGUNTA del usuario
2. Si NO es inglés, tradúcela al inglés ANTES de consultar el RAG
3. Cuando recibas la respuesta del RAG, tradúcela al MISMO IDIOMA de la pregunta original
4. RESPONDE SIEMPRE en el mismo idioma que te habló el usuario
SALUDOS: Saluda como "¡A la orden, recruit! 🎖️" o "¡Reporting for duty!"
FORMATO DE RESPUESTA:
- Primero saluda al usuario
- Da la información encontrada
- NUNCA repitas información varias veces
- Sé conciso
RESTRICCIONES:
1. SOLO responde sobre Last War: Survival Game
2. NUNCA inventes información
3. Si no hay datos en el RAG, responde con humor gentil: "¡Mi radar no detectó eso, recruit! 🤔"
Usa el sistema RAG para buscar información."""
# Always use default prompt (user can edit and save custom one)
groq_config["system_prompt"] = default_prompt
# Check if user has a custom prompt saved
saved_prompt = get_bot_config("groq_system_prompt")
if saved_prompt:
groq_config["system_prompt"] = saved_prompt
groq_config["has_custom_prompt"] = True
else:
groq_config["has_custom_prompt"] = False
groq_models = [
{"id": "llama-3.3-70b-versatile", "name": "Llama 3.3 70B (Versatile)"},
{"id": "llama-3.1-70b-versatile", "name": "Llama 3.1 70B (Versatile)"},
{"id": "llama-3.1-8b-instant", "name": "Llama 3.1 8B (Instant)"},
{"id": "mixtral-8x7b-32768", "name": "Mixtral 8x7B"},
{"id": "gemma2-9b-it", "name": "Gemma 2 9B"},
]
return templates.TemplateResponse("groq.html", {
"request": request,
"groq_config": groq_config,
"groq_models": groq_models,
"is_admin": request.cookies.get("username") == "nickpons666"
})
@app.post("/groq/save")
async def save_groq_config(request: Request):
if request.cookies.get("auth") != "ok":
raise HTTPException(status_code=401)
form = await request.form()
api_key = form.get("api_key", "")
model = form.get("model", "llama-3.3-70b-versatile")
rag_url = form.get("rag_url", "http://localhost:8004")
system_prompt = form.get("system_prompt", "")
config = load_config()
if "groq" not in config:
config["groq"] = {}
config["groq"]["api_key"] = api_key
config["groq"]["model"] = model
config["groq"]["rag_url"] = rag_url
import yaml
config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "config.yaml")
with open(config_path, "w") as f:
yaml.dump(config, f, default_flow_style=False, allow_unicode=True)
if system_prompt:
from botdiscord.database import set_bot_config
set_bot_config("groq_system_prompt", system_prompt)
from botdiscord.groq_agent import reload_config
reload_config()
return RedirectResponse(url="/groq?saved=1", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/groq/reset-prompt")
async def reset_groq_prompt(request: Request):
if request.cookies.get("auth") != "ok":
raise HTTPException(status_code=401)
from botdiscord.database import set_bot_config
set_bot_config("groq_system_prompt", "")
from botdiscord.groq_agent import reload_config
reload_config()
return RedirectResponse(url="/groq?reset=1", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/groq/test")
async def test_groq_agent(request: Request):
if request.cookies.get("auth") != "ok":
raise HTTPException(status_code=401)
form = await request.form()
test_question = form.get("test_question", "")
if not test_question:
return RedirectResponse(url="/groq?error=no_question", status_code=status.HTTP_303_SEE_OTHER)
from botdiscord.groq_agent import chat_with_rag
try:
result = await chat_with_rag(test_question)
response = result.get("response", "Sin respuesta")
sources = result.get("sources", [])
rag_result = result.get("rag_result", {})
rag_answer = rag_result.get("answer", "") if rag_result else ""
return templates.TemplateResponse("groq.html", {
"request": request,
"groq_config": get_groq_config(),
"groq_models": [
{"id": "llama-3.3-70b-versatile", "name": "Llama 3.3 70B (Versatile)"},
{"id": "llama-3.1-70b-versatile", "name": "Llama 3.1 70B (Versatile)"},
{"id": "llama-3.1-8b-instant", "name": "Llama 3.1 8B (Instant)"},
{"id": "mixtral-8x7b-32768", "name": "Mixtral 8x7B"},
{"id": "gemma2-9b-it", "name": "Gemma 2 9B"},
],
"is_admin": request.cookies.get("username") == "nickpons666",
"test_result": {
"question": test_question,
"response": response,
"sources": sources,
"rag_answer": rag_answer
}
})
except Exception as e:
return templates.TemplateResponse("groq.html", {
"request": request,
"groq_config": get_groq_config(),
"groq_models": [
{"id": "llama-3.3-70b-versatile", "name": "Llama 3.3 70B (Versatile)"},
{"id": "llama-3.1-70b-versatile", "name": "Llama 3.1 70B (Versatile)"},
{"id": "llama-3.1-8b-instant", "name": "Llama 3.1 8B (Instant)"},
{"id": "mixtral-8x7b-32768", "name": "Mixtral 8x7B"},
{"id": "gemma2-9b-it", "name": "Gemma 2 9B"},
],
"is_admin": request.cookies.get("username") == "nickpons666",
"test_error": str(e)
})
if __name__ == "__main__":
import uvicorn
web_config = get_web_config()
uvicorn.run(app, host=web_config.get("host", "0.0.0.0"), port=web_config.get("port", 8000))