Merge pull request #30 from marcogll/feature/flow-engine-implementation-15654864159042246464

Feature/flow engine implementation 15654864159042246464
This commit is contained in:
Marco Gallegos
2025-12-20 22:50:20 -06:00
committed by GitHub
12 changed files with 325 additions and 890 deletions

View File

@@ -1,58 +1,18 @@
# .env.example
# Rellena estas variables y renombra este archivo a .env
# --- TELEGRAM & SECURITY ---
# Token de tu bot de Telegram, obtenido de @BotFather.
TELEGRAM_BOT_TOKEN=
# Tu Telegram User ID. Usado para notificaciones críticas y funciones de administrador.
ADMIN_ID=
# (Opcional) Lista separada por comas de IDs de Telegram para los miembros del equipo.
CREW_CHAT_IDS=
# --- AI CORE ---
# Tu clave de API de OpenAI.
OPENAI_API_KEY=
# El modelo de OpenAI que quieres usar (ej. gpt-4, gpt-3.5-turbo).
OPENAI_MODEL=gpt-3.5-turbo
# --- INTEGRACIONES ---
# URL base de tu instancia de Vikunja (ej. https://vikunja.tu-dominio.com/api/v1).
VIKUNJA_BASE_URL=
# Token de API generado en Vikunja.
# --- INTEGRATIONS ---
VIKUNJA_API_URL=https://tuservidor.com/api/v1
VIKUNJA_TOKEN=
# ID del proyecto en Vikunja que se usará como "bandeja de entrada" para nuevas tareas.
VIKUNJA_INBOX_PROJECT_ID=
GOOGLE_CREDENTIALS_PATH=./data/credentials.json
# Ruta al archivo JSON de credenciales de tu cuenta de servicio de Google.
GOOGLE_SERVICE_ACCOUNT_FILE=google_key.json
# ID del calendario de Google que el bot gestionará.
CALENDAR_ID=
# --- PRINT SERVICE (SMTP/IMAP) ---
# Servidor SMTP para enviar correos.
SMTP_SERVER=
# Puerto del servidor SMTP (ej. 465 para SSL, 587 para STARTTLS).
SMTP_PORT=
# Usuario para la autenticación SMTP.
SMTP_USER=
# Contraseña para la autenticación SMTP.
SMTP_PASSWORD=
# Servidor IMAP para leer correos.
IMAP_SERVER=
# Usuario para la autenticación IMAP.
IMAP_USER=
# Contraseña para la autenticación IMAP.
IMAP_PASSWORD=
# Dirección de correo de la impresora (a donde se envían los trabajos de impresión).
PRINTER_EMAIL=
# --- OTROS ---
# (Opcional) URL de un webhook de n8n para integraciones personalizadas.
N8N_WEBHOOK_URL=
# Hora para enviar el resumen diario (formato HH:MM).
DAILY_SUMMARY_TIME=08:00
# Tu enlace de Calendly para agendar citas.
CALENDLY_LINK=
# Zona horaria para el bot (ej. America/Mexico_City, Europe/Madrid).
TIMEZONE=America/Monterrey
# --- PRINT SERVICE ---
SMTP_SERVER=smtp.hostinger.com
SMTP_PORT=465
SMTP_USER=print.service@vanityexperience.mx
SMTP_PASS=
IMAP_SERVER=imap.hostinger.com

143
README.md
View File

@@ -1,12 +1,10 @@
# 🤖 Talia Bot: Asistente Personal & Orquestador de Negocio
Talia no es un simple chatbot; es un Middleware de Inteligencia Artificial que orquesta las operaciones diarias de administración, logística y ventas. Actúa como el puente central entre usuarios en Telegram y servicios críticos como Vikunja (Gestión de Proyectos), Google Calendar y Hardware de Impresión remota.
Talia no es un simple chatbot; es un Middleware de Inteligencia Artificial alojado en un VPS que orquesta las operaciones diarias de administración, logística y ventas. Actúa como el puente central entre usuarios en Telegram y servicios críticos como Vikunja (Gestión de Proyectos), Google Calendar y Hardware de Impresión remota.
---
## 🚀 Conceptos Centrales
### 1. Enrutamiento por Identidad
## 🚀 Concepto Central: Enrutamiento por Identidad
La característica core de Talia es su capacidad de cambiar de personalidad y permisos dinámicamente basándose en el Telegram ID del usuario:
@@ -16,32 +14,60 @@ La característica core de Talia es su capacidad de cambiar de personalidad y pe
| **Crew** | 👷 | Equipo Operativo | Limitado: Solicitud de agenda (validada), asignación de tareas, impresión de documentos. |
| **Cliente** | 👤 | Usuario Público | Ventas: Embudo de captación, consulta de servicios (RAG) y agendamiento comercial. |
### 2. Motor de Flujos Conversacionales
Toda la lógica de conversación del bot es impulsada por un motor de flujos genérico. En lugar de tener conversaciones codificadas, el bot interpreta definiciones de un archivo central `flows.json`.
* **`main.py`**: Contiene un `universal_handler` que captura todas las interacciones del usuario.
* **`flow_engine.py`**: Es el cerebro. Consulta el estado actual del usuario en la base de datos, lee el `flows.json` para determinar el siguiente paso y maneja la lógica de la conversación.
* **`flows.json`**: Un archivo JSON que define cada pregunta, botón y acción para todos los flujos de conversación, separados por rol. Esto permite modificar o añadir nuevas conversaciones sin cambiar el código principal.
---
## 🛠️ Arquitectura Técnica
El sistema sigue un flujo modular:
1. **Input**: Telegram (Texto, Audio, Documentos, Botones).
2. **Transcripción**: `transcription.py` (Whisper) convierte voz a texto.
3. **Router**: `universal_handler` en `main.py` enruta la entrada al `FlowEngine`.
4. **Estado**: El `FlowEngine` consulta la tabla `conversations` en la base de datos para saber si el usuario está en medio de un flujo.
5. **Lógica**: El `FlowEngine` utiliza `flows.json` para procesar la entrada, recoger datos y determinar el siguiente paso.
6. **Resolución**: Una vez que un flujo se completa, `main.py` ejecuta la acción final (la "resolución") llamando al módulo correspondiente.
7. **Módulos de Acción (Tools)**:
* **`vikunja.py`**: API asíncrona para leer/escribir tareas y proyectos.
* **`calendar.py`**: API para crear eventos en Google Calendar.
* **`mailer.py`**: Envío de correos (SMTP) para el flujo de impresión.
* **`imap_listener.py`**: Escucha de confirmaciones de impresión (IMAP).
* **`llm_engine.py`**: Análisis RAG para el embudo de ventas.
1. **Input**: Telegram (Texto o Audio).
2. **STT**: Whisper (Conversión de Audio a Texto).
3. **Router**: Verificación de ID contra la base de datos de usuarios.
4. **Cerebro (LLM)**: OpenAI (Fase 1) / Google Gemini (Fase 2).
5. **Tools**:
* **Vikunja API**: Lectura/Escritura de tareas con filtrado de privacidad.
* **Google Calendar API**: Gestión de tiempos y reglas de disponibilidad.
* **SMTP/IMAP**: Comunicación bidireccional con impresoras.
* **NFC Gen**: Codificación Base64 para tags físicos.
---
## 📋 Flujos de Trabajo (Features)
### 1. 👑 Gestión Admin (Proyectos & Identidad)
* **Proyectos (Vikunja)**:
* Resumen inteligente de estatus de proyectos.
* Comandos naturales: *"Marca el proyecto de web como terminado y comenta que se envió factura"*.
* **Wizard de Identidad (NFC)**:
* Flujo paso a paso para dar de alta colaboradores.
* Genera JSON de registro y String Base64 listo para escribir en Tags NFC.
* Inputs: Nombre, ID Empleado, Sucursal (Botones), Telegram ID.
### 2. 👷 Gestión Crew (Agenda & Tareas)
* **Solicitud de Tiempo (Wizard)**:
* Solicita espacios de 1 a 4 horas.
* **Reglas de Negocio**:
* No permite fechas > 3 meses a futuro.
* **Gatekeeper**: Verifica Google Calendar. Si hay evento "Privado" del Admin, rechaza automáticamente.
* **Modo Buzón (Vikunja)**:
* Crea tareas asignadas al Admin.
* **Privacidad**: Solo pueden consultar el estatus de tareas creadas por ellos mismos.
### 3. 🖨️ Sistema de Impresión Remota (Print Loop)
* Permite enviar archivos desde Telegram a la impresora física de la oficina.
* **Envío (SMTP)**: El bot envía el documento a un correo designado.
* **Tracking**: El asunto del correo lleva un hash único: `PJ:{uuid}#TID:{telegram_id}`.
* **Confirmación (IMAP Listener)**: Un proceso en background escucha la respuesta de la impresora y notifica al usuario en Telegram.
### 4. 👤 Ventas Automáticas (RAG)
* Identifica usuarios nuevos (no registrados en la DB).
* Captura datos (Lead Magnet).
* Analiza ideas de clientes usando `servicios.json` (Base de conocimiento).
* Ofrece citas de ventas mediante link de Calendly.
---
@@ -49,59 +75,53 @@ El sistema sigue un flujo modular:
### Prerrequisitos
* Python 3.9+
* Docker y Docker Compose (recomendado)
* Python 3.10+
* Cuenta de Telegram Bot (@BotFather)
* Instancia de Vikunja (Self-hosted)
* Cuenta de Servicio Google Cloud (Calendar API)
* Servidor de Correo (SMTP/IMAP)
### 1. Clonar y Entorno
### 1. Clonar y Entorno Virtual
```bash
git clone https://github.com/marcogll/talia_bot_mg.git
cd talia_bot_mg
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install -r requirements.txt
```
### 2. Variables de Entorno (`.env`)
Crea un archivo `.env` en la raíz del proyecto a partir de `.env.example` y rellena las siguientes variables:
Crea un archivo `.env` en la raíz con la siguiente estructura:
```env
# --- TELEGRAM & SECURITY ---
TELEGRAM_BOT_TOKEN=tu_token_telegram
ADMIN_ID=tu_telegram_id
CREW_CHAT_IDS=id1,id2,id3
# --- AI CORE ---
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-3.5-turbo
# --- INTEGRACIONES ---
VIKUNJA_BASE_URL=https://tu_vikunja.com/api/v1
VIKUNJA_API_URL=https://tuservidor.com/api/v1
VIKUNJA_TOKEN=tu_token_vikunja
VIKUNJA_INBOX_PROJECT_ID=el_id_de_tu_proyecto_bandeja_de_entrada
GOOGLE_SERVICE_ACCOUNT_FILE=google_key.json
CALENDAR_ID=tu_id_de_google_calendar
GOOGLE_CREDENTIALS_PATH=./data/credentials.json
# --- PRINT SERVICE (SMTP/IMAP) ---
# --- PRINT SERVICE ---
SMTP_SERVER=smtp.hostinger.com
SMTP_PORT=465
SMTP_USER=print.service@vanityexperience.mx
SMTP_PASSWORD=tu_password_smtp
SMTP_PASS=tu_password_seguro
IMAP_SERVER=imap.hostinger.com
IMAP_USER=print.service@vanityexperience.mx
IMAP_PASSWORD=tu_password_imap
PRINTER_EMAIL=vanityprinter@print.epsonconnect.com
```
### 3. Ejecutar con Docker
### 3. Estructura de Datos
La forma más sencilla de levantar el bot es con Docker Compose:
```bash
docker-compose up --build
```
Asegúrate de tener los archivos base en `talia_bot/data/`:
* `servicios.json`: Catálogo de servicios para el RAG de ventas.
* `credentials.json`: Credenciales de Google Cloud.
* `users.db`: Base de datos SQLite.
---
@@ -110,24 +130,22 @@ docker-compose up --build
```text
talia_bot_mg/
├── talia_bot/
│ ├── main.py # Entry Point y Universal Handler
│ ├── db.py # Gestión de la base de datos (SQLite)
│ ├── main.py # Entry Point y Router de Identidad
│ ├── db.py # Gestión de la base de datos
│ ├── config.py # Carga de variables de entorno
│ ├── modules/
│ │ ├── flow_engine.py # El cerebro que procesa los flujos
│ │ ├── vikunja.py # API Manager asíncrono para Tareas
│ │ ├── calendar.py # Lógica de Google Calendar
│ │ ├── llm_engine.py # Cliente OpenAI (Whisper y GPT)
│ │ ├── transcription.py # Lógica de transcripción de audio
│ │ ── mailer.py # Módulo para envío de correos (SMTP)
│ │ └── imap_listener.py # Módulo para leer correos (IMAP)
│ │ ├── identity.py # Lógica de Roles y Permisos
│ │ ├── llm_engine.py # Cliente OpenAI/Gemini
│ │ ├── vikunja.py # API Manager para Tareas
│ │ ├── calendar.py # Google Calendar Logic & Rules
│ │ ├── printer.py # SMTP/IMAP Loop
│ │ ── sales_rag.py # Lógica de Ventas y Servicios
│ └── data/
│ ├── flows.json # ¡IMPORTANTE! Define todas las conversaciones
│ ├── services.json # Base de conocimiento para ventas
│ ├── servicios.json # Base de conocimiento
│ ├── credentials.json # Credenciales de Google
│ └── users.db # Base de datos de usuarios
├── .env # Tus variables de entorno (NO subir a Git)
├── .env.example # Plantilla de variables de entorno
├── requirements.txt # Dependencias de Python
├── requirements.txt # Dependencias
├── Dockerfile # Configuración del contenedor
└── docker-compose.yml # Orquestador de Docker
```
@@ -136,13 +154,12 @@ talia_bot_mg/
## 🗓️ Roadmap
- [x] **Implementado el Motor de Flujos Conversacionales.**
- [x] **Integración completa de Vikunja, OpenAI y Google Calendar.**
- [x] **Implementado el Loop de Confirmación de Impresión (IMAP).**
- [ ] Mejorar el parsing de fechas y horas con lenguaje natural más avanzado.
- [ ] Implementar Wizard de creación de Tags NFC (Base64).
- [ ] Conectar Loop de Impresión (SMTP/IMAP).
- [ ] Migrar de OpenAI a Google Gemini 1.5 Pro.
- [ ] Implementar soporte para fotos en impresión.
---
Desarrollado por: Marco G.
Asistente Personalizado v2.1 (Ciclo de Impresión Completo)
Asistente Personalizado v1.0

View File

@@ -7,4 +7,3 @@ google-auth-oauthlib
openai
pytz
python-dotenv
python-dateutil

View File

@@ -10,44 +10,39 @@ from pathlib import Path
env_path = Path(__file__).parent.parent / '.env'
load_dotenv(dotenv_path=env_path)
# --- TELEGRAM & SECURITY ---
# Token del bot de Telegram (obtenido de @BotFather)
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
# Prioriza ADMIN_ID, pero usa OWNER_CHAT_ID como fallback para compatibilidad
ADMIN_ID = os.getenv("ADMIN_ID") or os.getenv("OWNER_CHAT_ID")
CREW_CHAT_IDS = os.getenv("CREW_CHAT_IDS", "").split(',')
# --- AI CORE ---
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")
# ID de chat del dueño del bot (para recibir notificaciones importantes)
ADMIN_ID = os.getenv("ADMIN_ID")
# --- INTEGRACIONES ---
# Google
# Ruta al archivo de credenciales de la cuenta de servicio de Google
GOOGLE_SERVICE_ACCOUNT_FILE = os.getenv("GOOGLE_SERVICE_ACCOUNT_FILE")
if GOOGLE_SERVICE_ACCOUNT_FILE and not os.path.isabs(GOOGLE_SERVICE_ACCOUNT_FILE):
GOOGLE_SERVICE_ACCOUNT_FILE = str(Path(__file__).parent.parent / GOOGLE_SERVICE_ACCOUNT_FILE)
# ID del calendario de Google que usará el bot
CALENDAR_ID = os.getenv("CALENDAR_ID")
# Vikunja
VIKUNJA_API_URL = os.getenv("VIKUNJA_BASE_URL")
VIKUNJA_API_TOKEN = os.getenv("VIKUNJA_TOKEN")
VIKUNJA_INBOX_PROJECT_ID = os.getenv("VIKUNJA_INBOX_PROJECT_ID")
# n8n
# URL del webhook de n8n para enviar datos a otros servicios
N8N_WEBHOOK_URL = os.getenv("N8N_WEBHOOK_URL")
N8N_TEST_WEBHOOK_URL = os.getenv("N8N_WEBHOOK-TEST_URL")
N8N_TEST_WEBHOOK_URL = os.getenv("N8N_TEST_WEBHOOK_URL")
# --- PRINT SERVICE (SMTP/IMAP) ---
SMTP_SERVER = os.getenv("SMTP_SERVER")
SMTP_PORT = int(os.getenv("SMTP_PORT", 587))
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
# Configuración de Vikunja
VIKUNJA_API_URL = os.getenv("VIKUNJA_API_URL", "https://tasks.soul23.cloud/api/v1")
VIKUNJA_API_TOKEN = os.getenv("VIKUNJA_API_TOKEN")
IMAP_SERVER = os.getenv("IMAP_SERVER")
IMAP_USER = os.getenv("IMAP_USER")
IMAP_PASSWORD = os.getenv("IMAP_PASSWORD")
PRINTER_EMAIL = os.getenv("PRINTER_EMAIL")
# Llave de la API de OpenAI para usar modelos de lenguaje (como GPT)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# --- OTROS ---
# Modelo de OpenAI a utilizar (ej. gpt-3.5-turbo, gpt-4)
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")
# Hora del resumen diario (formato HH:MM)
DAILY_SUMMARY_TIME = os.getenv("DAILY_SUMMARY_TIME", "07:00")
# Enlace de Calendly para agendar citas
CALENDLY_LINK = os.getenv("CALENDLY_LINK", "https://calendly.com/user/appointment-link")
# Zona horaria por defecto para el manejo de fechas y horas
TIMEZONE = os.getenv("TIMEZONE", "America/Mexico_City")

View File

@@ -32,20 +32,8 @@ def setup_database():
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS conversations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
flow_id TEXT NOT NULL,
current_step_id INTEGER NOT NULL,
collected_data TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (telegram_id)
)
""")
conn.commit()
logger.info("Database setup complete. 'users' and 'conversations' tables are ready.")
logger.info("Database setup complete. 'users' table is ready.")
except sqlite3.Error as e:
logger.error(f"Database error during setup: {e}")
finally:

View File

@@ -34,19 +34,9 @@ from talia_bot.modules.aprobaciones import view_pending, handle_approval_action
from talia_bot.modules.servicios import get_service_info
from talia_bot.modules.admin import get_system_status
from talia_bot.modules.debug import print_handler
import json
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
import io
from talia_bot.modules.vikunja import get_projects, add_comment_to_task, update_task_status, get_project_tasks, create_task
from talia_bot.modules.create_tag import create_tag_conv_handler
from talia_bot.modules.vikunja import vikunja_conv_handler
from talia_bot.db import setup_database
from talia_bot.modules.flow_engine import FlowEngine
from talia_bot.modules.transcription import transcribe_audio
import uuid
from talia_bot.modules.llm_engine import analyze_client_pitch
from talia_bot.modules.calendar import create_event
from talia_bot.modules.mailer import send_email_with_attachment
from talia_bot.modules.imap_listener import check_for_confirmation
from talia_bot.config import ADMIN_ID, VIKUNJA_INBOX_PROJECT_ID
from talia_bot.scheduler import schedule_daily_summary
@@ -56,342 +46,6 @@ logging.basicConfig(
)
logger = logging.getLogger(__name__)
# Instanciamos el motor de flujos
flow_engine = FlowEngine()
async def send_step_message(update: Update, context: ContextTypes.DEFAULT_TYPE, step: dict, collected_data: dict = None):
"""
Envía el mensaje de un paso del flujo, construyendo el teclado dinámicamente.
"""
keyboard = []
input_type = step.get("input_type")
collected_data = collected_data or {}
if input_type == "keyboard" and "options" in step:
for option in step["options"]:
keyboard.append([InlineKeyboardButton(option, callback_data=option)])
elif input_type == "dynamic_keyboard_vikunja":
projects = await get_projects()
if projects:
for project in projects:
keyboard.append([InlineKeyboardButton(project['title'], callback_data=f"project_{project['id']}")])
else:
await update.effective_message.reply_text("No se pudieron cargar los proyectos de Vikunja.")
return
elif input_type == "dynamic_keyboard_vikunja_tasks":
project_id_str = collected_data.get('PROJECT_SELECT', '').split('_')[-1]
if project_id_str.isdigit():
project_id = int(project_id_str)
tasks = await get_project_tasks(project_id)
if tasks:
for task in tasks:
keyboard.append([InlineKeyboardButton(task['title'], callback_data=f"task_{task['id']}")])
else:
await update.effective_message.reply_text("Este proyecto no tiene tareas. Puedes añadir una o seleccionar otro proyecto.")
# Aquí podríamos opcionalmente terminar el flujo o devolver al paso anterior.
return
else:
await update.effective_message.reply_text("Error: No se pudo identificar el proyecto para buscar tareas.")
return
reply_markup = InlineKeyboardMarkup(keyboard) if keyboard else None
# Si la actualización es de un botón, edita el mensaje. Si no, envía uno nuevo.
if update.callback_query:
await update.callback_query.edit_message_text(
text=step["question"], reply_markup=reply_markup, parse_mode='Markdown'
)
else:
await update.message.reply_text(
text=step["question"], reply_markup=reply_markup, parse_mode='Markdown'
)
async def universal_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""
Handler universal que gestiona todos los flujos de conversación.
"""
user_id = update.effective_user.id
user_role = get_user_role(user_id)
state = flow_engine.get_conversation_state(user_id)
if state:
response_data = None
if update.callback_query:
response_data = update.callback_query.data
await update.callback_query.answer()
elif update.message and update.message.text:
response_data = update.message.text
elif update.message and update.message.voice:
voice_file = await update.message.voice.get_file()
file_buffer = io.BytesIO()
await voice_file.download_to_memory(file_buffer)
file_buffer.seek(0)
file_buffer.name = "voice_message.oga"
await update.message.reply_text("Transcribiendo audio... ⏳")
response_data = await transcribe_audio(file_buffer)
if response_data is None:
await update.message.reply_text("Lo siento, no pude entender el audio. ¿Podrías intentarlo de nuevo?")
return
elif update.message and update.message.document:
# Guardamos la información del archivo para el paso de resolución
response_data = {
"file_id": update.message.document.file_id,
"file_name": update.message.document.file_name,
}
if response_data:
result = flow_engine.handle_response(user_id, response_data)
if result.get("status") == "in_progress":
# Pasamos los datos recolectados para que el siguiente paso los pueda usar si es necesario
current_state = flow_engine.get_conversation_state(user_id)
await send_step_message(update, context, result["step"], current_state.get("collected_data"))
elif result.get("status") == "complete":
await handle_flow_resolution(update, context, result)
elif result.get("status") == "error":
await update.effective_message.reply_text(f"Error: {result.get('message', 'Ocurrió un error.')}")
return
trigger = None
is_callback = False
if update.callback_query:
trigger = update.callback_query.data
is_callback = True
await update.callback_query.answer()
elif update.message and update.message.text:
trigger = update.message.text
# Flujo automático para clientes
if not trigger and user_role == 'client' and not state:
flow_to_start = next((f for f in flow_engine.flows if f.get("trigger_automatic")), None)
if flow_to_start:
logger.info(f"Starting automatic flow '{flow_to_start['id']}' for client {user_id}")
initial_step = flow_engine.start_flow(user_id, flow_to_start['id'])
if initial_step:
await send_step_message(update, context, initial_step)
return
if trigger:
for flow in flow_engine.flows:
if trigger == flow.get('trigger_button') or trigger == flow.get('trigger_command'):
logger.info(f"Starting flow '{flow['id']}' for user {user_id} via trigger '{trigger}'")
initial_step = flow_engine.start_flow(user_id, flow['id'])
if initial_step:
await send_step_message(update, context, initial_step)
return
# Si ninguna acción de flujo se disparó y es un callback, podría ser una acción del menú principal
if is_callback:
logger.info(f"Callback '{trigger}' no fue manejado por el motor de flujos. Pasando al dispatcher legado.")
await button_dispatcher(update, context)
async def check_print_confirmation_job(context: ContextTypes.DEFAULT_TYPE) -> None:
"""
Job que se ejecuta para verificar la confirmación de impresión.
"""
job = context.job
user_id, job_id, file_name = job.data
logger.info(f"Running print confirmation check for job_id: {job_id}")
confirmation_data = await asyncio.to_thread(check_for_confirmation, job_id)
if confirmation_data:
await context.bot.send_message(chat_id=user_id, text=f"✅ ¡Éxito! Tu archivo '{file_name}' ha sido impreso correctamente.")
else:
await context.bot.send_message(chat_id=user_id, text=f"⚠️ El trabajo de impresión para '{file_name}' fue enviado, pero no he recibido una confirmación de la impresora. Por favor, verifica la bandeja de la impresora.")
async def handle_flow_resolution(update: Update, context: ContextTypes.DEFAULT_TYPE, result: dict):
"""
Maneja la acción final de un flujo completado.
"""
resolution_step = result.get("resolution")
collected_data = result.get("data", {})
if not resolution_step:
logger.info(f"Flujo completado sin paso de resolución. Datos: {collected_data}")
final_message = "Proceso completado. ✅"
if update.callback_query:
await update.callback_query.edit_message_text(final_message)
else:
await update.effective_message.reply_text(final_message)
return
resolution_type = resolution_step.get("input_type")
final_message = resolution_step.get("question", "Hecho. ✅")
logger.info(f"Resolviendo flujo con tipo '{resolution_type}' y datos: {collected_data}")
# Lógica de resolución
if resolution_type == "resolution_api_success":
action = collected_data.get("ACTION_TYPE")
task_id_str = collected_data.get("TASK_SELECT", "").split('_')[-1]
update_content = collected_data.get("UPDATE_CONTENT")
if task_id_str.isdigit():
task_id = int(task_id_str)
if action == "💬 Agregar Comentario":
await add_comment_to_task(task_id=task_id, comment=update_content)
elif action == "🔄 Actualizar Estatus":
await update_task_status(task_id=task_id, status_text=update_content)
elif action == "✅ Marcar Completado":
await update_task_status(task_id=task_id, is_done=True)
elif resolution_type == "resolution_notify_admin":
admin_id = context.bot_data.get("ADMIN_ID", ADMIN_ID) # Obtener ADMIN_ID de config
if admin_id:
user_info = (
f"✨ **Nueva Solicitud de Onboarding** ✨\n\n"
f"Un nuevo candidato ha completado el formulario:\n\n"
f"👤 **Nombre:** {collected_data.get('ONBOARD_START', 'N/A')}\n"
f"🏢 **Base:** {collected_data.get('ONBOARD_ORIGIN', 'N/A')}\n"
f"📧 **Email:** {collected_data.get('ONBOARD_EMAIL', 'N/A')}\n"
f"📱 **Teléfono:** {collected_data.get('ONBOARD_PHONE', 'N/A')}\n\n"
f"Por favor, revisa y añade al usuario al sistema si es aprobado."
)
await context.bot.send_message(chat_id=admin_id, text=user_info, parse_mode='Markdown')
elif resolution_type == "rag_analysis_resolution":
pitch = collected_data.get("IDEA_PITCH")
display_name = update.effective_user.full_name
final_message = await analyze_client_pitch(pitch, display_name)
elif resolution_type == "resolution_event_created":
from dateutil.parser import parse
from datetime import datetime, timedelta
date_str = collected_data.get("BLOCK_DATE", "Hoy")
time_str = collected_data.get("BLOCK_TIME", "")
title = collected_data.get("BLOCK_TITLE", "Bloqueado por Talia")
try:
# Interpretar la fecha
if date_str.lower() == 'hoy':
start_date = datetime.now()
elif date_str.lower() == 'mañana':
start_date = datetime.now() + timedelta(days=1)
else:
start_date = parse(date_str)
# Interpretar el rango de tiempo
time_parts = [part.strip() for part in time_str.replace('a', '-').split('-')]
start_time_obj = parse(time_parts[0])
end_time_obj = parse(time_parts[1])
start_time = start_date.replace(hour=start_time_obj.hour, minute=start_time_obj.minute, second=0, microsecond=0)
end_time = start_date.replace(hour=end_time_obj.hour, minute=end_time_obj.minute, second=0, microsecond=0)
except (ValueError, IndexError):
final_message = "❌ Formato de fecha u hora no reconocido. Por favor, usa algo como 'Hoy', 'Mañana', o '10am - 11am'."
if update.callback_query:
await update.callback_query.edit_message_text(final_message)
else:
await update.effective_message.reply_text(final_message)
return
event = await asyncio.to_thread(
create_event,
summary=title,
start_time=start_time,
end_time=end_time,
attendees=[] # Añadir asistentes si fuera necesario
)
if not event:
final_message = "❌ Hubo un error al crear el evento en el calendario."
elif resolution_type == "resolution_saved":
idea_action = collected_data.get("IDEA_ACTION")
idea_content = collected_data.get('IDEA_CONTENT', 'N/A')
if idea_action == "✅ Crear Tarea":
if VIKUNJA_INBOX_PROJECT_ID:
new_task = await create_task(
project_id=int(VIKUNJA_INBOX_PROJECT_ID),
title=idea_content
)
if new_task:
final_message = "Tarea creada exitosamente en tu bandeja de entrada de Vikunja."
else:
final_message = "❌ Hubo un error al crear la tarea en Vikunja."
else:
final_message = "❌ Error: El ID del proyecto de bandeja de entrada de Vikunja no está configurado."
elif idea_action == "📓 Guardar Nota":
admin_id = ADMIN_ID
idea_category = collected_data.get('IDEA_CATEGORY', 'N/A')
message = (
f"🧠 **Nueva Idea Capturada (Guardada como Nota)** 🧠\n\n"
f"**Categoría:** {idea_category}\n\n"
f"**Contenido:**\n{idea_content}"
)
await context.bot.send_message(chat_id=admin_id, text=message, parse_mode='Markdown')
elif resolution_type == "resolution_email_sent":
file_info = collected_data.get("UPLOAD_FILE")
user_id = update.effective_user.id
if isinstance(file_info, dict):
file_id = file_info.get("file_id")
file_name = file_info.get("file_name")
if file_id and file_name:
job_id = str(uuid.uuid4())
subject_data = {
"job_id": job_id,
"telegram_id": user_id,
"filename": file_name
}
subject = f"DATA:{json.dumps(subject_data)}"
file_obj = await context.bot.get_file(file_id)
file_buffer = io.BytesIO()
await file_obj.download_to_memory(file_buffer)
file_buffer.seek(0)
success = await send_email_with_attachment(
file_content=file_buffer.getvalue(),
filename=file_name,
subject=subject
)
if success:
final_message = f"Recibido. 📨\n\nTu trabajo de impresión ha sido enviado (Job ID: {job_id}). Te notificaré cuando la impresora confirme que ha sido impreso."
# Programar la verificación en segundo plano
context.job_queue.run_once(
check_print_confirmation_job,
when=60, # segundos
data=(user_id, job_id, file_name),
name=f"print_job_{job_id}"
)
else:
final_message = "❌ Hubo un error al enviar el archivo a la impresora."
else:
final_message = "❌ No se encontró la información del archivo."
else:
final_message = "❌ Error en el formato de los datos del archivo."
elif resolution_type == "system_output_nfc":
# Lógica para devolver un JSON con los datos para el tag NFC
nfc_data = {
"name": collected_data.get("WIZARD_START"),
"employee_id": collected_data.get("NUM_EMP"),
"branch": collected_data.get("SUCURSAL"),
"telegram_id": collected_data.get("TELEGRAM_ID"),
}
final_message = f"```json\n{json.dumps(nfc_data, indent=2)}\n```"
# Enviar el mensaje de confirmación final
if update.callback_query:
await update.callback_query.edit_message_text(final_message)
else:
await update.effective_message.reply_text(final_message)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""
Se ejecuta cuando el usuario escribe /start.
@@ -402,17 +56,20 @@ async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
logger.info(f"Usuario {chat_id} inició conversación con el rol: {user_role}")
# Obtenemos el texto y los botones de bienvenida desde el módulo de onboarding
response_text, reply_markup = onboarding_handle_start(user_role)
await update.message.reply_text(response_text, reply_markup=reply_markup, parse_mode='Markdown')
# Respondemos al usuario
await update.message.reply_text(response_text, reply_markup=reply_markup)
async def button_dispatcher(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""
Dispatcher legado para manejar botones que no inician flujos.
Esta función maneja los clics en los botones del menú.
Dependiendo de qué botón se presione, ejecuta una acción diferente.
"""
query = update.callback_query
# No se necesita await query.answer() aquí porque ya se llamó en universal_handler
logger.info(f"Dispatcher legado manejando consulta: {query.data}")
await query.answer()
logger.info(f"El despachador recibió una consulta: {query.data}")
response_text = "Acción no reconocida."
reply_markup = None
@@ -434,32 +91,34 @@ async def button_dispatcher(update: Update, context: ContextTypes.DEFAULT_TYPE)
try:
if query.data in simple_handlers:
handler = simple_handlers[query.data]
logger.info(f"Ejecutando simple_handler para: {query.data}")
if asyncio.iscoroutinefunction(handler):
response_text = await handler()
else:
response_text = handler()
elif query.data in complex_handlers:
handler = complex_handlers[query.data]
logger.info(f"Ejecutando complex_handler para: {query.data}")
if asyncio.iscoroutinefunction(handler):
response_text, reply_markup = await handler()
else:
response_text, reply_markup = handler()
elif query.data.startswith(('approve:', 'reject:')):
logger.info(f"Ejecutando acción de aprobación: {query.data}")
response_text = handle_approval_action(query.data)
elif query.data == 'start_create_tag':
response_text = "Para crear un tag, por favor usa el comando /create_tag."
else:
# Si llega aquí, es una acción que ni el motor ni el dispatcher conocen.
await query.edit_message_text(text=f"Lo siento, la acción '{query.data}' no se reconoce.")
logger.warning(f"Consulta no manejada por el despachador: {query.data}")
await query.edit_message_text(text=response_text)
return
except Exception as exc:
logger.exception(f"Error al procesar la acción {query.data} en el dispatcher legado: {exc}")
response_text = "❌ Ocurrió un error al procesar tu solicitud."
logger.exception(f"Error al procesar la acción {query.data}: {exc}")
response_text = "❌ Ocurrió un error al procesar tu solicitud. Intenta de nuevo."
reply_markup = None
await query.edit_message_text(text=response_text, reply_markup=reply_markup, parse_mode='Markdown')
def main() -> None:
"""Función principal que arranca el bot."""
if not TELEGRAM_BOT_TOKEN:
@@ -471,19 +130,25 @@ def main() -> None:
application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
schedule_daily_summary(application)
# Handlers principales
# El orden de los handlers es crucial para que las conversaciones funcionen.
application.add_handler(create_tag_conv_handler())
application.add_handler(vikunja_conv_handler())
conv_handler = ConversationHandler(
entry_points=[CallbackQueryHandler(propose_activity_start, pattern='^propose_activity$')],
states={
DESCRIPTION: [MessageHandler(filters.TEXT & ~filters.COMMAND, get_description)],
DURATION: [MessageHandler(filters.TEXT & ~filters.COMMAND, get_duration)],
},
fallbacks=[CommandHandler('cancel', cancel_proposal)],
per_message=False
)
application.add_handler(conv_handler)
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("print", print_handler))
# El handler universal para flujos (prioridad 0)
application.add_handler(CallbackQueryHandler(universal_handler), group=0)
# El dispatcher legado se mantiene para callbacks no manejados por el motor de flujos (prioridad 1)
# Nota: La lógica de paso ahora está dentro del universal_handler
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, universal_handler), group=0)
application.add_handler(MessageHandler(filters.VOICE, universal_handler), group=0)
application.add_handler(MessageHandler(filters.Document.ALL, universal_handler), group=0)
application.add_handler(CallbackQueryHandler(button_dispatcher))
logger.info("Iniciando Talía Bot...")
application.run_polling()

View File

@@ -25,13 +25,9 @@ class FlowEngine:
try:
with open(file_path, 'r', encoding='utf-8') as f:
flow_data = json.load(f)
# Asignar un rol basado en el prefijo del nombre del archivo, si existe
if filename.startswith('admin_'):
flow_data['role'] = 'admin'
elif filename.startswith('crew_'):
flow_data['role'] = 'crew'
elif filename.startswith('client_'):
flow_data['role'] = 'client'
if 'role' not in flow_data:
logger.warning(f"Flow {filename} is missing a 'role' key. Skipping.")
continue
loaded_flows.append(flow_data)
except json.JSONDecodeError:
logger.error(f"Error decoding JSON from {filename}.")
@@ -47,10 +43,7 @@ class FlowEngine:
def get_flow(self, flow_id):
"""Retrieves a specific flow by its ID."""
for flow in self.flows:
if flow['id'] == flow_id:
return flow
return None
return next((flow for flow in self.flows if flow.get('id') == flow_id), None)
def get_conversation_state(self, user_id):
"""Gets the current conversation state for a user from the database."""
@@ -70,7 +63,8 @@ class FlowEngine:
def start_flow(self, user_id, flow_id):
"""Starts a new flow for a user."""
flow = self.get_flow(flow_id)
if not flow:
if not flow or 'steps' not in flow or not flow['steps']:
logger.error(f"Flow '{flow_id}' is invalid or has no steps.")
return None
initial_step = flow['steps'][0]
@@ -91,7 +85,6 @@ class FlowEngine:
def handle_response(self, user_id, response_data):
"""
Handles a user's response, saves the data, and returns the next action.
Returns a dictionary with the status and the next step or final data.
"""
state = self.get_conversation_state(user_id)
if not state:
@@ -106,40 +99,26 @@ class FlowEngine:
self.end_flow(user_id)
return {"status": "error", "message": "Current step not found in flow."}
# Save the user's response using the meaningful variable name
if 'variable' in current_step:
variable_name = current_step['variable']
# Save the user's response using the 'variable' key from the step definition
variable_name = current_step.get('variable')
if variable_name:
state['collected_data'][variable_name] = response_data
else:
logger.warning(f"Step {current_step['step_id']} in flow {flow['id']} has no 'variable' defined.")
# Fallback for steps without a 'variable' key
logger.warning(f"Step {current_step['step_id']} in flow {flow['id']} has no 'variable' defined. Saving with default key.")
state['collected_data'][f"step_{current_step['step_id']}_response"] = response_data
next_step_id = state['current_step_id'] + 1
next_step = next((step for step in flow['steps'] if step['step_id'] == next_step_id), None)
if next_step:
# Check if the next step is a resolution step, which ends the data collection
if next_step.get('input_type', '').startswith('resolution_'):
logger.info(f"Flow {state['flow_id']} reached resolution for user {user_id}.")
self.end_flow(user_id)
return {
"status": "complete",
"resolution": next_step,
"data": state['collected_data']
}
else:
# It's a regular step, so update state and return it
self.update_conversation_state(user_id, state['flow_id'], next_step_id, state['collected_data'])
return {"status": "in_progress", "step": next_step}
self.update_conversation_state(user_id, state['flow_id'], next_step_id, state['collected_data'])
return {"status": "in_progress", "step": next_step}
else:
# No more steps, the flow is complete
logger.info(f"Flow {state['flow_id']} ended for user {user_id}. Data: {state['collected_data']}")
self.end_flow(user_id)
return {
"status": "complete",
"resolution": None,
"data": state['collected_data']
}
return {"status": "complete", "flow_id": flow['id'], "data": state['collected_data']}
def end_flow(self, user_id):
"""Ends a flow for a user by deleting their conversation state."""

View File

@@ -1,79 +0,0 @@
# talia_bot/modules/imap_listener.py
import imaplib
import email
import json
import logging
from email.header import decode_header
from talia_bot.config import IMAP_SERVER, IMAP_USER, IMAP_PASSWORD
logger = logging.getLogger(__name__)
def check_for_confirmation(job_id: str):
"""
Checks for a print confirmation email via IMAP.
Returns the parsed data from the email subject if a confirmation is found, else None.
"""
if not all([IMAP_SERVER, IMAP_USER, IMAP_PASSWORD]):
logger.error("IMAP settings are not fully configured.")
return None
try:
mail = imaplib.IMAP4_SSL(IMAP_SERVER)
mail.login(IMAP_USER, IMAP_PASSWORD)
mail.select("inbox")
# Buscar correos no leídos del remitente específico
status, messages = mail.search(None, '(UNSEEN FROM "noreply@print.epsonconnect.com")')
if status != "OK":
logger.error("Failed to search for emails.")
mail.logout()
return None
for num in messages[0].split():
status, data = mail.fetch(num, "(RFC822)")
if status != "OK":
continue
msg = email.message_from_bytes(data[0][1])
# Decodificar el asunto del correo
subject, encoding = decode_header(msg["Subject"])[0]
if isinstance(subject, bytes):
subject = subject.decode(encoding if encoding else "utf-8")
# Buscar la línea que contiene el asunto original
body = ""
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
body = part.get_payload(decode=True).decode()
break
else:
body = msg.get_payload(decode=True).decode()
for line in body.splitlines():
if line.strip().startswith("Subject:"):
original_subject = line.strip()[len("Subject:"):].strip()
# El asunto está encapsulado en `DATA:{...}`
if original_subject.startswith("DATA:"):
try:
json_data_str = original_subject[len("DATA:"):].strip()
job_data = json.loads(json_data_str)
if job_data.get("job_id") == job_id:
logger.info(f"Confirmation found for job_id: {job_id}")
# Marcar el correo como leído
mail.store(num, '+FLAGS', '\\Seen')
mail.logout()
return job_data
except (json.JSONDecodeError, KeyError) as e:
logger.warning(f"Could not parse job data from subject: {original_subject}. Error: {e}")
continue
mail.logout()
return None
except Exception as e:
logger.error(f"Failed to check email via IMAP: {e}")
return None

View File

@@ -2,67 +2,33 @@
# Este script se encarga de la comunicación con la inteligencia artificial de OpenAI.
import openai
import json
import logging
from talia_bot.config import OPENAI_API_KEY, OPENAI_MODEL
logger = logging.getLogger(__name__)
def get_smart_response(prompt):
"""
Genera una respuesta inteligente usando la API de OpenAI.
async def get_smart_response(prompt: str, system_message: str = "Eres un asistente útil.") -> str:
"""
Genera una respuesta inteligente usando la API de OpenAI de forma asíncrona.
Parámetros:
- prompt: El texto o pregunta que le enviamos a la IA.
"""
# Verificamos que tengamos la llave de la API configurada
if not OPENAI_API_KEY:
logger.error("OPENAI_API_KEY no está configurada.")
return "Error: La llave de la API de OpenAI no está configurada."
try:
client = openai.AsyncOpenAI(api_key=OPENAI_API_KEY)
# Creamos el cliente de OpenAI
client = openai.OpenAI(api_key=OPENAI_API_KEY)
response = await client.chat.completions.create(
# Solicitamos una respuesta al modelo configurado
response = client.chat.completions.create(
model=OPENAI_MODEL,
messages=[
{"role": "system", "content": system_message},
{"role": "system", "content": "Eres un asistente útil."},
{"role": "user", "content": prompt},
],
)
# Devolvemos el contenido de la respuesta limpia (sin espacios extras)
return response.choices[0].message.content.strip()
except Exception as e:
logger.error(f"Ocurrió un error al comunicarse con OpenAI: {e}")
# Si algo sale mal, devolvemos el error
return f"Ocurrió un error al comunicarse con OpenAI: {e}"
async def analyze_client_pitch(pitch: str, display_name: str) -> str:
"""
Analiza el pitch de un cliente contra una lista de servicios y genera una respuesta de ventas.
"""
try:
with open('talia_bot/data/services.json', 'r', encoding='utf-8') as f:
services = json.load(f)
except (FileNotFoundError, json.JSONDecodeError) as e:
logger.error(f"Error al cargar o decodificar services.json: {e}")
return "Lo siento, estoy teniendo problemas para acceder a nuestra lista de servicios en este momento."
services_description = "\n".join([f"- {s['service_name']}: {s['description']}" for s in services])
system_message = f"""
Eres Talia, la asistente personal de {display_name}. Tu objetivo es actuar como un filtro de ventas inteligente.
Analiza la necesidad del cliente y compárala con la lista de servicios que ofrece {display_name}.
Tu respuesta debe seguir estas reglas estrictamente:
1. Identifica cuál de los servicios de la lista es el más adecuado para la necesidad del cliente.
2. Confirma que el proyecto del cliente es interesante y encaja perfectamente con el servicio que identificaste. Menciona el nombre del servicio.
3. Cierra la conversación de manera profesional y tranquilizadora, indicando que ya has pasado el expediente a {display_name} y que él lo revisará personalmente.
4. Sé concisa, profesional y amable. No hagas preguntas, solo proporciona la respuesta de cierre.
"""
prompt = f"""
**Servicios Ofrecidos:**
{services_description}
**Necesidad del Cliente:**
"{pitch}"
**Tu Tarea:**
Genera la respuesta de cierre ideal siguiendo las reglas del system prompt.
"""
return await get_smart_response(prompt, system_message)

View File

@@ -1,64 +0,0 @@
# talia_bot/modules/mailer.py
import smtplib
import ssl
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import logging
import asyncio
from talia_bot.config import (
SMTP_SERVER, SMTP_PORT, SMTP_USER, SMTP_PASSWORD,
IMAP_USER, PRINTER_EMAIL
)
logger = logging.getLogger(__name__)
async def send_email_with_attachment(file_content: bytes, filename: str, subject: str):
"""
Sends an email with an attachment using SMTP.
Adapts connection method based on SMTP_PORT.
"""
if not all([SMTP_SERVER, SMTP_PORT, SMTP_USER, SMTP_PASSWORD, PRINTER_EMAIL]):
logger.error("SMTP settings are not fully configured.")
return False
message = MIMEMultipart()
message["From"] = IMAP_USER
message["To"] = PRINTER_EMAIL
message["Subject"] = subject
part = MIMEBase("application", "octet-stream")
part.set_payload(file_content)
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
f"attachment; filename= {filename}",
)
message.attach(part)
text = message.as_string()
try:
context = ssl.create_default_context()
def _send_mail():
if SMTP_PORT == 465:
# Use SMTP_SSL for port 465
with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, context=context) as server:
server.login(SMTP_USER, SMTP_PASSWORD)
server.sendmail(IMAP_USER, PRINTER_EMAIL, text)
else:
# Use STARTTLS for other ports like 587
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
server.starttls(context=context)
server.login(SMTP_USER, SMTP_PASSWORD)
server.sendmail(IMAP_USER, PRINTER_EMAIL, text)
logger.info(f"Email sent to {PRINTER_EMAIL} for printing.")
await asyncio.to_thread(_send_mail)
return True
except Exception as e:
logger.error(f"Failed to send email: {e}")
return False

View File

@@ -1,37 +0,0 @@
# talia_bot/modules/transcription.py
import logging
import openai
from talia_bot.config import OPENAI_API_KEY
logger = logging.getLogger(__name__)
async def transcribe_audio(audio_file) -> str | None:
"""
Transcribes an audio file using OpenAI's Whisper model with the modern API call.
Args:
audio_file: A file-like object containing the audio data with a 'name' attribute.
Returns:
The transcribed text as a string, or None if transcription fails.
"""
if not OPENAI_API_KEY:
logger.error("Cannot transcribe audio: OPENAI_API_KEY is not configured.")
return None
try:
client = openai.AsyncOpenAI(api_key=OPENAI_API_KEY)
transcription = await client.audio.transcriptions.create(
model="whisper-1",
file=audio_file
)
logger.info("Successfully transcribed audio.")
return transcription.text
except openai.APIError as e:
logger.error(f"OpenAI API error during transcription: {e}")
return None
except Exception as e:
logger.error(f"An unexpected error occurred during transcription: {e}")
return None

View File

@@ -1,14 +1,27 @@
# talia_bot/modules/vikunja.py
# Este módulo maneja la integración con Vikunja para la gestión de proyectos y tareas.
# app/modules/vikunja.py
# Este módulo maneja la integración con Vikunja para la gestión de tareas.
import requests
import logging
import httpx
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.ext import (
ConversationHandler,
CommandHandler,
CallbackQueryHandler,
MessageHandler,
filters,
ContextTypes,
)
from talia_bot.config import VIKUNJA_API_URL, VIKUNJA_API_TOKEN
from config import VIKUNJA_API_URL, VIKUNJA_API_TOKEN
from permissions import is_admin
# Configuración del logger
logger = logging.getLogger(__name__)
# Definición de los estados de la conversación para añadir y editar tareas
SELECTING_ACTION, ADDING_TASK, SELECTING_TASK_TO_EDIT, EDITING_TASK = range(4)
def get_vikunja_headers():
"""Devuelve los headers necesarios para la API de Vikunja."""
return {
@@ -16,121 +29,154 @@ def get_vikunja_headers():
"Content-Type": "application/json",
}
async def get_projects():
def get_tasks():
"""
Obtiene la lista de proyectos de Vikunja de forma asíncrona.
Devuelve una lista de diccionarios de proyectos o None si hay un error.
Obtiene y formatea la lista de tareas de Vikunja.
Esta función es síncrona y devuelve un string.
"""
if not VIKUNJA_API_TOKEN:
logger.error("VIKUNJA_API_TOKEN no está configurado.")
return None
return "Error: VIKUNJA_API_TOKEN no configurado."
async with httpx.AsyncClient() as client:
try:
response = await client.get(f"{VIKUNJA_API_URL}/projects", headers=get_vikunja_headers())
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
logger.error(f"Error de HTTP al obtener proyectos de Vikunja: {e.response.status_code} - {e.response.text}")
return None
except Exception as e:
logger.error(f"Error al obtener proyectos de Vikunja: {e}")
return None
try:
response = requests.get(f"{VIKUNJA_API_URL}/projects/1/tasks", headers=get_vikunja_headers())
response.raise_for_status()
tasks = response.json()
async def get_project_tasks(project_id: int):
"""
Obtiene las tareas de un proyecto específico de forma asíncrona.
"""
if not VIKUNJA_API_TOKEN:
logger.error("VIKUNJA_API_TOKEN no está configurado.")
return None
if not tasks:
return "No tienes tareas pendientes en Vikunja."
async with httpx.AsyncClient() as client:
try:
response = await client.get(f"{VIKUNJA_API_URL}/projects/{project_id}/tasks", headers=get_vikunja_headers())
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
logger.error(f"Error de HTTP al obtener tareas del proyecto {project_id}: {e.response.status_code}")
return None
except Exception as e:
logger.error(f"Error al obtener tareas del proyecto {project_id}: {e}")
return None
text = "📋 *Tus Tareas en Vikunja*\n\n"
for task in sorted(tasks, key=lambda t: t.get('id', 0))[:10]:
status = "" if task.get('done') else ""
text += f"{status} `{task.get('id')}`: *{task.get('title')}*\n"
return text
except Exception as e:
logger.error(f"Error al obtener tareas de Vikunja: {e}")
return f"Error al conectar con Vikunja: {e}"
async def add_comment_to_task(task_id: int, comment: str):
"""
Añade un comentario a una tarea específica.
"""
if not VIKUNJA_API_TOKEN:
logger.error("VIKUNJA_API_TOKEN no está configurado.")
return False
async def vikunja_menu(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Muestra el menú principal de acciones de Vikunja."""
query = update.callback_query
await query.answer()
async with httpx.AsyncClient() as client:
try:
data = {"comment": comment}
response = await client.post(f"{VIKUNJA_API_URL}/tasks/{task_id}/comments", headers=get_vikunja_headers(), json=data)
response.raise_for_status()
logger.info(f"Comentario añadido a la tarea {task_id}.")
return True
except httpx.HTTPStatusError as e:
logger.error(f"Error de HTTP al añadir comentario a la tarea {task_id}: {e.response.status_code}")
return False
except Exception as e:
logger.error(f"Error al añadir comentario a la tarea {task_id}: {e}")
return False
keyboard = [
[InlineKeyboardButton("Añadir Tarea", callback_data='add_task')],
[InlineKeyboardButton("Editar Tarea", callback_data='edit_task_start')],
[InlineKeyboardButton("Volver", callback_data='cancel')],
]
reply_markup = InlineKeyboardMarkup(keyboard)
async def update_task_status(task_id: int, is_done: bool = None, status_text: str = None):
"""
Actualiza una tarea en Vikunja.
- Si `is_done` es un booleano, actualiza el estado de completado.
- Si `status_text` es un string, añade un comentario con ese estado.
"""
if not VIKUNJA_API_TOKEN:
logger.error("VIKUNJA_API_TOKEN no está configurado.")
return False
tasks_list = get_tasks()
await query.edit_message_text(text=f"{tasks_list}\n\nSelecciona una acción:", reply_markup=reply_markup, parse_mode='Markdown')
return SELECTING_ACTION
async with httpx.AsyncClient() as client:
try:
if is_done is not None:
data = {"done": is_done}
response = await client.put(f"{VIKUNJA_API_URL}/tasks/{task_id}", headers=get_vikunja_headers(), json=data)
response.raise_for_status()
logger.info(f"Estado de la tarea {task_id} actualizado a {'completado' if is_done else 'pendiente'}.")
return True
async def request_task_title(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Solicita al usuario el título de la nueva tarea."""
query = update.callback_query
await query.answer()
await query.edit_message_text("Por favor, introduce el título de la nueva tarea:")
return ADDING_TASK
if status_text:
return await add_comment_to_task(task_id, f"Nuevo estatus: {status_text}")
async def add_task(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Añade una nueva tarea a Vikunja."""
task_title = update.message.text
try:
data = {"title": task_title, "project_id": 1}
response = requests.post(f"{VIKUNJA_API_URL}/tasks", headers=get_vikunja_headers(), json=data)
response.raise_for_status()
await update.message.reply_text(f"✅ Tarea añadida: *{task_title}*", parse_mode='Markdown')
except Exception as e:
logger.error(f"Error al añadir tarea a Vikunja: {e}")
await update.message.reply_text(f"Error al añadir tarea: {e}")
except httpx.HTTPStatusError as e:
logger.error(f"Error de HTTP al actualizar la tarea {task_id}: {e.response.status_code}")
return False
except Exception as e:
logger.error(f"Error al actualizar la tarea {task_id}: {e}")
return False
return False
return ConversationHandler.END
async def create_task(project_id: int, title: str, due_date: str = None):
"""
Crea una nueva tarea en un proyecto específico.
"""
if not VIKUNJA_API_TOKEN:
logger.error("VIKUNJA_API_TOKEN no está configurado.")
return None
async def select_task_to_edit(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Muestra los botones para seleccionar qué tarea editar."""
query = update.callback_query
await query.answer()
async with httpx.AsyncClient() as client:
try:
data = {"project_id": project_id, "title": title}
if due_date:
data["due_date"] = due_date
try:
response = requests.get(f"{VIKUNJA_API_URL}/projects/1/tasks", headers=get_vikunja_headers())
response.raise_for_status()
tasks = [task for task in response.json() if not task.get('done')]
response = await client.post(f"{VIKUNJA_API_URL}/tasks", headers=get_vikunja_headers(), json=data)
response.raise_for_status()
task = response.json()
logger.info(f"Tarea '{title}' creada en el proyecto {project_id}.")
return task
except httpx.HTTPStatusError as e:
logger.error(f"Error de HTTP al crear la tarea: {e.response.status_code}")
return None
except Exception as e:
logger.error(f"Error al crear la tarea: {e}")
return None
if not tasks:
await query.edit_message_text("No hay tareas pendientes para editar.")
return ConversationHandler.END
keyboard = []
for task in sorted(tasks, key=lambda t: t.get('id', 0))[:10]:
keyboard.append([InlineKeyboardButton(
f"{task.get('id')}: {task.get('title')}",
callback_data=f"edit_task:{task.get('id')}"
)])
keyboard.append([InlineKeyboardButton("Cancelar", callback_data='cancel')])
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("Selecciona la tarea que quieres editar:", reply_markup=reply_markup)
return SELECTING_TASK_TO_EDIT
except Exception as e:
logger.error(f"Error al obtener tareas para editar: {e}")
await query.edit_message_text("Error al obtener la lista de tareas.")
return ConversationHandler.END
async def request_new_task_title(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Solicita el nuevo título para la tarea seleccionada."""
query = update.callback_query
await query.answer()
task_id = query.data.split(':')[1]
context.user_data['task_id_to_edit'] = task_id
await query.edit_message_text(f"Introduce el nuevo título para la tarea `{task_id}`:", parse_mode='Markdown')
return EDITING_TASK
async def edit_task(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Actualiza el título de una tarea en Vikunja."""
new_title = update.message.text
task_id = context.user_data.get('task_id_to_edit')
if not task_id:
await update.message.reply_text("Error: No se encontró el ID de la tarea a editar.")
return ConversationHandler.END
try:
data = {"title": new_title}
response = requests.put(f"{VIKUNJA_API_URL}/tasks/{task_id}", headers=get_vikunja_headers(), json=data)
response.raise_for_status()
await update.message.reply_text(f"✅ Tarea `{task_id}` actualizada a *{new_title}*", parse_mode='Markdown')
except Exception as e:
logger.error(f"Error al editar la tarea {task_id}: {e}")
await update.message.reply_text("Error al actualizar la tarea.")
finally:
del context.user_data['task_id_to_edit']
return ConversationHandler.END
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Cancela la conversación actual."""
query = update.callback_query
await query.answer()
await query.edit_message_text("Operación cancelada.")
return ConversationHandler.END
def vikunja_conv_handler():
"""Crea el ConversationHandler para el flujo de Vikunja."""
return ConversationHandler(
entry_points=[CallbackQueryHandler(vikunja_menu, pattern='^manage_vikunja$')],
states={
SELECTING_ACTION: [
CallbackQueryHandler(request_task_title, pattern='^add_task$'),
CallbackQueryHandler(select_task_to_edit, pattern='^edit_task_start$'),
CallbackQueryHandler(cancel, pattern='^cancel$'),
],
ADDING_TASK: [MessageHandler(filters.TEXT & ~filters.COMMAND, add_task)],
SELECTING_TASK_TO_EDIT: [
CallbackQueryHandler(request_new_task_title, pattern=r'^edit_task:\d+$'),
CallbackQueryHandler(cancel, pattern='^cancel$'),
],
EDITING_TASK: [MessageHandler(filters.TEXT & ~filters.COMMAND, edit_task)],
},
fallbacks=[CommandHandler('cancel', cancel)],
)