"""Web configuration panel (FastAPI) for the translation bots.

Serves the Jinja2-rendered admin pages (login, languages, admins, welcome
messages, Discord channels, diagnosis, metrics, Groq agent) and registers a
``translate`` template filter that localises UI strings through a cascade of
caches before falling back to the translation service.
"""
import os
import sys
import glob
import re
import asyncio
import hmac
from urllib.parse import quote

# Make the project root importable before any project-local import below.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from fastapi import FastAPI, HTTPException, status
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi import Request
from fastapi.responses import RedirectResponse
from pydantic import BaseModel
from dotenv import load_dotenv
from passlib.hash import pbkdf2_sha256 as hasher

from botdiscord.config import load_config, get_web_config, get_libretranslate_url, get_db_type, get_groq_config

# Make sure the environment variables are loaded correctly.
load_dotenv()
load_config()  # Load the configuration immediately.

from botdiscord.database import (
    init_db, get_ui_translation, save_ui_translation,
    get_admins, get_admin_by_username, add_admin, delete_admin, _normalize_text,
    get_translation_stats
)
init_db()  # Ensure the tables exist before FastAPI starts serving requests.

from botdiscord.translate import translate_text

app = FastAPI(title="Panel de Configuración - Bots de Traducción")

SCRIPT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
templates = Jinja2Templates(directory=os.path.join(SCRIPT_DIR, "panel", "templates"))

from utils.logger import panel_logger as log
from utils.cache import cache_get, cache_set, cache_increment

# In-process cache used as a fallback when Redis is not available.
_ui_memory_cache = {}

# Languages for which pre_warm_ui_translations() has already been started.
_warmed_languages = set()

# Substring of an error response that must never be served or cached as a
# translation.
_BAD_TRANSLATION_MARKER = "mantenimiento"

# Lifetime of UI translations stored through cache_set(): 7 days.
_UI_CACHE_TTL = 604800

# Patterns that find string literals piped into the ``translate`` filter
# inside the HTML templates (double-quoted and single-quoted forms).
_TRANSLATE_DQ_RE = re.compile(r'\"([^\"]+)\"\s*\|\s*translate')
_TRANSLATE_SQ_RE = re.compile(r'\'([^\']+)\'\s*\|\s*translate')

# Models offered in the Groq configuration page.
GROQ_MODELS = [
    {"id": "llama-3.3-70b-versatile", "name": "Llama 3.3 70B (Versatile)"},
    {"id": "llama-3.1-70b-versatile", "name": "Llama 3.1 70B (Versatile)"},
    {"id": "llama-3.1-8b-instant", "name": "Llama 3.1 8B (Instant)"},
    {"id": "mixtral-8x7b-32768", "name": "Mixtral 8x7B"},
    {"id": "gemma2-9b-it", "name": "Gemma 2 9B"},
]

# Username whose ``username`` cookie makes the Groq page render with
# ``is_admin`` set.
GROQ_ADMIN_USERNAME = "nickpons666"

# Default system prompt shown on the Groq page when no custom prompt is saved.
DEFAULT_GROQ_SYSTEM_PROMPT = """Eres el General Reserves, comandante del ejército de Last War: Survival Game.

IDIOMA - IMPORTANTE:
1. Detecta el idioma de la PREGUNTA del usuario
2. Si NO es inglés, tradúcela al inglés ANTES de consultar el RAG
3. Cuando recibas la respuesta del RAG, tradúcela al MISMO IDIOMA de la pregunta original
4. RESPONDE SIEMPRE en el mismo idioma que te habló el usuario

SALUDOS:
Saluda como "¡A la orden, recruit! 🎖️" o "¡Reporting for duty!"

FORMATO DE RESPUESTA:
- Primero saluda al usuario
- Da la información encontrada
- NUNCA repitas información varias veces
- Sé conciso

RESTRICCIONES:
1. SOLO responde sobre Last War: Survival Game
2. NUNCA inventes información
3. Si no hay datos en el RAG, responde con humor gentil: "¡Mi radar no detectó eso, recruit! 🤔"

Usa el sistema RAG para buscar información."""


# ---------------------------------------------------------------------------
# Small shared helpers
# ---------------------------------------------------------------------------

def _ui_cache_key(lang, text):
    """Return the cache key shared by the filter and the pre-warm routine."""
    return f"ui:{lang}:{text}"


def _is_valid_lang_code(lang):
    """Return True when *lang* is exactly two ASCII letters."""
    return bool(lang) and len(lang) == 2 and lang.isascii() and lang.isalpha()


def _is_authenticated(request: Request) -> bool:
    """Return True when the request carries the ``auth=ok`` cookie.

    SECURITY NOTE(review): this cookie is a fixed, unsigned value, so any
    client can forge it (the same applies to the ``username`` cookie).
    Replacing it needs a signed/server-side session and is deliberately not
    changed here; it should be addressed separately.
    """
    return request.cookies.get("auth") == "ok"


def _see_other(url):
    """Build a 303 redirect, used after handling a POST."""
    return RedirectResponse(url=url, status_code=status.HTTP_303_SEE_OTHER)


def _constant_time_equal(candidate, expected):
    """Compare two strings without leaking the mismatch position via timing.

    Non-string operands never compare equal, which matches what ``==`` gave
    for a submitted string against a non-string configuration value.
    """
    if not isinstance(candidate, str) or not isinstance(expected, str):
        return False
    return hmac.compare_digest(candidate.encode("utf-8"), expected.encode("utf-8"))


def _libretranslate_base_url(url):
    """Strip trailing slashes and an optional ``/translate`` suffix from *url*.

    ``str.rstrip("/translate")`` must not be used for this: it removes any
    trailing run of those *characters* and can eat into the host name.
    """
    base_url = url.rstrip("/")
    if base_url.endswith("/translate"):
        base_url = base_url[:-len("/translate")]
    return base_url.rstrip("/")


def _parse_guild_id(raw):
    """Return *raw* as an int when it is a plain decimal string, else None.

    ``isdecimal`` is used instead of ``isdigit`` because the latter accepts
    characters (e.g. superscript digits) that ``int()`` rejects.
    """
    if raw and raw.isdecimal():
        return int(raw)
    return None


def _groq_template_context(request: Request, groq_config, **extra):
    """Build the template context shared by every rendering of groq.html."""
    context = {
        "request": request,
        "groq_config": groq_config,
        "groq_models": GROQ_MODELS,
        "is_admin": request.cookies.get("username") == GROQ_ADMIN_USERNAME,
    }
    context.update(extra)
    return context


# ---------------------------------------------------------------------------
# UI translation (Jinja2 filter + pre-warming)
# ---------------------------------------------------------------------------

def translate_filter(text, lang="es"):
    """Jinja2 filter: translate a UI string into *lang*.

    Lookup order is Redis -> in-process dict -> database -> synchronous
    translation call. Spanish (``"es"``) and empty input are returned
    untouched. Values containing the error marker are treated as poisoned:
    they are evicted from the caches and never returned. If every step
    fails, the normalised source text is returned.
    """
    if lang == "es" or not text:
        return text

    # Normalise immediately so every cache layer sees the same key.
    text = _normalize_text(text)
    if not text:
        return ""

    cache_key = _ui_cache_key(lang, text)
    bad = _BAD_TRANSLATION_MARKER

    # 1. Redis (shared, survives restarts).
    redis_val = cache_get(cache_key)
    if redis_val and bad not in redis_val:
        _ui_memory_cache[cache_key] = redis_val
        return redis_val
    elif redis_val and bad in redis_val:
        # Poisoned value: remove it from the cache.
        from utils.cache import cache_delete
        cache_delete(cache_key)

    # 2. In-process dict (fast, lost on restart).
    ram_val = _ui_memory_cache.get(cache_key)
    if ram_val and bad not in ram_val:
        return ram_val
    elif cache_key in _ui_memory_cache:
        del _ui_memory_cache[cache_key]

    # 3. Database.
    cached = get_ui_translation(text, lang)
    if cached and bad not in cached:
        _ui_memory_cache[cache_key] = cached
        cache_set(cache_key, cached, ttl=_UI_CACHE_TTL)
        return cached

    # 4. Translation service (synchronous; only when no cache had it).
    from botdiscord.translate import translate_text_sync
    translated = translate_text_sync(text, lang)
    if translated and translated != text and bad not in translated:
        save_ui_translation(text, lang, translated)
        _ui_memory_cache[cache_key] = translated
        cache_set(cache_key, translated, ttl=_UI_CACHE_TTL)
        return translated

    return text  # Fallback: return the source text when everything fails.


templates.env.filters["translate"] = translate_filter


def _collect_template_strings():
    """Return every literal piped into ``| translate`` in the HTML templates."""
    found = set()
    pattern = os.path.join(os.path.dirname(__file__), "templates", "*.html")
    for path in glob.glob(pattern):
        try:
            with open(path, "r", encoding="utf-8") as fd:
                content = fd.read()
        except (OSError, UnicodeError) as exc:
            # Best effort: an unreadable template only means fewer strings
            # are pre-warmed, so report it and keep going.
            print(f"[Panel] No se pudo leer la plantilla {path}: {exc}")
            continue
        found.update(_TRANSLATE_DQ_RE.findall(content))
        found.update(_TRANSLATE_SQ_RE.findall(content))
    return found


async def pre_warm_ui_translations(lang: str):
    """Load (and, where missing, fetch) all UI translations for *lang*.

    Runs at most once per language per process. Strings already stored in
    the database are copied into the in-process cache; the remaining ones are
    translated concurrently and saved. The in-process entries use the same
    key as ``translate_filter`` so that the filter actually finds them.
    Individual translation failures are reported and do not abort the
    request that triggered the pre-warm.
    """
    if lang == "es" or lang in _warmed_languages:
        return
    # Mark first so concurrent requests do not start a second pre-warm.
    _warmed_languages.add(lang)
    print(f"[Panel] Pre-calentando traducciones para idioma UI: {lang}...")

    missing_strings = []
    for text in _collect_template_strings():
        norm_text = _normalize_text(text)
        if not norm_text:
            continue
        cache_key = _ui_cache_key(lang, norm_text)
        if cache_key in _ui_memory_cache:
            continue
        cached = get_ui_translation(norm_text, lang)
        if cached:
            _ui_memory_cache[cache_key] = cached
        else:
            missing_strings.append(norm_text)

    if not missing_strings:
        print(f"[Panel] Pre-calentamiento completo: 0 cadenas faltantes para {lang}")
        return

    print(f"[Panel] Falta traducir {len(missing_strings)} cadenas UI al {lang}. Empezando concurrencia...")
    from botdiscord.translate import translate_text

    async def _fetch_and_save(text):
        translated = await translate_text(text, lang)
        if translated and translated != text:
            save_ui_translation(text, lang, translated)
            _ui_memory_cache[_ui_cache_key(lang, text)] = translated

    results = await asyncio.gather(
        *(_fetch_and_save(t) for t in missing_strings),
        return_exceptions=True,
    )
    failures = [r for r in results if isinstance(r, BaseException)]
    if failures:
        print(f"[Panel] {len(failures)} traducciones UI fallaron para {lang}: {failures[0]}")
    print(f"[Panel] Pre-calentamiento terminado para {lang}.")


@app.middleware("http")
async def translation_pre_warm_middleware(request: Request, call_next):
    """Pre-warm the UI translations for the language the page will use.

    Requests under ``/static`` and ``/api`` are passed through untouched.
    """
    if request.url.path.startswith(("/static", "/api")):
        return await call_next(request)

    lang = get_panel_lang(request)
    if _is_valid_lang_code(lang):
        await pre_warm_ui_translations(lang)

    return await call_next(request)


@app.get("/set-lang/{lang}")
async def set_lang(lang: str, request: Request):
    """Store the chosen UI language in a cookie and go back to the referer."""
    # Only two-letter codes are accepted; anything else goes to the dashboard.
    if not _is_valid_lang_code(lang):
        return RedirectResponse(url="/dashboard")

    # Warm the translations now so the page after the redirect renders fast.
    await pre_warm_ui_translations(lang)

    response = RedirectResponse(url=request.headers.get("referer", "/dashboard"))
    response.set_cookie(key="panel_lang", value=lang, max_age=31536000)  # 1 year
    return response


def get_panel_lang(request: Request):
    """Return the UI language: ``panel_lang`` cookie, else Accept-Language.

    The header fallback takes the first listed language and drops its region
    part (``"en-US,en;q=0.9"`` -> ``"en"``); it defaults to ``"es"``.
    """
    lang = request.cookies.get("panel_lang")
    if not lang:
        accept_lang = request.headers.get("accept-language", "es")
        lang = accept_lang.split(",")[0].split("-")[0]
    return lang


# ---------------------------------------------------------------------------
# Configuration and authentication
# ---------------------------------------------------------------------------

class LoginForm(BaseModel):
    """Shape of the login credentials."""
    username: str
    password: str


def get_config():
    """Return the subset of the loaded configuration shown by the panel."""
    cfg = load_config()
    return {
        "discord": {"token": cfg.get("discord", {}).get("token", "")},
        "telegram": {"token": cfg.get("telegram", {}).get("token", "")},
        "libretranslate": {"url": cfg.get("libretranslate", {}).get("url", "")},
        "web": cfg.get("web", {}),
        "database": cfg.get("database", {}),
        "languages": cfg.get("languages", {})
    }


def verify_admin(username: str, password: str) -> bool:
    """Return True when the credentials match the superadmin or a DB admin.

    The configured superadmin is only considered when both its username and
    password are non-empty, so an unconfigured panel cannot be entered with
    blank credentials. Database errors are reported and treated as a failed
    login.
    """
    # 1. Superadmin from the web configuration (backup account).
    web_config = get_web_config()
    expected_user = web_config.get("admin_username", "")
    expected_pass = web_config.get("admin_password", "")
    if expected_user and expected_pass:
        # Evaluate both comparisons unconditionally to keep timing uniform.
        user_ok = _constant_time_equal(username, expected_user)
        pass_ok = _constant_time_equal(password, expected_pass)
        if user_ok and pass_ok:
            return True

    # 2. Otherwise look the user up in the database.
    try:
        admin = get_admin_by_username(username)
        if admin and hasher.verify(password, admin['password_hash']):
            return True
    except Exception as e:
        print(f"Error verifying admin: {e}")

    return False


# ---------------------------------------------------------------------------
# Basic pages
# ---------------------------------------------------------------------------

@app.get("/")
async def root(request: Request):
    """Render the landing page."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.get("/login")
async def login_page(request: Request):
    """Render the login form."""
    return templates.TemplateResponse("login.html", {"request": request})


@app.post("/login")
async def login(request: Request):
    """Check the submitted credentials and set the session cookies."""
    form = await request.form()
    username = form.get("username", "")
    password = form.get("password", "")

    if verify_admin(username, password):
        response = _see_other("/dashboard")
        response.set_cookie(key="auth", value="ok", httponly=True)
        response.set_cookie(key="username", value=username, httponly=True)
        return response

    return templates.TemplateResponse("login.html", {
        "request": request,
        "error": "Credenciales incorrectas"
    })


@app.get("/dashboard")
async def dashboard(request: Request):
    """Render the dashboard (login required)."""
    if not _is_authenticated(request):
        return RedirectResponse(url="/login")

    return templates.TemplateResponse("dashboard.html", {
        "request": request,
        "config": get_config(),
        "username": request.cookies.get("username", "")
    })


@app.get("/config")
async def config_page(request: Request):
    """Render the configuration page (login required)."""
    if not _is_authenticated(request):
        return RedirectResponse(url="/login")

    return templates.TemplateResponse("config.html", {
        "request": request,
        "config": get_config(),
        "username": request.cookies.get("username", "")
    })


# ---------------------------------------------------------------------------
# Languages
# ---------------------------------------------------------------------------

@app.get("/languages")
async def languages_page(request: Request):
    """Render the language management page (login required)."""
    if not _is_authenticated(request):
        return RedirectResponse(url="/login")

    from botdiscord.database import get_available_languages, get_bot_languages

    return templates.TemplateResponse("languages.html", {
        "request": request,
        "available_languages": get_available_languages(),
        "discord_languages": get_bot_languages("discord"),
        "telegram_languages": get_bot_languages("telegram"),
        "libretranslate_url": get_libretranslate_url()
    })


@app.post("/languages/sync")
async def sync_languages(request: Request):
    """Fetch the language list from the translation server and store it.

    Flags already assigned to a language code are carried over to the new
    list. Any failure redirects back with ``error=1``.
    """
    if not _is_authenticated(request):
        raise HTTPException(status_code=401)

    import aiohttp

    url = get_libretranslate_url()
    if not url:
        return _see_other("/languages?error=1")

    base_url = _libretranslate_base_url(url)

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{base_url}/languages",
                timeout=aiohttp.ClientTimeout(total=10),
            ) as resp:
                if resp.status != 200:
                    return _see_other("/languages?error=1")

                languages = await resp.json()
                from botdiscord.database import get_available_languages, set_available_languages

                existing = {lang["code"]: lang.get("flag", "") for lang in get_available_languages()}
                for lang in languages:
                    lang["flag"] = existing.get(lang["code"], "")

                set_available_languages(languages)
                return _see_other("/languages?synced=1")
    except Exception as e:
        print(f"Error syncing languages: {e}")
        return _see_other("/languages?error=1")


@app.post("/languages/bot")
async def update_bot_languages(request: Request):
    """Store the language codes enabled for one bot type."""
    if not _is_authenticated(request):
        raise HTTPException(status_code=401)

    form = await request.form()
    bot_type = form.get("bot_type", "discord")
    lang_codes = form.getlist("lang_codes")

    from botdiscord.database import set_bot_languages
    set_bot_languages(bot_type, lang_codes)

    return _see_other("/languages?success=1")


@app.post("/languages/flags")
async def update_language_flags(request: Request):
    """Update the flag of every available language from ``flag_<code>`` fields."""
    if not _is_authenticated(request):
        raise HTTPException(status_code=401)

    from botdiscord.database import get_available_languages, set_available_languages

    available = get_available_languages()
    form = await request.form()

    for lang in available:
        lang['flag'] = form.get(f"flag_{lang['code']}", "")

    set_available_languages(available)

    return _see_other("/languages?success=1")


@app.get("/logout")
async def logout():
    """Clear the session cookies and go to the login page."""
    response = _see_other("/login")
    response.delete_cookie("auth")
    response.delete_cookie("username")
    return response


# ---------------------------------------------------------------------------
# Administrators
# ---------------------------------------------------------------------------

@app.get("/admins")
async def admins_page(request: Request):
    """Render the administrator list (login required)."""
    if not _is_authenticated(request):
        return RedirectResponse(url="/login")

    return templates.TemplateResponse("admins.html", {
        "request": request,
        "admins": get_admins()
    })


@app.post("/admins/add")
async def add_admin_post(request: Request):
    """Create an administrator with a hashed password."""
    if not _is_authenticated(request):
        raise HTTPException(status_code=401)

    form = await request.form()
    username = form.get("username", "")
    password = form.get("password", "")

    if not username or not password:
        return _see_other("/admins?error=missing_fields")

    try:
        add_admin(username, hasher.hash(password))
        return _see_other("/admins?success=1")
    except Exception as e:
        print(f"CRITICAL ERROR adding admin: {e}")
        # Redirect back to the same page with the error. The message is
        # percent-encoded so that characters such as '&', '#' or newlines
        # cannot break or extend the query string.
        return _see_other("/admins?error=" + quote(str(e), safe=""))


@app.post("/admins/delete")
async def delete_admin_post(request: Request):
    """Delete the administrator identified by ``admin_id``."""
    if not _is_authenticated(request):
        raise HTTPException(status_code=401)

    form = await request.form()
    admin_id = form.get("admin_id")

    if admin_id:
        try:
            delete_admin(int(admin_id))
        except Exception as e:
            print(f"Error deleting admin: {e}")

    return _see_other("/admins")


@app.post("/admins/update")
async def update_admin_post(request: Request):
    """Replace an administrator's password.

    A failed update redirects with ``error=update_failed`` instead of
    reporting success.
    """
    if not _is_authenticated(request):
        raise HTTPException(status_code=401)

    form = await request.form()
    admin_id = form.get("admin_id")
    new_password = form.get("new_password")

    if admin_id and new_password:
        from botdiscord.database import update_admin_password
        password_hash = hasher.hash(new_password)
        try:
            update_admin_password(int(admin_id), password_hash)
        except Exception as e:
            print(f"Error updating admin password: {e}")
            return _see_other("/admins?error=update_failed")

    return _see_other("/admins?success=1")


# ---------------------------------------------------------------------------
# Welcome messages
# ---------------------------------------------------------------------------

@app.get("/welcome")
async def welcome_page(request: Request):
    """Render the welcome-message page, optionally focused on one guild."""
    if not _is_authenticated(request):
        return RedirectResponse(url="/login")

    from botdiscord.database import get_all_welcome_configs

    configs = get_all_welcome_configs()
    config_dict = {c['guild_id']: c for c in configs} if configs else {}

    selected_guild = _parse_guild_id(request.query_params.get("guild"))
    selected_cfg = config_dict.get(selected_guild) if selected_guild is not None else None

    return templates.TemplateResponse("welcome.html", {
        "request": request,
        "configs": config_dict,
        "config": config_dict,
        "selected_guild": selected_guild,
        "selected_cfg": selected_cfg,
        "success": request.query_params.get("success") == "1",
        "error": request.query_params.get("error") == "1",
        "new_form": request.query_params.get("new") == "1"
    })


@app.post("/welcome/save")
async def save_welcome(request: Request):
    """Create or update the welcome message of a guild."""
    if not _is_authenticated(request):
        raise HTTPException(status_code=401)

    form = await request.form()
    guild_id = form.get("guild_id")
    channel_id = form.get("channel_id")
    message_content = form.get("message_content")
    enabled = form.get("enabled") == "1"

    if not guild_id or not channel_id or not message_content:
        return _see_other("/welcome?error=1")

    try:
        from botdiscord.database import save_welcome_message
        save_welcome_message(
            int(guild_id),
            int(channel_id),
            message_content,
            enabled
        )
        return _see_other("/welcome?success=1")
    except Exception as e:
        print(f"Error saving welcome message: {e}")
        return _see_other("/welcome?error=1")


@app.post("/welcome/delete")
async def delete_welcome(request: Request):
    """Delete the welcome message of a guild."""
    if not _is_authenticated(request):
        raise HTTPException(status_code=401)

    form = await request.form()
    guild_id = form.get("guild_id")

    if not guild_id:
        return _see_other("/welcome")

    try:
        from botdiscord.database import delete_welcome_message
        delete_welcome_message(int(guild_id))
        return _see_other("/welcome?success=1")
    except Exception as e:
        print(f"Error deleting welcome message: {e}")
        return _see_other("/welcome?error=1")


# ---------------------------------------------------------------------------
# Discord servers and channels
# ---------------------------------------------------------------------------

@app.get("/discord-channels")
async def discord_channels_page(request: Request):
    """Render the Discord servers with their channels grouped per server."""
    if not _is_authenticated(request):
        return RedirectResponse(url="/login")

    from botdiscord.database import get_discord_servers, get_discord_channels

    servers = get_discord_servers()
    channels = get_discord_channels()

    # Group the channels by the server they belong to.
    servers_with_channels = {
        server['server_id']: {
            'server_info': server,
            'channels': [ch for ch in channels if ch['server_id'] == server['server_id']]
        }
        for server in servers
    }

    return templates.TemplateResponse("discord_channels.html", {
        "request": request,
        "servers_with_channels": servers_with_channels,
        "success": request.query_params.get("success") == "1",
        "error": request.query_params.get("error") == "1",
        "synced": request.query_params.get("synced") == "1"
    })


@app.post("/discord-channels/sync")
async def sync_discord_servers(request: Request):
    """Synchronise the Discord servers through the server_sync module."""
    if not _is_authenticated(request):
        raise HTTPException(status_code=401)

    try:
        print("[Panel] Iniciando sincronización de Discord...")

        # Environment diagnostics; values of variables whose name contains
        # TOKEN are masked.
        print("[Panel] Variables de entorno disponibles:")
        discord_env_vars = {k: v for k, v in os.environ.items() if 'DISCORD' in k or 'TOKEN' in k}
        for key, value in discord_env_vars.items():
            print(f" {key}: {'***' if 'TOKEN' in key else value}")

        # Import and run the synchronisation module.
        from botdiscord.server_sync import sync_discord_servers_from_api
        result = await sync_discord_servers_from_api()

        print(f"[Panel] Resultado de sincronización: {result}")

        if result:
            return _see_other("/discord-channels?synced=1")
        return _see_other("/discord-channels?error=1")
    except Exception as e:
        print(f"[Panel] Error en sincronización: {e}")
        import traceback
        traceback.print_exc()
        return _see_other("/discord-channels?error=1")


@app.post("/discord-channels/toggle")
async def toggle_discord_channel(request: Request):
    """Activate or deactivate one channel and mark the bot for reload."""
    if not _is_authenticated(request):
        raise HTTPException(status_code=401)

    form = await request.form()
    channel_id = form.get("channel_id")
    is_active = form.get("is_active") == "true"

    if not channel_id:
        return _see_other("/discord-channels?error=1")

    try:
        from botdiscord.database import toggle_channel_status
        from botdiscord.reload_marker import set_reload_marker
        toggle_channel_status(int(channel_id), is_active)
        set_reload_marker()
        return _see_other("/discord-channels?success=1")
    except Exception as e:
        print(f"Error toggling channel: {e}")
        return _see_other("/discord-channels?error=1")


@app.post("/discord-channels/delete-server")
async def delete_discord_server(request: Request):
    """Delete a Discord server record and mark the bot for reload."""
    if not _is_authenticated(request):
        raise HTTPException(status_code=401)

    form = await request.form()
    server_id = form.get("server_id")

    if not server_id:
        return _see_other("/discord-channels?error=1")

    try:
        # Aliased so the database function does not share this route's name.
        from botdiscord.database import delete_discord_server as _delete_server_record
        from botdiscord.reload_marker import set_reload_marker
        _delete_server_record(int(server_id))
        set_reload_marker()
        return _see_other("/discord-channels?success=1")
    except Exception as e:
        print(f"Error deleting server: {e}")
        return _see_other("/discord-channels?error=1")


@app.post("/discord-channels/reload-bot")
async def reload_discord_bot(request: Request):
    """Ask the Discord bot to reload its configuration."""
    if not _is_authenticated(request):
        raise HTTPException(status_code=401)

    try:
        print("[Panel] Enviando señal de recarga al bot de Discord...")

        # Import and run the reload function.
        from botdiscord.bot_reload import reload_bot_config
        result = await reload_bot_config()

        if result:
            return _see_other("/discord-channels?success=1")
        return _see_other("/discord-channels?error=1")
    except Exception as e:
        print(f"[Panel] Error recargando bot: {e}")
        import traceback
        traceback.print_exc()
        return _see_other("/discord-channels?error=1")


@app.post("/discord-channels/bulk-toggle")
async def bulk_toggle_discord_channels(request: Request):
    """Activate or deactivate several channels at once.

    ``bulk_action == "activate"`` activates; any other value deactivates.
    Channel ids that are not integers are skipped.
    """
    if not _is_authenticated(request):
        raise HTTPException(status_code=401)

    form = await request.form()
    channel_ids = form.getlist("channel_ids")
    bulk_action = form.get("bulk_action")

    if not channel_ids or not bulk_action:
        return _see_other("/discord-channels?error=1")

    try:
        from botdiscord.database import toggle_channel_status

        is_active = (bulk_action == "activate")
        success_count = 0

        for channel_id_str in channel_ids:
            try:
                channel_id = int(channel_id_str)
            except ValueError:
                continue
            toggle_channel_status(channel_id, is_active)
            success_count += 1

        print(f"[Panel] Bulk toggle: {success_count} canales {'activados' if is_active else 'desactivados'}")

        from botdiscord.reload_marker import set_reload_marker
        set_reload_marker()

        return _see_other("/discord-channels?success=1")
    except Exception as e:
        print(f"[Panel] Error en bulk toggle: {e}")
        import traceback
        traceback.print_exc()
        return _see_other("/discord-channels?error=1")


# ---------------------------------------------------------------------------
# Diagnosis, stats and metrics
# ---------------------------------------------------------------------------

@app.get("/diagnosis")
async def diagnosis_page(request: Request):
    """Render the diagnosis page with environment and loaded configuration.

    NOTE(review): the raw token values are handed to the template; confirm
    the template masks them before display.
    """
    if not _is_authenticated(request):
        return RedirectResponse(url="/login")

    # Relevant environment variables.
    env_vars = {
        'DISCORD_TOKEN': os.getenv('DISCORD_TOKEN'),
        'TELEGRAM_TOKEN': os.getenv('TELEGRAM_TOKEN'),
        'LIBRETRANSLATE_URL': os.getenv('LIBRETRANSLATE_URL'),
        'DB_TYPE': os.getenv('DB_TYPE'),
        'DB_HOST': os.getenv('DB_HOST'),
        'DB_PORT': os.getenv('DB_PORT'),
        'DB_NAME': os.getenv('DB_NAME'),
        'DB_USER': os.getenv('DB_USER')
    }

    return templates.TemplateResponse("diagnosis.html", {
        "request": request,
        "env_vars": env_vars,
        "config": get_config()  # Configuration as currently loaded.
    })


@app.get("/api/stats")
async def get_stats(request: Request):
    """Return the translation statistics (login required)."""
    if not _is_authenticated(request):
        raise HTTPException(status_code=401)

    return get_translation_stats()


@app.get("/metrics")
async def metrics_page(request: Request):
    """Render the metrics page (login required)."""
    if not _is_authenticated(request):
        return RedirectResponse(url="/login")

    return templates.TemplateResponse("metrics.html", {
        "request": request,
        "username": request.cookies.get("username", "")
    })


# ---------------------------------------------------------------------------
# Groq agent
# ---------------------------------------------------------------------------

@app.get("/groq")
async def groq_page(request: Request):
    """Render the Groq configuration page.

    The system prompt shown is the saved custom prompt when one exists,
    otherwise the built-in default; ``has_custom_prompt`` tells which.
    """
    if not _is_authenticated(request):
        return RedirectResponse(url="/login")

    from botdiscord.database import get_bot_config

    groq_config = get_groq_config()

    # A saved custom prompt takes precedence over the default one.
    saved_prompt = get_bot_config("groq_system_prompt")
    groq_config["system_prompt"] = saved_prompt if saved_prompt else DEFAULT_GROQ_SYSTEM_PROMPT
    groq_config["has_custom_prompt"] = bool(saved_prompt)

    return templates.TemplateResponse("groq.html", _groq_template_context(request, groq_config))


@app.post("/groq/save")
async def save_groq_config(request: Request):
    """Persist the Groq settings to config.yaml and the custom prompt to the DB."""
    if not _is_authenticated(request):
        raise HTTPException(status_code=401)

    form = await request.form()
    api_key = form.get("api_key", "")
    model = form.get("model", "llama-3.3-70b-versatile")
    rag_url = form.get("rag_url", "http://localhost:8004")
    system_prompt = form.get("system_prompt", "")

    config = load_config()
    if "groq" not in config:
        config["groq"] = {}

    config["groq"]["api_key"] = api_key
    config["groq"]["model"] = model
    config["groq"]["rag_url"] = rag_url

    import yaml
    # Serialise first so a failing dump cannot leave a truncated file behind.
    serialized = yaml.dump(config, default_flow_style=False, allow_unicode=True)
    # SCRIPT_DIR is absolute, so the target does not depend on how the module
    # was launched. UTF-8 is explicit because allow_unicode=True emits
    # non-ASCII characters verbatim.
    config_path = os.path.join(SCRIPT_DIR, "config.yaml")
    with open(config_path, "w", encoding="utf-8") as f:
        f.write(serialized)

    if system_prompt:
        from botdiscord.database import set_bot_config
        set_bot_config("groq_system_prompt", system_prompt)

    from botdiscord.groq_agent import reload_config
    reload_config()

    return _see_other("/groq?saved=1")


@app.post("/groq/reset-prompt")
async def reset_groq_prompt(request: Request):
    """Clear the saved custom system prompt."""
    if not _is_authenticated(request):
        raise HTTPException(status_code=401)

    from botdiscord.database import set_bot_config
    set_bot_config("groq_system_prompt", "")

    from botdiscord.groq_agent import reload_config
    reload_config()

    return _see_other("/groq?reset=1")


@app.post("/groq/test")
async def test_groq_agent(request: Request):
    """Send a test question to the agent and render the answer or the error."""
    if not _is_authenticated(request):
        raise HTTPException(status_code=401)

    form = await request.form()
    test_question = form.get("test_question", "")

    if not test_question:
        return _see_other("/groq?error=no_question")

    from botdiscord.groq_agent import chat_with_rag

    try:
        result = await chat_with_rag(test_question)
        rag_result = result.get("rag_result", {})
        extra = {
            "test_result": {
                "question": test_question,
                "response": result.get("response", "Sin respuesta"),
                "sources": result.get("sources", []),
                "rag_answer": rag_result.get("answer", "") if rag_result else ""
            }
        }
    except Exception as e:
        extra = {"test_error": str(e)}

    return templates.TemplateResponse(
        "groq.html",
        _groq_template_context(request, get_groq_config(), **extra),
    )


if __name__ == "__main__":
    import uvicorn
    web_config = get_web_config()
    uvicorn.run(app, host=web_config.get("host", "0.0.0.0"), port=web_config.get("port", 8000))