import hmac
import os
import sys

# Put the parent of this file's directory on sys.path so the `botdiscord`
# imports below resolve. This must run before those imports.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from fastapi import FastAPI, HTTPException, status
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi import Request
from fastapi.responses import RedirectResponse
from pydantic import BaseModel
from passlib.hash import pbkdf2_sha256 as hasher

from botdiscord.config import load_config, get_web_config, get_libretranslate_url, get_db_type

load_config()  # Load the configuration immediately

from botdiscord.database import (
    init_db, get_ui_translation, save_ui_translation,
    get_admins, get_admin_by_username, add_admin, delete_admin, _normalize_text
)

init_db()  # Make sure the tables exist before FastAPI serves requests

from botdiscord.translate import translate_text

app = FastAPI(title="Panel de Configuración - Bots de Traducción")

SCRIPT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
templates = Jinja2Templates(directory=os.path.join(SCRIPT_DIR, "panel", "templates"))

# In-memory cache for UI translations, keyed by "<lang>:<normalized text>".
# It spares a database lookup for every translated string on every page.
_ui_memory_cache = {}

# Bot types whose language list can be edited from the panel.
_KNOWN_BOT_TYPES = ("discord", "telegram")


def translate_filter(text, lang="es"):
    """Jinja2 filter that translates a UI string into ``lang``.

    Spanish (``"es"``) and empty input are returned untouched. Otherwise the
    normalized text is looked up in the in-memory cache, then in the database
    cache, and only as a last resort translated synchronously.

    Returns the translated string, or the normalized source text when no
    translation could be obtained, so a template never renders ``None``.
    """
    if lang == "es" or not text:
        return text

    # Normalize first so every cache layer uses the same key.
    text = _normalize_text(text)
    if not text:
        return ""

    # 1. In-memory cache.
    cache_key = f"{lang}:{text}"
    if cache_key in _ui_memory_cache:
        return _ui_memory_cache[cache_key]

    # 2. Database cache.
    cached = get_ui_translation(text, lang)
    if cached:
        _ui_memory_cache[cache_key] = cached
        return cached

    # 3. Not cached anywhere: translate synchronously.
    from botdiscord.translate import translate_text_sync
    try:
        translated = translate_text_sync(text, lang)
    except Exception as e:
        # A failing translation backend must not break page rendering.
        print(f"Error translating UI text: {e}")
        return text

    if translated and translated != text:
        save_ui_translation(text, lang, translated)
        _ui_memory_cache[cache_key] = translated

    # Fall back to the source text when the backend returned nothing.
    return translated or text


templates.env.filters["translate"] = translate_filter


@app.get("/set-lang/{lang}")
async def set_lang(lang: str, request: Request):
    """Store the panel language in a cookie and go back to the previous page.

    ``lang`` must be exactly two ASCII letters; anything else redirects to
    ``/dashboard`` without touching the cookie.
    """
    # Validate that the language code really is two letters, not just two
    # arbitrary characters.
    if len(lang) != 2 or not (lang.isascii() and lang.isalpha()):
        return RedirectResponse(url="/dashboard")

    response = RedirectResponse(url=request.headers.get("referer", "/dashboard"))
    response.set_cookie(key="panel_lang", value=lang, max_age=31536000)  # 1 year
    return response


def get_panel_lang(request: Request):
    """Return the panel language for ``request``.

    Uses the ``panel_lang`` cookie when present; otherwise takes the primary
    subtag of the first entry of the ``Accept-Language`` header, defaulting
    to ``"es"`` when the header is absent.
    """
    lang = request.cookies.get("panel_lang")
    if not lang:
        accept_lang = request.headers.get("accept-language", "es")
        lang = accept_lang.split(",")[0].split("-")[0]
    return lang


class LoginForm(BaseModel):
    """Credentials submitted to the login form."""

    username: str
    password: str


def get_config():
    """Reload the configuration and return the sections the templates use."""
    cfg = load_config()
    return {
        "discord": {"token": cfg.get("discord", {}).get("token", "")},
        "telegram": {"token": cfg.get("telegram", {}).get("token", "")},
        "libretranslate": {"url": cfg.get("libretranslate", {}).get("url", "")},
        "web": cfg.get("web", {}),
        "database": cfg.get("database", {}),
        "languages": cfg.get("languages", {})
    }


def _constant_time_equals(left, right) -> bool:
    """Compare two strings without leaking where they first differ."""
    if not isinstance(left, str) or not isinstance(right, str):
        return False
    # Encode to bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(left.encode("utf-8"), right.encode("utf-8"))


def verify_admin(username: str, password: str) -> bool:
    """Return True when the credentials match the super admin or a DB admin.

    Empty credentials are always rejected. The super admin from the web
    configuration is only considered when both its username and password are
    actually configured, so a missing configuration can never be matched by
    an empty login form.
    """
    if not username or not password:
        return False

    # 1. Try the configured super admin first (backup account).
    web_config = get_web_config()
    expected_username = web_config.get("admin_username", "")
    expected_password = web_config.get("admin_password", "")
    if expected_username and expected_password:
        # Evaluate both comparisons so timing does not reveal which one failed.
        username_ok = _constant_time_equals(username, expected_username)
        password_ok = _constant_time_equals(password, expected_password)
        if username_ok and password_ok:
            return True

    # 2. Otherwise look the user up in the database.
    try:
        admin = get_admin_by_username(username)
        if admin and hasher.verify(password, admin['password_hash']):
            return True
    except Exception as e:
        print(f"Error verifying admin: {e}")

    return False


@app.get("/")
async def root(request: Request):
    """Render the landing page."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.get("/login")
async def login_page(request: Request):
    """Render the login form."""
    return templates.TemplateResponse("login.html", {"request": request})


@app.post("/login")
async def login(request: Request):
    """Check the submitted credentials and open a session on success.

    On failure the login form is rendered again with an error message.
    """
    form = await request.form()
    username = form.get("username", "")
    password = form.get("password", "")

    if verify_admin(username, password):
        response = RedirectResponse(url="/dashboard", status_code=status.HTTP_303_SEE_OTHER)
        # NOTE(review): the session is just the constant cookie "auth=ok",
        # which any client can forge. Replacing it with a signed or
        # server-side session needs a secret and a design decision, so it is
        # flagged here rather than changed.
        response.set_cookie(key="auth", value="ok", httponly=True)
        response.set_cookie(key="username", value=username, httponly=True)
        return response

    return templates.TemplateResponse("login.html", {
        "request": request,
        "error": "Credenciales incorrectas"
    })


@app.get("/dashboard")
async def dashboard(request: Request):
    """Render the dashboard; unauthenticated users go to ``/login``."""
    if request.cookies.get("auth") != "ok":
        return RedirectResponse(url="/login")

    username = request.cookies.get("username", "")
    config = get_config()
    return templates.TemplateResponse("dashboard.html", {
        "request": request,
        "config": config,
        "username": username
    })


@app.get("/config")
async def config_page(request: Request):
    """Render the configuration page; unauthenticated users go to ``/login``."""
    if request.cookies.get("auth") != "ok":
        return RedirectResponse(url="/login")

    username = request.cookies.get("username", "")
    config = get_config()
    return templates.TemplateResponse("config.html", {
        "request": request,
        "config": config,
        "username": username
    })


@app.get("/languages")
async def languages_page(request: Request):
    """Render the language management page."""
    if request.cookies.get("auth") != "ok":
        return RedirectResponse(url="/login")

    from botdiscord.database import get_available_languages, get_bot_languages

    available = get_available_languages()
    discord_langs = get_bot_languages("discord")
    telegram_langs = get_bot_languages("telegram")

    return templates.TemplateResponse("languages.html", {
        "request": request,
        "available_languages": available,
        "discord_languages": discord_langs,
        "telegram_languages": telegram_langs,
        "libretranslate_url": get_libretranslate_url()
    })


@app.post("/languages/sync")
async def sync_languages(request: Request):
    """Fetch the language list from LibreTranslate and store it.

    Flags already assigned to a language code are preserved. Redirects to
    ``/languages?synced=1`` on success and ``/languages?error=1`` otherwise.

    Raises:
        HTTPException: 401 when the request is not authenticated.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    import aiohttp

    url = get_libretranslate_url()
    if not url:
        return RedirectResponse(url="/languages?error=1", status_code=status.HTTP_303_SEE_OTHER)

    # Derive the server base URL by dropping a trailing "/translate" path.
    # str.rstrip("/translate") must not be used here: it strips any trailing
    # characters from that set and would eat into the host name.
    base_url = url.rstrip("/")
    if base_url.endswith("/translate"):
        base_url = base_url[:-len("/translate")]
    base_url = base_url.rstrip("/")

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{base_url}/languages", timeout=aiohttp.ClientTimeout(total=10)) as resp:
                if resp.status != 200:
                    return RedirectResponse(url="/languages?error=1", status_code=status.HTTP_303_SEE_OTHER)

                languages = await resp.json()

                from botdiscord.database import get_available_languages, set_available_languages

                # Carry over the flags that were already configured.
                existing = {lang["code"]: lang.get("flag", "") for lang in get_available_languages()}
                for lang in languages:
                    lang["flag"] = existing.get(lang["code"], "")
                set_available_languages(languages)
                return RedirectResponse(url="/languages?synced=1", status_code=status.HTTP_303_SEE_OTHER)
    except Exception as e:
        print(f"Error syncing languages: {e}")
        return RedirectResponse(url="/languages?error=1", status_code=status.HTTP_303_SEE_OTHER)


@app.post("/languages/bot")
async def update_bot_languages(request: Request):
    """Replace the enabled language codes of one bot.

    Raises:
        HTTPException: 401 when the request is not authenticated.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    form = await request.form()
    bot_type = form.get("bot_type", "discord")
    lang_codes = form.getlist("lang_codes")

    # Only the bots this panel manages may be written to.
    if bot_type not in _KNOWN_BOT_TYPES:
        return RedirectResponse(url="/languages?error=1", status_code=status.HTTP_303_SEE_OTHER)

    from botdiscord.database import set_bot_languages
    set_bot_languages(bot_type, lang_codes)

    return RedirectResponse(url="/languages?success=1", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/languages/flags")
async def update_language_flags(request: Request):
    """Update the flag of every available language from the submitted form.

    Each language reads the form field ``flag_<code>``; a missing field
    clears the flag.

    Raises:
        HTTPException: 401 when the request is not authenticated.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    from botdiscord.database import get_available_languages, set_available_languages

    available = get_available_languages()
    form = await request.form()

    for lang in available:
        flag_key = f"flag_{lang['code']}"
        lang['flag'] = form.get(flag_key, "")

    set_available_languages(available)
    return RedirectResponse(url="/languages?success=1", status_code=status.HTTP_303_SEE_OTHER)


@app.get("/logout")
async def logout():
    """Clear the session cookies and go back to the login page."""
    response = RedirectResponse(url="/login", status_code=status.HTTP_303_SEE_OTHER)
    response.delete_cookie("auth")
    response.delete_cookie("username")
    return response


@app.get("/admins")
async def admins_page(request: Request):
    """Render the administrator list; unauthenticated users go to ``/login``."""
    if request.cookies.get("auth") != "ok":
        return RedirectResponse(url="/login")

    admins = get_admins()
    return templates.TemplateResponse("admins.html", {
        "request": request,
        "admins": admins
    })


@app.post("/admins/add")
async def add_admin_post(request: Request):
    """Create an administrator from the submitted username and password.

    Redirects to ``/admins`` with ``success=1``, ``error=missing_fields`` or
    ``error=<exception text>``.

    Raises:
        HTTPException: 401 when the request is not authenticated.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    form = await request.form()
    username = form.get("username", "")
    password = form.get("password", "")

    if not username or not password:
        return RedirectResponse(url="/admins?error=missing_fields", status_code=status.HTTP_303_SEE_OTHER)

    try:
        password_hash = hasher.hash(password)
        add_admin(username, password_hash)
        return RedirectResponse(url="/admins?success=1", status_code=status.HTTP_303_SEE_OTHER)
    except Exception as e:
        from urllib.parse import quote

        print(f"CRITICAL ERROR adding admin: {e}")
        # Redirect back to the same page with the error. The message is
        # URL-encoded because exception text may contain spaces, '&' or '#',
        # which would otherwise corrupt the query string.
        return RedirectResponse(url="/admins?error=" + quote(str(e), safe=""), status_code=status.HTTP_303_SEE_OTHER)


@app.post("/admins/delete")
async def delete_admin_post(request: Request):
    """Delete the administrator whose id was submitted (best effort).

    Failures are logged and the user is sent back to ``/admins`` either way.

    Raises:
        HTTPException: 401 when the request is not authenticated.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    form = await request.form()
    admin_id = form.get("admin_id")

    if admin_id:
        try:
            delete_admin(int(admin_id))
        except Exception as e:
            print(f"Error deleting admin: {e}")

    return RedirectResponse(url="/admins", status_code=status.HTTP_303_SEE_OTHER)


@app.post("/admins/update")
async def update_admin_post(request: Request):
    """Change the password of the administrator whose id was submitted.

    Redirects to ``/admins?success=1`` only when the password was actually
    updated; missing fields and failures redirect with an ``error`` value.

    Raises:
        HTTPException: 401 when the request is not authenticated.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    form = await request.form()
    admin_id = form.get("admin_id")
    new_password = form.get("new_password")

    if not admin_id or not new_password:
        return RedirectResponse(url="/admins?error=missing_fields", status_code=status.HTTP_303_SEE_OTHER)

    try:
        from botdiscord.database import update_admin_password

        # Hash inside the try block so a hashing failure is reported as well.
        password_hash = hasher.hash(new_password)
        update_admin_password(int(admin_id), password_hash)
    except Exception as e:
        from urllib.parse import quote

        print(f"Error updating admin password: {e}")
        return RedirectResponse(url="/admins?error=" + quote(str(e), safe=""), status_code=status.HTTP_303_SEE_OTHER)

    return RedirectResponse(url="/admins?success=1", status_code=status.HTTP_303_SEE_OTHER)


@app.get("/welcome")
async def welcome_page(request: Request):
    """Render the welcome-message page.

    The optional ``guild`` query parameter selects one configuration; it is
    ignored unless it consists only of digits.
    """
    if request.cookies.get("auth") != "ok":
        return RedirectResponse(url="/login")

    from botdiscord.database import get_all_welcome_configs

    configs = get_all_welcome_configs()
    config_dict = {c['guild_id']: c for c in configs} if configs else {}

    # Parse the requested guild id once; anything non-numeric means "none".
    selected_guild = request.query_params.get("guild")
    selected_id = int(selected_guild) if selected_guild and selected_guild.isdigit() else None
    selected_cfg = config_dict.get(selected_id) if selected_id is not None else None

    success = request.query_params.get("success") == "1"
    error = request.query_params.get("error") == "1"

    return templates.TemplateResponse("welcome.html", {
        "request": request,
        "configs": config_dict,
        "config": config_dict,
        "selected_guild": selected_id,
        "selected_cfg": selected_cfg,
        "success": success,
        "error": error,
        "new_form": request.query_params.get("new") == "1"
    })


@app.post("/welcome/save")
async def save_welcome(request: Request):
    """Create or update the welcome message of a guild.

    Redirects to ``/welcome?success=1`` on success and ``/welcome?error=1``
    when a field is missing or saving fails.

    Raises:
        HTTPException: 401 when the request is not authenticated.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    form = await request.form()
    guild_id = form.get("guild_id")
    channel_id = form.get("channel_id")
    message_content = form.get("message_content")
    enabled = form.get("enabled") == "1"

    if not guild_id or not channel_id or not message_content:
        return RedirectResponse(url="/welcome?error=1", status_code=status.HTTP_303_SEE_OTHER)

    try:
        from botdiscord.database import save_welcome_message
        # int() is inside the try block so non-numeric ids become error=1.
        save_welcome_message(
            int(guild_id),
            int(channel_id),
            message_content,
            enabled
        )
        return RedirectResponse(url="/welcome?success=1", status_code=status.HTTP_303_SEE_OTHER)
    except Exception as e:
        print(f"Error saving welcome message: {e}")
        return RedirectResponse(url="/welcome?error=1", status_code=status.HTTP_303_SEE_OTHER)


@app.post("/welcome/delete")
async def delete_welcome(request: Request):
    """Delete the welcome message of the submitted guild.

    Without a ``guild_id`` the user is simply sent back to ``/welcome``.

    Raises:
        HTTPException: 401 when the request is not authenticated.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    form = await request.form()
    guild_id = form.get("guild_id")

    if guild_id:
        try:
            from botdiscord.database import delete_welcome_message
            delete_welcome_message(int(guild_id))
            return RedirectResponse(url="/welcome?success=1", status_code=status.HTTP_303_SEE_OTHER)
        except Exception as e:
            print(f"Error deleting welcome message: {e}")
            return RedirectResponse(url="/welcome?error=1", status_code=status.HTTP_303_SEE_OTHER)

    return RedirectResponse(url="/welcome", status_code=status.HTTP_303_SEE_OTHER)


if __name__ == "__main__":
    import uvicorn

    web_config = get_web_config()
    uvicorn.run(app, host=web_config.get("host", "0.0.0.0"), port=web_config.get("port", 8000))