import os import sys import glob import re import asyncio sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from fastapi import FastAPI, HTTPException, status from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from fastapi import Request from fastapi.responses import RedirectResponse from pydantic import BaseModel from dotenv import load_dotenv from passlib.hash import pbkdf2_sha256 as hasher from botdiscord.config import load_config, get_web_config, get_libretranslate_url, get_db_type # Asegurar que las variables de entorno se carguen correctamente load_dotenv() load_config() # Cargamos configuración inmediatamente from botdiscord.database import ( init_db, get_ui_translation, save_ui_translation, get_admins, get_admin_by_username, add_admin, delete_admin, _normalize_text ) init_db() # Aseguramos que las tablas existan antes de que FastAPI atienda peticiones from botdiscord.translate import translate_text app = FastAPI(title="Panel de Configuración - Bots de Traducción") SCRIPT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) templates = Jinja2Templates(directory=os.path.join(SCRIPT_DIR, "panel", "templates")) from utils.logger import panel_logger as log from utils.cache import cache_get, cache_set, cache_increment # Caché de memoria RAM como fallback si Redis no está disponible _ui_memory_cache = {} # Filtro de traducción para Jinja2 (usa Redis → RAM → DB → LibreTranslate en cascada) def translate_filter(text, lang="es"): if lang == "es" or not text: return text # NORMALIZACIÓN inmediata text = _normalize_text(text) if not text: return "" cache_key = f"ui:{lang}:{text}" _BAD = "mantenimiento" # Patron de respuesta de error a ignorar # 1. Redis (compartido y persistente entre reinicios) redis_val = cache_get(cache_key) if redis_val and _BAD not in redis_val: _ui_memory_cache[cache_key] = redis_val return redis_val elif redis_val and _BAD in redis_val: # Valor contaminado: eliminarlo del caché from utils.cache import cache_delete cache_delete(cache_key) # 2. RAM local (ultra rápida, temporal) ram_val = _ui_memory_cache.get(cache_key) if ram_val and _BAD not in ram_val: return ram_val elif cache_key in _ui_memory_cache: del _ui_memory_cache[cache_key] # 3. Base de datos MySQL cached = get_ui_translation(text, lang) if cached and _BAD not in cached: _ui_memory_cache[cache_key] = cached cache_set(cache_key, cached, ttl=604800) # 7 días return cached # 4. LibreTranslate (sincrónico, solo si no está en ningún caché) from botdiscord.translate import translate_text_sync translated = translate_text_sync(text, lang) if translated and translated != text and _BAD not in translated: save_ui_translation(text, lang, translated) _ui_memory_cache[cache_key] = translated cache_set(cache_key, translated, ttl=604800) # 7 días return translated return text # Fallback: devolver el texto original si todo falla templates.env.filters["translate"] = translate_filter _warmed_languages = set() async def pre_warm_ui_translations(lang: str): if lang == "es" or lang in _warmed_languages: return _warmed_languages.add(lang) print(f"[Panel] Pre-calentando traducciones para idioma UI: {lang}...") strings_to_translate = set() for f in glob.glob(os.path.join(os.path.dirname(__file__), "templates", "*.html")): try: with open(f, "r", encoding="utf-8") as fd: content = fd.read() strings_to_translate.update(re.findall(r'\"([^\"]+)\"\s*\|\s*translate', content)) strings_to_translate.update(re.findall(r'\'([^\']+)\'\s*\|\s*translate', content)) except Exception: pass missing_strings = [] for text in strings_to_translate: norm_text = _normalize_text(text) if not norm_text: continue cache_key = f"{lang}:{norm_text}" if cache_key in _ui_memory_cache: continue cached = get_ui_translation(norm_text, lang) if cached: _ui_memory_cache[cache_key] = cached else: missing_strings.append(norm_text) if not missing_strings: print(f"[Panel] Pre-calentamiento completo: 0 cadenas faltantes para {lang}") return print(f"[Panel] Falta traducir {len(missing_strings)} cadenas UI al {lang}. Empezando concurrencia...") from botdiscord.translate import translate_text async def _fetch_and_save(text): translated = await translate_text(text, lang) if translated and translated != text: save_ui_translation(text, lang, translated) _ui_memory_cache[f"{lang}:{text}"] = translated tasks = [_fetch_and_save(t) for t in missing_strings] await asyncio.gather(*tasks) print(f"[Panel] Pre-calentamiento terminado para {lang}.") @app.middleware("http") async def translation_pre_warm_middleware(request: Request, call_next): # Detectamos el idioma que va a usar la página para asegurarnos de que esté cargado if request.url.path.startswith("/static") or request.url.path.startswith("/api"): return await call_next(request) lang = request.cookies.get("panel_lang") if not lang: accept_lang = request.headers.get("accept-language", "es") lang = accept_lang.split(",")[0].split("-")[0] if lang and len(lang) == 2: await pre_warm_ui_translations(lang) return await call_next(request) @app.get("/set-lang/{lang}") async def set_lang(lang: str, request: Request): # Validamos que el idioma sea de 2 letras if len(lang) != 2: return RedirectResponse(url="/dashboard") # Ejecutamos el calentamiento asíncrono para que la redirección sea instantánea await pre_warm_ui_translations(lang) response = RedirectResponse(url=request.headers.get("referer", "/dashboard")) response.set_cookie(key="panel_lang", value=lang, max_age=31536000) # 1 año return response def get_panel_lang(request: Request): lang = request.cookies.get("panel_lang") if not lang: accept_lang = request.headers.get("accept-language", "es") lang = accept_lang.split(",")[0].split("-")[0] return lang class LoginForm(BaseModel): username: str password: str def get_config(): cfg = load_config() return { "discord": {"token": cfg.get("discord", {}).get("token", "")}, "telegram": {"token": cfg.get("telegram", {}).get("token", "")}, "libretranslate": {"url": cfg.get("libretranslate", {}).get("url", "")}, "web": cfg.get("web", {}), "database": cfg.get("database", {}), "languages": cfg.get("languages", {}) } def verify_admin(username: str, password: str) -> bool: # 1. Primero intentamos con el SuperAdmin del .env (backup) web_config = get_web_config() if username == web_config.get("admin_username", "") and \ password == web_config.get("admin_password", ""): return True # 2. Si no es el SuperAdmin, buscamos en la base de datos MySQL try: admin = get_admin_by_username(username) if admin and hasher.verify(password, admin['password_hash']): return True except Exception as e: print(f"Error verifying admin: {e}") return False @app.get("/") async def root(request: Request): return templates.TemplateResponse("index.html", {"request": request}) @app.get("/login") async def login_page(request: Request): return templates.TemplateResponse("login.html", {"request": request}) @app.post("/login") async def login(request: Request): form = await request.form() username = form.get("username", "") password = form.get("password", "") if verify_admin(username, password): response = RedirectResponse(url="/dashboard", status_code=status.HTTP_303_SEE_OTHER) response.set_cookie(key="auth", value="ok", httponly=True) response.set_cookie(key="username", value=username, httponly=True) return response return templates.TemplateResponse("login.html", { "request": request, "error": "Credenciales incorrectas" }) @app.get("/dashboard") async def dashboard(request: Request): if request.cookies.get("auth") != "ok": return RedirectResponse(url="/login") username = request.cookies.get("username", "") config = get_config() return templates.TemplateResponse("dashboard.html", { "request": request, "config": config, "username": username }) @app.get("/config") async def config_page(request: Request): if request.cookies.get("auth") != "ok": return RedirectResponse(url="/login") username = request.cookies.get("username", "") config = get_config() return templates.TemplateResponse("config.html", { "request": request, "config": config, "username": username }) @app.get("/languages") async def languages_page(request: Request): if request.cookies.get("auth") != "ok": return RedirectResponse(url="/login") from botdiscord.database import get_available_languages, get_bot_languages available = get_available_languages() discord_langs = get_bot_languages("discord") telegram_langs = get_bot_languages("telegram") return templates.TemplateResponse("languages.html", { "request": request, "available_languages": available, "discord_languages": discord_langs, "telegram_languages": telegram_langs, "libretranslate_url": get_libretranslate_url() }) @app.post("/languages/sync") async def sync_languages(request: Request): if request.cookies.get("auth") != "ok": raise HTTPException(status_code=401) import aiohttp url = get_libretranslate_url() if not url: return RedirectResponse(url="/languages?error=1", status_code=status.HTTP_303_SEE_OTHER) base_url = url.rstrip("/translate").rstrip("/translate/").rstrip("/") if base_url.endswith("/translate"): base_url = base_url[:-10] base_url = base_url.rstrip("/") try: async with aiohttp.ClientSession() as session: async with session.get(f"{base_url}/languages", timeout=aiohttp.ClientTimeout(total=10)) as resp: if resp.status == 200: languages = await resp.json() from botdiscord.database import get_available_languages, set_available_languages existing = {lang["code"]: lang.get("flag", "") for lang in get_available_languages()} for lang in languages: lang["flag"] = existing.get(lang["code"], "") set_available_languages(languages) return RedirectResponse(url="/languages?synced=1", status_code=status.HTTP_303_SEE_OTHER) else: return RedirectResponse(url="/languages?error=1", status_code=status.HTTP_303_SEE_OTHER) except Exception as e: print(f"Error syncing languages: {e}") return RedirectResponse(url="/languages?error=1", status_code=status.HTTP_303_SEE_OTHER) @app.post("/languages/bot") async def update_bot_languages(request: Request): if request.cookies.get("auth") != "ok": raise HTTPException(status_code=401) form = await request.form() bot_type = form.get("bot_type", "discord") lang_codes = form.getlist("lang_codes") from botdiscord.database import set_bot_languages set_bot_languages(bot_type, lang_codes) return RedirectResponse(url="/languages?success=1", status_code=status.HTTP_303_SEE_OTHER) @app.post("/languages/flags") async def update_language_flags(request: Request): if request.cookies.get("auth") != "ok": raise HTTPException(status_code=401) from botdiscord.database import get_available_languages, set_available_languages available = get_available_languages() form = await request.form() for lang in available: flag_key = f"flag_{lang['code']}" lang['flag'] = form.get(flag_key, "") set_available_languages(available) return RedirectResponse(url="/languages?success=1", status_code=status.HTTP_303_SEE_OTHER) @app.get("/logout") async def logout(): response = RedirectResponse(url="/login", status_code=status.HTTP_303_SEE_OTHER) response.delete_cookie("auth") response.delete_cookie("username") return response @app.get("/admins") async def admins_page(request: Request): if request.cookies.get("auth") != "ok": return RedirectResponse(url="/login") admins = get_admins() return templates.TemplateResponse("admins.html", { "request": request, "admins": admins }) @app.post("/admins/add") async def add_admin_post(request: Request): if request.cookies.get("auth") != "ok": raise HTTPException(status_code=401) form = await request.form() username = form.get("username", "") password = form.get("password", "") if not username or not password: return RedirectResponse(url="/admins?error=missing_fields", status_code=status.HTTP_303_SEE_OTHER) try: password_hash = hasher.hash(password) add_admin(username, password_hash) return RedirectResponse(url="/admins?success=1", status_code=status.HTTP_303_SEE_OTHER) except Exception as e: print(f"CRITICAL ERROR adding admin: {e}") # Redirigimos con error a la misma página return RedirectResponse(url="/admins?error=" + str(e), status_code=status.HTTP_303_SEE_OTHER) @app.post("/admins/delete") async def delete_admin_post(request: Request): if request.cookies.get("auth") != "ok": raise HTTPException(status_code=401) form = await request.form() admin_id = form.get("admin_id") if admin_id: try: delete_admin(int(admin_id)) except Exception as e: print(f"Error deleting admin: {e}") return RedirectResponse(url="/admins", status_code=status.HTTP_303_SEE_OTHER) @app.post("/admins/update") async def update_admin_post(request: Request): if request.cookies.get("auth") != "ok": raise HTTPException(status_code=401) form = await request.form() admin_id = form.get("admin_id") new_password = form.get("new_password") if admin_id and new_password: from botdiscord.database import update_admin_password password_hash = hasher.hash(new_password) try: update_admin_password(int(admin_id), password_hash) except Exception as e: print(f"Error updating admin password: {e}") return RedirectResponse(url="/admins?success=1", status_code=status.HTTP_303_SEE_OTHER) @app.get("/welcome") async def welcome_page(request: Request): if request.cookies.get("auth") != "ok": return RedirectResponse(url="/login") from botdiscord.database import get_all_welcome_configs configs = get_all_welcome_configs() config_dict = {c['guild_id']: c for c in configs} if configs else {} selected_guild = request.query_params.get("guild") selected_cfg = None if selected_guild and selected_guild.isdigit(): selected_cfg = config_dict.get(int(selected_guild)) success = request.query_params.get("success") == "1" error = request.query_params.get("error") == "1" return templates.TemplateResponse("welcome.html", { "request": request, "configs": config_dict, "config": config_dict, "selected_guild": int(selected_guild) if selected_guild and selected_guild.isdigit() else None, "selected_cfg": selected_cfg, "success": success, "error": error, "new_form": request.query_params.get("new") == "1" }) @app.post("/welcome/save") async def save_welcome(request: Request): if request.cookies.get("auth") != "ok": raise HTTPException(status_code=401) form = await request.form() guild_id = form.get("guild_id") channel_id = form.get("channel_id") message_content = form.get("message_content") enabled = form.get("enabled") == "1" if not guild_id or not channel_id or not message_content: return RedirectResponse(url="/welcome?error=1", status_code=status.HTTP_303_SEE_OTHER) try: from botdiscord.database import save_welcome_message save_welcome_message( int(guild_id), int(channel_id), message_content, enabled ) return RedirectResponse(url="/welcome?success=1", status_code=status.HTTP_303_SEE_OTHER) except Exception as e: print(f"Error saving welcome message: {e}") return RedirectResponse(url="/welcome?error=1", status_code=status.HTTP_303_SEE_OTHER) @app.post("/welcome/delete") async def delete_welcome(request: Request): if request.cookies.get("auth") != "ok": raise HTTPException(status_code=401) form = await request.form() guild_id = form.get("guild_id") if guild_id: try: from botdiscord.database import delete_welcome_message delete_welcome_message(int(guild_id)) return RedirectResponse(url="/welcome?success=1", status_code=status.HTTP_303_SEE_OTHER) except Exception as e: print(f"Error deleting welcome message: {e}") return RedirectResponse(url="/welcome?error=1", status_code=status.HTTP_303_SEE_OTHER) return RedirectResponse(url="/welcome", status_code=status.HTTP_303_SEE_OTHER) @app.get("/discord-channels") async def discord_channels_page(request: Request): if request.cookies.get("auth") != "ok": return RedirectResponse(url="/login") from botdiscord.database import get_discord_servers, get_discord_channels servers = get_discord_servers() channels = get_discord_channels() # Agrupar canales por servidor servers_with_channels = {} for server in servers: server_id = server['server_id'] servers_with_channels[server_id] = { 'server_info': server, 'channels': [ch for ch in channels if ch['server_id'] == server_id] } success = request.query_params.get("success") == "1" error = request.query_params.get("error") == "1" synced = request.query_params.get("synced") == "1" return templates.TemplateResponse("discord_channels.html", { "request": request, "servers_with_channels": servers_with_channels, "success": success, "error": error, "synced": synced }) @app.post("/discord-channels/sync") async def sync_discord_servers(request: Request): if request.cookies.get("auth") != "ok": raise HTTPException(status_code=401) try: print("[Panel] Iniciando sincronización de Discord...") # Diagnóstico de variables de entorno print("[Panel] Variables de entorno disponibles:") discord_env_vars = {k: v for k, v in os.environ.items() if 'DISCORD' in k or 'TOKEN' in k} for key, value in discord_env_vars.items(): print(f" {key}: {'***' if 'TOKEN' in key else value}") # Importar y usar el módulo de sincronización from botdiscord.server_sync import sync_discord_servers_from_api result = await sync_discord_servers_from_api() print(f"[Panel] Resultado de sincronización: {result}") if result: return RedirectResponse(url="/discord-channels?synced=1", status_code=status.HTTP_303_SEE_OTHER) else: return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER) except Exception as e: print(f"[Panel] Error en sincronización: {e}") import traceback traceback.print_exc() return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER) @app.post("/discord-channels/toggle") async def toggle_discord_channel(request: Request): if request.cookies.get("auth") != "ok": raise HTTPException(status_code=401) form = await request.form() channel_id = form.get("channel_id") is_active = form.get("is_active") == "true" if not channel_id: return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER) try: from botdiscord.database import toggle_channel_status from botdiscord.reload_marker import set_reload_marker toggle_channel_status(int(channel_id), is_active) set_reload_marker() return RedirectResponse(url="/discord-channels?success=1", status_code=status.HTTP_303_SEE_OTHER) except Exception as e: print(f"Error toggling channel: {e}") return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER) @app.post("/discord-channels/delete-server") async def delete_discord_server(request: Request): if request.cookies.get("auth") != "ok": raise HTTPException(status_code=401) form = await request.form() server_id = form.get("server_id") if not server_id: return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER) try: from botdiscord.database import delete_discord_server from botdiscord.reload_marker import set_reload_marker delete_discord_server(int(server_id)) set_reload_marker() return RedirectResponse(url="/discord-channels?success=1", status_code=status.HTTP_303_SEE_OTHER) except Exception as e: print(f"Error deleting server: {e}") return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER) @app.post("/discord-channels/reload-bot") async def reload_discord_bot(request: Request): if request.cookies.get("auth") != "ok": raise HTTPException(status_code=401) try: print("[Panel] Enviando señal de recarga al bot de Discord...") # Importar y ejecutar la función de recarga from botdiscord.bot_reload import reload_bot_config result = await reload_bot_config() if result: return RedirectResponse(url="/discord-channels?success=1", status_code=status.HTTP_303_SEE_OTHER) else: return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER) except Exception as e: print(f"[Panel] Error recargando bot: {e}") import traceback traceback.print_exc() return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER) @app.post("/discord-channels/bulk-toggle") async def bulk_toggle_discord_channels(request: Request): if request.cookies.get("auth") != "ok": raise HTTPException(status_code=401) form = await request.form() channel_ids = form.getlist("channel_ids") bulk_action = form.get("bulk_action") if not channel_ids or not bulk_action: return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER) try: from botdiscord.database import toggle_channel_status is_active = (bulk_action == "activate") success_count = 0 for channel_id_str in channel_ids: try: channel_id = int(channel_id_str) toggle_channel_status(channel_id, is_active) success_count += 1 except ValueError: continue print(f"[Panel] Bulk toggle: {success_count} canales {'activados' if is_active else 'desactivados'}") from botdiscord.reload_marker import set_reload_marker set_reload_marker() return RedirectResponse(url="/discord-channels?success=1", status_code=status.HTTP_303_SEE_OTHER) except Exception as e: print(f"[Panel] Error en bulk toggle: {e}") import traceback traceback.print_exc() return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER) @app.get("/diagnosis") async def diagnosis_page(request: Request): if request.cookies.get("auth") != "ok": return RedirectResponse(url="/login") # Obtener variables de entorno relevantes env_vars = { 'DISCORD_TOKEN': os.getenv('DISCORD_TOKEN'), 'TELEGRAM_TOKEN': os.getenv('TELEGRAM_TOKEN'), 'LIBRETRANSLATE_URL': os.getenv('LIBRETRANSLATE_URL'), 'DB_TYPE': os.getenv('DB_TYPE'), 'DB_HOST': os.getenv('DB_HOST'), 'DB_PORT': os.getenv('DB_PORT'), 'DB_NAME': os.getenv('DB_NAME'), 'DB_USER': os.getenv('DB_USER') } # Obtener configuración cargada config = get_config() return templates.TemplateResponse("diagnosis.html", { "request": request, "env_vars": env_vars, "config": config }) if __name__ == "__main__": import uvicorn web_config = get_web_config() uvicorn.run(app, host=web_config.get("host", "0.0.0.0"), port=web_config.get("port", 8000))