Files
traduccion_bots/panel/main.py
nickpons666 77024d443f Feat: Añadir panel de métricas con estadísticas por idioma, plataforma y servidor
- Crear página dedicada /metrics con gráficos usando Chart.js
- Implementar función get_translation_stats() en database.py
- Añadir endpoint /api/stats en panel/main.py
- Mostrar métricas de traducciones por idioma, plataforma y servidor Discord
- Agregar tarjeta de acceso rápido a Métricas en el Dashboard
- Actualizar action_plan_pro.md con el progreso completado
2026-03-21 15:15:38 -06:00

710 lines
26 KiB
Python

import os
import sys
import glob
import re
import asyncio
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from fastapi import FastAPI, HTTPException, status
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi import Request
from fastapi.responses import RedirectResponse
from pydantic import BaseModel
from dotenv import load_dotenv
from passlib.hash import pbkdf2_sha256 as hasher
from botdiscord.config import load_config, get_web_config, get_libretranslate_url, get_db_type
# Load variables from a .env file into the environment before any
# configuration is read.
load_dotenv()
load_config() # Load the configuration immediately, at import time
from botdiscord.database import (
init_db, get_ui_translation, save_ui_translation,
get_admins, get_admin_by_username, add_admin, delete_admin,
_normalize_text, get_translation_stats
)
init_db() # Make sure the tables exist before FastAPI starts serving requests
from botdiscord.translate import translate_text
app = FastAPI(title="Panel de Configuración - Bots de Traducción")
# Parent of the directory that contains this file (two dirname() hops up
# from the absolute path of this module).
SCRIPT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
templates = Jinja2Templates(directory=os.path.join(SCRIPT_DIR, "panel", "templates"))
from utils.logger import panel_logger as log
from utils.cache import cache_get, cache_set, cache_increment
# In-process RAM cache used as a fallback when Redis is not available.
# Maps a cache-key string to the translated UI string.
_ui_memory_cache = {}
# Jinja2 translation filter (cascade: Redis -> RAM -> DB -> LibreTranslate).
def translate_filter(text, lang="es"):
    """Translate a UI string to ``lang``, consulting caches before the translator.

    Spanish (``"es"``) and empty input are returned untouched. Otherwise the
    text is normalized and looked up, in order, in Redis, the in-process
    dictionary, the database and finally the synchronous translator. Any
    value containing the marker ``"mantenimiento"`` is treated as a cached
    error response and is never returned.

    Args:
        text: Source string as written in the template.
        lang: Target language code; defaults to ``"es"``.

    Returns:
        The translated string, ``""`` if normalization leaves nothing, or the
        (normalized) source text when no usable translation is found.
    """
    if not text or lang == "es":
        return text

    # Normalize right away so every layer uses the same key.
    text = _normalize_text(text)
    if not text:
        return ""

    key = f"ui:{lang}:{text}"
    poison_marker = "mantenimiento"  # pattern of an error reply that must be ignored

    # 1. Redis (shared, survives restarts).
    from_redis = cache_get(key)
    if from_redis:
        if poison_marker in from_redis:
            # Contaminated value: evict it from the shared cache.
            from utils.cache import cache_delete
            cache_delete(key)
        else:
            _ui_memory_cache[key] = from_redis
            return from_redis

    # 2. Local RAM (fastest, per-process).
    from_ram = _ui_memory_cache.get(key)
    if from_ram and poison_marker not in from_ram:
        return from_ram
    if key in _ui_memory_cache:
        # Empty or contaminated entry: drop it.
        del _ui_memory_cache[key]

    # 3. Database.
    from_db = get_ui_translation(text, lang)
    if from_db and poison_marker not in from_db:
        _ui_memory_cache[key] = from_db
        cache_set(key, from_db, ttl=604800)  # 7 days
        return from_db

    # 4. LibreTranslate (synchronous; reached only on a full cache miss).
    from botdiscord.translate import translate_text_sync
    fresh = translate_text_sync(text, lang)
    usable = bool(fresh) and fresh != text and poison_marker not in fresh
    if not usable:
        # Fallback: hand back the original text when everything failed.
        return text

    save_ui_translation(text, lang, fresh)
    _ui_memory_cache[key] = fresh
    cache_set(key, fresh, ttl=604800)  # 7 days
    return fresh
# Expose translate_filter to the Jinja2 templates under the filter name "translate".
templates.env.filters["translate"] = translate_filter
# Language codes for which pre_warm_ui_translations() has already been started
# in this process; used to run the pre-warm at most once per language.
_warmed_languages = set()
async def pre_warm_ui_translations(lang: str):
    """Pre-load (and translate if needed) every UI string used by the templates.

    Scans the ``templates/*.html`` files next to this module for string
    literals piped into the ``translate`` filter, loads the translations that
    already exist in the database into the RAM cache, and translates the
    missing ones concurrently.

    The RAM cache entries are stored under ``"ui:{lang}:{text}"``, the same
    key format ``translate_filter`` reads, so pre-warmed values are actually
    served from memory.

    Pre-warming is an optimisation only: an unreadable template or a failed
    translation is logged and skipped instead of aborting the request that
    triggered the pre-warm.

    Args:
        lang: Target language code. ``"es"`` and languages that were already
            started in this process are ignored.
    """
    if lang == "es" or lang in _warmed_languages:
        return
    # Mark the language first so concurrent requests do not start a second run.
    _warmed_languages.add(lang)
    print(f"[Panel] Pre-calentando traducciones para idioma UI: {lang}...")

    strings_to_translate = set()
    template_pattern = os.path.join(os.path.dirname(__file__), "templates", "*.html")
    for template_path in glob.glob(template_pattern):
        try:
            with open(template_path, "r", encoding="utf-8") as fd:
                content = fd.read()
        except (OSError, UnicodeDecodeError) as exc:
            # A broken template must not stop the scan, but it should be visible.
            print(f"[Panel] No se pudo leer la plantilla {template_path}: {exc}")
            continue
        # Double-quoted and single-quoted literals followed by "| translate".
        strings_to_translate.update(re.findall(r'\"([^\"]+)\"\s*\|\s*translate', content))
        strings_to_translate.update(re.findall(r'\'([^\']+)\'\s*\|\s*translate', content))

    missing_strings = []
    for text in strings_to_translate:
        norm_text = _normalize_text(text)
        if not norm_text:
            continue
        # Same key format as translate_filter.
        cache_key = f"ui:{lang}:{norm_text}"
        if cache_key in _ui_memory_cache:
            continue
        cached = get_ui_translation(norm_text, lang)
        if cached:
            _ui_memory_cache[cache_key] = cached
        else:
            missing_strings.append(norm_text)

    if not missing_strings:
        print(f"[Panel] Pre-calentamiento completo: 0 cadenas faltantes para {lang}")
        return

    print(f"[Panel] Falta traducir {len(missing_strings)} cadenas UI al {lang}. Empezando concurrencia...")
    from botdiscord.translate import translate_text

    async def _fetch_and_save(text):
        # Translate one string; persist and cache it only if it really changed.
        try:
            translated = await translate_text(text, lang)
        except Exception as exc:
            # translate_filter still has its own fallback for this string.
            print(f"[Panel] Error traduciendo cadena UI al {lang}: {exc}")
            return
        if translated and translated != text:
            save_ui_translation(text, lang, translated)
            _ui_memory_cache[f"ui:{lang}:{text}"] = translated

    await asyncio.gather(*(_fetch_and_save(t) for t in missing_strings))
    print(f"[Panel] Pre-calentamiento terminado para {lang}.")
@app.middleware("http")
async def translation_pre_warm_middleware(request: Request, call_next):
    """Pre-warm UI translations for the page language before handling the request.

    Requests under ``/static`` and ``/api`` are passed straight through. For
    every other path the language is taken from the ``panel_lang`` cookie or,
    failing that, from the first entry of ``Accept-Language`` (region suffix
    removed). Only two-character codes trigger a pre-warm.
    """
    path = request.url.path
    if path.startswith("/static") or path.startswith("/api"):
        return await call_next(request)

    # Work out which language the page is going to be rendered in.
    lang = request.cookies.get("panel_lang")
    if not lang:
        header_value = request.headers.get("accept-language", "es")
        first_entry = header_value.split(",")[0]
        lang = first_entry.split("-")[0]

    if lang and len(lang) == 2:
        await pre_warm_ui_translations(lang)

    return await call_next(request)
@app.get("/set-lang/{lang}")
async def set_lang(lang: str, request: Request):
    """Store the chosen panel language in a cookie and go back to the referrer.

    Args:
        lang: Language code from the URL path. It must consist of exactly two
            ASCII letters; anything else redirects to ``/dashboard`` without
            touching the cookie.
        request: Incoming request; its ``Referer`` header is the redirect
            target (``/dashboard`` when absent).

    Returns:
        A redirect response carrying the ``panel_lang`` cookie (one year).
    """
    # Enforce "two letters", not merely "two characters": the value ends up in
    # a cookie and is used as a translation target language.
    if len(lang) != 2 or not (lang.isascii() and lang.isalpha()):
        return RedirectResponse(url="/dashboard")
    lang = lang.lower()

    # The pre-warm is awaited, so the first switch to a new language waits for
    # it; later switches return immediately because the language is marked.
    await pre_warm_ui_translations(lang)

    response = RedirectResponse(url=request.headers.get("referer", "/dashboard"))
    response.set_cookie(key="panel_lang", value=lang, max_age=31536000)  # 1 year
    return response
def get_panel_lang(request: Request):
    """Return the UI language for ``request``.

    The ``panel_lang`` cookie wins when present. Otherwise the first language
    range of the ``Accept-Language`` header is used, reduced to its primary
    subtag: quality values (``;q=0.8``), region suffixes (``-US``) and
    surrounding whitespace are removed and the result is lower-cased. When the
    header is missing or yields nothing, ``"es"`` is returned.

    Args:
        request: Object exposing ``cookies`` and ``headers`` mappings.

    Returns:
        The language code as a string.
    """
    lang = request.cookies.get("panel_lang")
    if not lang:
        accept_lang = request.headers.get("accept-language", "es")
        first_range = accept_lang.split(",")[0]
        # Drop ";q=..." before removing the region, e.g. "fr;q=0.9" -> "fr".
        primary = first_range.split(";")[0].split("-")[0].strip().lower()
        lang = primary or "es"
    return lang
class LoginForm(BaseModel):
    """Credentials payload: a username and a password, both strings."""

    # Account name supplied by the user.
    username: str
    # Plain-text password supplied by the user.
    password: str
def get_config():
    """Build the configuration dictionary handed to the templates.

    Calls ``load_config()`` and returns a dict with the keys ``discord``,
    ``telegram``, ``libretranslate``, ``web``, ``database`` and ``languages``.
    The first three are reduced to a single field each (``token`` or ``url``,
    defaulting to ``""``); the rest are passed through whole (``{}`` when
    missing).
    """
    cfg = load_config()

    def single_field(section, field):
        # One field of one section, tolerating a missing section or field.
        return cfg.get(section, {}).get(field, "")

    result = {
        "discord": {"token": single_field("discord", "token")},
        "telegram": {"token": single_field("telegram", "token")},
        "libretranslate": {"url": single_field("libretranslate", "url")},
    }
    for section in ("web", "database", "languages"):
        result[section] = cfg.get(section, {})
    return result
def verify_admin(username: str, password: str) -> bool:
    """Check a username/password pair against the configured administrators.

    Two sources are consulted in order:

    1. The super-admin from the web configuration (``admin_username`` /
       ``admin_password``). It is honoured only when both values are
       configured and non-empty, so an unconfigured super-admin can no longer
       be "matched" by submitting an empty username and password. The
       comparison is constant-time.
    2. The administrators stored in the database, whose passwords are
       verified against their stored hash.

    Args:
        username: Submitted account name.
        password: Submitted plain-text password.

    Returns:
        ``True`` when either source accepts the credentials, ``False``
        otherwise (including when the database lookup raises).
    """
    import hmac

    # 1. Super-admin from the configuration (backup account).
    web_config = get_web_config()
    env_username = web_config.get("admin_username", "") or ""
    env_password = web_config.get("admin_password", "") or ""
    if env_username and env_password:
        # compare_digest only accepts ASCII str, so compare UTF-8 bytes.
        username_ok = hmac.compare_digest(
            str(username).encode("utf-8"), str(env_username).encode("utf-8")
        )
        password_ok = hmac.compare_digest(
            str(password).encode("utf-8"), str(env_password).encode("utf-8")
        )
        if username_ok and password_ok:
            return True

    # 2. Not the super-admin: look the user up in the database.
    try:
        admin = get_admin_by_username(username)
        if admin and hasher.verify(password, admin['password_hash']):
            return True
    except Exception as e:
        print(f"Error verifying admin: {e}")
    return False
@app.get("/")
async def root(request: Request):
    """Render the landing page template ``index.html``."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.get("/login")
async def login_page(request: Request):
    """Render the login form template ``login.html``."""
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
@app.post("/login")
async def login(request: Request):
    """Handle the login form submission.

    On valid credentials, redirects (303) to ``/dashboard`` and sets two
    HTTP-only cookies: ``auth`` (constant value ``"ok"``) and ``username``.
    On invalid credentials, re-renders ``login.html`` with an error message.

    NOTE(review): the ``auth`` cookie is an unsigned constant and the other
    handlers in this file only test ``auth == "ok"``, so a client can set it
    by hand. A signed session would be needed to close that.
    """
    form = await request.form()
    username = form.get("username", "")
    password = form.get("password", "")

    if not verify_admin(username, password):
        failure_context = {
            "request": request,
            "error": "Credenciales incorrectas",
        }
        return templates.TemplateResponse("login.html", failure_context)

    response = RedirectResponse(url="/dashboard", status_code=status.HTTP_303_SEE_OTHER)
    for cookie_name, cookie_value in (("auth", "ok"), ("username", username)):
        response.set_cookie(key=cookie_name, value=cookie_value, httponly=True)
    return response
@app.get("/dashboard")
async def dashboard(request: Request):
    """Render ``dashboard.html``; unauthenticated visitors go to ``/login``."""
    authenticated = request.cookies.get("auth") == "ok"
    if not authenticated:
        return RedirectResponse(url="/login")

    context = {
        "request": request,
        "config": None,
        "username": request.cookies.get("username", ""),
    }
    context["config"] = get_config()
    return templates.TemplateResponse("dashboard.html", context)
@app.get("/config")
async def config_page(request: Request):
    """Render ``config.html``; unauthenticated visitors go to ``/login``."""
    authenticated = request.cookies.get("auth") == "ok"
    if not authenticated:
        return RedirectResponse(url="/login")

    context = {
        "request": request,
        "config": None,
        "username": request.cookies.get("username", ""),
    }
    context["config"] = get_config()
    return templates.TemplateResponse("config.html", context)
@app.get("/languages")
async def languages_page(request: Request):
    """Render ``languages.html`` with the available and per-bot languages.

    Unauthenticated visitors are redirected to ``/login``.
    """
    if request.cookies.get("auth") != "ok":
        return RedirectResponse(url="/login")

    from botdiscord.database import get_available_languages, get_bot_languages

    context = {"request": request}
    context["available_languages"] = get_available_languages()
    context["discord_languages"] = get_bot_languages("discord")
    context["telegram_languages"] = get_bot_languages("telegram")
    context["libretranslate_url"] = get_libretranslate_url()
    return templates.TemplateResponse("languages.html", context)
def _libretranslate_base_url(url):
    """Return ``url`` without a trailing ``/translate`` path segment or slashes.

    ``str.rstrip("/translate")`` must not be used for this: it strips any
    trailing run of the characters ``/ t r a n s l e`` and therefore eats into
    host names (``http://localhost/translate`` would become
    ``http://localho``). The suffix is removed explicitly instead.
    """
    suffix = "/translate"
    base = url.rstrip("/")
    if base.endswith(suffix):
        base = base[: -len(suffix)]
    return base.rstrip("/")


@app.post("/languages/sync")
async def sync_languages(request: Request):
    """Refresh the list of available languages from the LibreTranslate server.

    Fetches ``<base>/languages`` from the configured LibreTranslate URL,
    keeps the flag already stored for each language code, saves the new list
    and redirects (303) to ``/languages`` with ``synced=1``. Any failure
    (no URL configured, non-200 answer, exception) redirects with ``error=1``.

    Raises:
        HTTPException: 401 when the ``auth`` cookie is missing or wrong.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    import aiohttp

    url = get_libretranslate_url()
    if not url:
        return RedirectResponse(url="/languages?error=1", status_code=status.HTTP_303_SEE_OTHER)

    base_url = _libretranslate_base_url(url)
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{base_url}/languages", timeout=aiohttp.ClientTimeout(total=10)) as resp:
                if resp.status != 200:
                    return RedirectResponse(url="/languages?error=1", status_code=status.HTTP_303_SEE_OTHER)
                languages = await resp.json()
                from botdiscord.database import get_available_languages, set_available_languages
                # Preserve the flags that were assigned before the sync.
                existing = {lang["code"]: lang.get("flag", "") for lang in get_available_languages()}
                for lang in languages:
                    lang["flag"] = existing.get(lang["code"], "")
                set_available_languages(languages)
                return RedirectResponse(url="/languages?synced=1", status_code=status.HTTP_303_SEE_OTHER)
    except Exception as e:
        print(f"Error syncing languages: {e}")
        return RedirectResponse(url="/languages?error=1", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/languages/bot")
async def update_bot_languages(request: Request):
    """Save the language codes selected for one bot and redirect to ``/languages``.

    Reads ``bot_type`` (default ``"discord"``) and the repeated ``lang_codes``
    field from the submitted form.

    Raises:
        HTTPException: 401 when the ``auth`` cookie is missing or wrong.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    submitted = await request.form()
    target_bot = submitted.get("bot_type", "discord")
    selected_codes = submitted.getlist("lang_codes")

    from botdiscord.database import set_bot_languages
    set_bot_languages(target_bot, selected_codes)

    return RedirectResponse(
        url="/languages?success=1",
        status_code=status.HTTP_303_SEE_OTHER,
    )
@app.post("/languages/flags")
async def update_language_flags(request: Request):
    """Overwrite the flag of every available language from the submitted form.

    For each stored language the form field ``flag_<code>`` is read; a missing
    field stores an empty flag. Redirects (303) to ``/languages?success=1``.

    Raises:
        HTTPException: 401 when the ``auth`` cookie is missing or wrong.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    from botdiscord.database import get_available_languages, set_available_languages

    languages = get_available_languages()
    submitted = await request.form()
    for entry in languages:
        field_name = f"flag_{entry['code']}"
        entry['flag'] = submitted.get(field_name, "")
    set_available_languages(languages)

    return RedirectResponse(
        url="/languages?success=1",
        status_code=status.HTTP_303_SEE_OTHER,
    )
@app.get("/logout")
async def logout():
    """Delete the ``auth`` and ``username`` cookies and redirect (303) to ``/login``."""
    response = RedirectResponse(url="/login", status_code=status.HTTP_303_SEE_OTHER)
    for cookie_name in ("auth", "username"):
        response.delete_cookie(cookie_name)
    return response
@app.get("/admins")
async def admins_page(request: Request):
    """Render ``admins.html`` with the administrators returned by ``get_admins()``.

    Unauthenticated visitors are redirected to ``/login``.
    """
    authenticated = request.cookies.get("auth") == "ok"
    if not authenticated:
        return RedirectResponse(url="/login")

    context = {"request": request, "admins": get_admins()}
    return templates.TemplateResponse("admins.html", context)
@app.post("/admins/add")
async def add_admin_post(request: Request):
    """Create a new administrator from the submitted form.

    Reads ``username`` and ``password``; both are required. The password is
    hashed before it is stored. Always answers with a 303 redirect to
    ``/admins``: ``success=1`` on success, ``error=missing_fields`` when a
    field is empty, or ``error=<message>`` when hashing or storing fails. The
    message is percent-encoded so spaces, ``&``, ``#`` or line breaks in the
    exception text cannot corrupt the redirect URL.

    Raises:
        HTTPException: 401 when the ``auth`` cookie is missing or wrong.
    """
    from urllib.parse import quote

    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    form = await request.form()
    username = form.get("username", "")
    password = form.get("password", "")
    if not username or not password:
        return RedirectResponse(url="/admins?error=missing_fields", status_code=status.HTTP_303_SEE_OTHER)

    try:
        password_hash = hasher.hash(password)
        add_admin(username, password_hash)
        return RedirectResponse(url="/admins?success=1", status_code=status.HTTP_303_SEE_OTHER)
    except Exception as e:
        print(f"CRITICAL ERROR adding admin: {e}")
        # Redirect back to the same page, carrying the (encoded) error text.
        return RedirectResponse(url="/admins?error=" + quote(str(e)), status_code=status.HTTP_303_SEE_OTHER)
@app.post("/admins/delete")
async def delete_admin_post(request: Request):
    """Delete the administrator whose id is given in the ``admin_id`` form field.

    Failures (including a non-numeric id) are logged and ignored; the reply is
    always a 303 redirect to ``/admins``.

    Raises:
        HTTPException: 401 when the ``auth`` cookie is missing or wrong.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    submitted = await request.form()
    raw_id = submitted.get("admin_id")
    if raw_id:
        try:
            delete_admin(int(raw_id))
        except Exception as e:
            print(f"Error deleting admin: {e}")

    return RedirectResponse(url="/admins", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/admins/update")
async def update_admin_post(request: Request):
    """Change an administrator's password.

    Reads ``admin_id`` and ``new_password`` from the form, hashes the new
    password and stores it. Answers with a 303 redirect to ``/admins``:

    * ``success=1`` only when the password was actually updated,
    * ``error=missing_fields`` when either field is empty,
    * ``error=update_failed`` when hashing or the database update raises
      (the exception is logged).

    Raises:
        HTTPException: 401 when the ``auth`` cookie is missing or wrong.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    form = await request.form()
    admin_id = form.get("admin_id")
    new_password = form.get("new_password")
    if not admin_id or not new_password:
        # Nothing was changed, so do not report success.
        return RedirectResponse(url="/admins?error=missing_fields", status_code=status.HTTP_303_SEE_OTHER)

    from botdiscord.database import update_admin_password
    try:
        password_hash = hasher.hash(new_password)
        update_admin_password(int(admin_id), password_hash)
    except Exception as e:
        print(f"Error updating admin password: {e}")
        return RedirectResponse(url="/admins?error=update_failed", status_code=status.HTTP_303_SEE_OTHER)

    return RedirectResponse(url="/admins?success=1", status_code=status.HTTP_303_SEE_OTHER)
@app.get("/welcome")
async def welcome_page(request: Request):
    """Render ``welcome.html`` with the stored welcome configurations.

    The configurations are indexed by ``guild_id``. A numeric ``guild`` query
    parameter selects one of them; ``success``, ``error`` and ``new`` query
    flags are forwarded to the template as booleans. Unauthenticated visitors
    are redirected to ``/login``.
    """
    if request.cookies.get("auth") != "ok":
        return RedirectResponse(url="/login")

    from botdiscord.database import get_all_welcome_configs

    rows = get_all_welcome_configs()
    by_guild = {row['guild_id']: row for row in rows} if rows else {}

    params = request.query_params
    guild_param = params.get("guild")
    # Only a purely numeric value counts as a guild selection.
    guild_id = int(guild_param) if guild_param and guild_param.isdigit() else None
    selected_cfg = by_guild.get(guild_id) if guild_id is not None else None

    context = {
        "request": request,
        "configs": by_guild,
        "config": by_guild,
        "selected_guild": guild_id,
        "selected_cfg": selected_cfg,
        "success": params.get("success") == "1",
        "error": params.get("error") == "1",
        "new_form": params.get("new") == "1",
    }
    return templates.TemplateResponse("welcome.html", context)
@app.post("/welcome/save")
async def save_welcome(request: Request):
    """Store a welcome message for a guild from the submitted form.

    ``guild_id``, ``channel_id`` and ``message_content`` are required;
    ``enabled`` is true only when the field equals ``"1"``. Redirects (303) to
    ``/welcome`` with ``success=1`` or ``error=1``.

    Raises:
        HTTPException: 401 when the ``auth`` cookie is missing or wrong.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    submitted = await request.form()
    guild_id = submitted.get("guild_id")
    channel_id = submitted.get("channel_id")
    message_content = submitted.get("message_content")
    enabled = submitted.get("enabled") == "1"

    has_all_fields = bool(guild_id) and bool(channel_id) and bool(message_content)
    if not has_all_fields:
        return RedirectResponse(url="/welcome?error=1", status_code=status.HTTP_303_SEE_OTHER)

    try:
        from botdiscord.database import save_welcome_message
        save_welcome_message(int(guild_id), int(channel_id), message_content, enabled)
    except Exception as e:
        print(f"Error saving welcome message: {e}")
        return RedirectResponse(url="/welcome?error=1", status_code=status.HTTP_303_SEE_OTHER)
    return RedirectResponse(url="/welcome?success=1", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/welcome/delete")
async def delete_welcome(request: Request):
    """Delete the welcome message of the guild given in the ``guild_id`` field.

    Without a ``guild_id`` the handler just redirects to ``/welcome``;
    otherwise it redirects with ``success=1`` or, on any exception,
    ``error=1``.

    Raises:
        HTTPException: 401 when the ``auth`` cookie is missing or wrong.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    submitted = await request.form()
    guild_id = submitted.get("guild_id")
    if not guild_id:
        return RedirectResponse(url="/welcome", status_code=status.HTTP_303_SEE_OTHER)

    try:
        from botdiscord.database import delete_welcome_message
        delete_welcome_message(int(guild_id))
    except Exception as e:
        print(f"Error deleting welcome message: {e}")
        return RedirectResponse(url="/welcome?error=1", status_code=status.HTTP_303_SEE_OTHER)
    return RedirectResponse(url="/welcome?success=1", status_code=status.HTTP_303_SEE_OTHER)
@app.get("/discord-channels")
async def discord_channels_page(request: Request):
    """Render ``discord_channels.html`` with the channels grouped by server.

    Each server id maps to ``{'server_info': <server>, 'channels': [...]}``
    where the list holds the channels whose ``server_id`` matches. The
    ``success``, ``error`` and ``synced`` query flags are passed on as
    booleans. Unauthenticated visitors are redirected to ``/login``.
    """
    if request.cookies.get("auth") != "ok":
        return RedirectResponse(url="/login")

    from botdiscord.database import get_discord_servers, get_discord_channels

    servers = get_discord_servers()
    channels = get_discord_channels()

    # Group the channels under the server they belong to.
    servers_with_channels = {
        server['server_id']: {
            'server_info': server,
            'channels': [ch for ch in channels if ch['server_id'] == server['server_id']],
        }
        for server in servers
    }

    context = {"request": request, "servers_with_channels": servers_with_channels}
    for flag in ("success", "error", "synced"):
        context[flag] = request.query_params.get(flag) == "1"
    return templates.TemplateResponse("discord_channels.html", context)
@app.post("/discord-channels/sync")
async def sync_discord_servers(request: Request):
    """Synchronise the Discord servers through ``sync_discord_servers_from_api``.

    Logs the environment variables whose name contains ``DISCORD`` or
    ``TOKEN`` (values of names containing ``TOKEN`` are masked), runs the
    sync, and redirects (303) to ``/discord-channels`` with ``synced=1`` on a
    truthy result or ``error=1`` otherwise (including on exceptions).

    Raises:
        HTTPException: 401 when the ``auth`` cookie is missing or wrong.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    try:
        print("[Panel] Iniciando sincronización de Discord...")

        # Environment diagnostics.
        print("[Panel] Variables de entorno disponibles:")
        for key, value in os.environ.items():
            if 'DISCORD' in key or 'TOKEN' in key:
                shown = '***' if 'TOKEN' in key else value
                print(f" {key}: {shown}")

        # Import and run the synchronisation module.
        from botdiscord.server_sync import sync_discord_servers_from_api
        result = await sync_discord_servers_from_api()
        print(f"[Panel] Resultado de sincronización: {result}")

        target = "/discord-channels?synced=1" if result else "/discord-channels?error=1"
        return RedirectResponse(url=target, status_code=status.HTTP_303_SEE_OTHER)
    except Exception as e:
        print(f"[Panel] Error en sincronización: {e}")
        import traceback
        traceback.print_exc()
        return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/discord-channels/toggle")
async def toggle_discord_channel(request: Request):
    """Activate or deactivate one Discord channel and flag the bot for reload.

    ``channel_id`` is required; the channel becomes active only when the
    ``is_active`` field equals ``"true"``. Redirects (303) to
    ``/discord-channels`` with ``success=1`` or ``error=1``.

    Raises:
        HTTPException: 401 when the ``auth`` cookie is missing or wrong.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    submitted = await request.form()
    channel_id = submitted.get("channel_id")
    make_active = submitted.get("is_active") == "true"
    if not channel_id:
        return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER)

    try:
        from botdiscord.database import toggle_channel_status
        from botdiscord.reload_marker import set_reload_marker
        toggle_channel_status(int(channel_id), make_active)
        set_reload_marker()
    except Exception as e:
        print(f"Error toggling channel: {e}")
        return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER)
    return RedirectResponse(url="/discord-channels?success=1", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/discord-channels/delete-server")
async def delete_discord_server(request: Request):
    """Delete the Discord server given in ``server_id`` and flag the bot for reload.

    Redirects (303) to ``/discord-channels`` with ``success=1`` or, when the
    field is missing or anything raises, ``error=1``.

    Raises:
        HTTPException: 401 when the ``auth`` cookie is missing or wrong.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    submitted = await request.form()
    server_id = submitted.get("server_id")
    if not server_id:
        return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER)

    try:
        # Aliased because the database function shares this handler's name.
        from botdiscord.database import delete_discord_server as remove_server_from_db
        from botdiscord.reload_marker import set_reload_marker
        remove_server_from_db(int(server_id))
        set_reload_marker()
    except Exception as e:
        print(f"Error deleting server: {e}")
        return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER)
    return RedirectResponse(url="/discord-channels?success=1", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/discord-channels/reload-bot")
async def reload_discord_bot(request: Request):
    """Ask the Discord bot to reload its configuration via ``reload_bot_config``.

    Redirects (303) to ``/discord-channels`` with ``success=1`` on a truthy
    result, ``error=1`` otherwise (exceptions are logged with a traceback).

    Raises:
        HTTPException: 401 when the ``auth`` cookie is missing or wrong.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    try:
        print("[Panel] Enviando señal de recarga al bot de Discord...")
        # Import and run the reload function.
        from botdiscord.bot_reload import reload_bot_config
        reloaded = await reload_bot_config()
        target = "/discord-channels?success=1" if reloaded else "/discord-channels?error=1"
        return RedirectResponse(url=target, status_code=status.HTTP_303_SEE_OTHER)
    except Exception as e:
        print(f"[Panel] Error recargando bot: {e}")
        import traceback
        traceback.print_exc()
        return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER)
@app.post("/discord-channels/bulk-toggle")
async def bulk_toggle_discord_channels(request: Request):
    """Activate or deactivate several Discord channels in one request.

    Reads the repeated ``channel_ids`` field and ``bulk_action``; the channels
    are activated when the action is ``"activate"`` and deactivated for any
    other value. Ids that are not integers are skipped. Afterwards the bot is
    flagged for reload. Redirects (303) to ``/discord-channels`` with
    ``success=1`` or ``error=1``.

    Raises:
        HTTPException: 401 when the ``auth`` cookie is missing or wrong.
    """
    if request.cookies.get("auth") != "ok":
        raise HTTPException(status_code=401)

    submitted = await request.form()
    channel_ids = submitted.getlist("channel_ids")
    bulk_action = submitted.get("bulk_action")
    if not (channel_ids and bulk_action):
        return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER)

    try:
        from botdiscord.database import toggle_channel_status

        is_active = bulk_action == "activate"
        success_count = 0
        for raw_id in channel_ids:
            try:
                toggle_channel_status(int(raw_id), is_active)
            except ValueError:
                # Not a valid id (or rejected as a value): skip this one.
                continue
            success_count += 1
        print(f"[Panel] Bulk toggle: {success_count} canales {'activados' if is_active else 'desactivados'}")

        from botdiscord.reload_marker import set_reload_marker
        set_reload_marker()
        return RedirectResponse(url="/discord-channels?success=1", status_code=status.HTTP_303_SEE_OTHER)
    except Exception as e:
        print(f"[Panel] Error en bulk toggle: {e}")
        import traceback
        traceback.print_exc()
        return RedirectResponse(url="/discord-channels?error=1", status_code=status.HTTP_303_SEE_OTHER)
@app.get("/diagnosis")
async def diagnosis_page(request: Request):
    """Render ``diagnosis.html`` with selected environment variables and the config.

    The template receives the raw values of the bot tokens, the LibreTranslate
    URL and the database settings (``None`` for unset variables), plus the
    dictionary from ``get_config()``. Unauthenticated visitors are redirected
    to ``/login``.
    """
    if request.cookies.get("auth") != "ok":
        return RedirectResponse(url="/login")

    # Environment variables relevant for troubleshooting.
    variable_names = (
        'DISCORD_TOKEN',
        'TELEGRAM_TOKEN',
        'LIBRETRANSLATE_URL',
        'DB_TYPE',
        'DB_HOST',
        'DB_PORT',
        'DB_NAME',
        'DB_USER',
    )
    env_vars = {name: os.getenv(name) for name in variable_names}

    # Configuration as currently loaded.
    context = {
        "request": request,
        "env_vars": env_vars,
        "config": get_config(),
    }
    return templates.TemplateResponse("diagnosis.html", context)
@app.get("/api/stats")
async def get_stats(request: Request):
    """Return the result of ``get_translation_stats()`` as the response body.

    Raises:
        HTTPException: 401 when the ``auth`` cookie is missing or wrong.
    """
    authenticated = request.cookies.get("auth") == "ok"
    if not authenticated:
        raise HTTPException(status_code=401)
    return get_translation_stats()
@app.get("/metrics")
async def metrics_page(request: Request):
    """Render ``metrics.html``; unauthenticated visitors go to ``/login``."""
    authenticated = request.cookies.get("auth") == "ok"
    if not authenticated:
        return RedirectResponse(url="/login")

    context = {
        "request": request,
        "username": request.cookies.get("username", ""),
    }
    return templates.TemplateResponse("metrics.html", context)
if __name__ == "__main__":
    # Script entry point: serve the panel with uvicorn using the host and port
    # from the web configuration (defaults: all interfaces, port 8000).
    import uvicorn

    web_config = get_web_config()
    bind_host = web_config.get("host", "0.0.0.0")
    bind_port = web_config.get("port", 8000)
    uvicorn.run(app, host=bind_host, port=bind_port)