Merge pull request #21 from marcogll/feature/flow-engine-implementation-15654864159042246464

Feature/flow engine implementation 15654864159042246464
This commit is contained in:
Marco Gallegos
2025-12-20 17:31:51 -06:00
committed by GitHub
3 changed files with 169 additions and 85 deletions

147
README.md
View File

@@ -1,10 +1,12 @@
# 🤖 Talia Bot: Asistente Personal & Orquestador de Negocio # 🤖 Talia Bot: Asistente Personal & Orquestador de Negocio
Talia no es un simple chatbot; es un Middleware de Inteligencia Artificial alojado en un VPS que orquesta las operaciones diarias de administración, logística y ventas. Actúa como el puente central entre usuarios en Telegram y servicios críticos como Vikunja (Gestión de Proyectos), Google Calendar y Hardware de Impresión remota. Talia no es un simple chatbot; es un Middleware de Inteligencia Artificial que orquesta las operaciones diarias de administración, logística y ventas. Actúa como el puente central entre usuarios en Telegram y servicios críticos como Vikunja (Gestión de Proyectos), Google Calendar y Hardware de Impresión remota.
--- ---
## 🚀 Concepto Central: Enrutamiento por Identidad ## 🚀 Conceptos Centrales
### 1. Enrutamiento por Identidad
La característica core de Talia es su capacidad de cambiar de personalidad y permisos dinámicamente basándose en el Telegram ID del usuario: La característica core de Talia es su capacidad de cambiar de personalidad y permisos dinámicamente basándose en el Telegram ID del usuario:
@@ -14,60 +16,31 @@ La característica core de Talia es su capacidad de cambiar de personalidad y pe
| **Crew** | 👷 | Equipo Operativo | Limitado: Solicitud de agenda (validada), asignación de tareas, impresión de documentos. | | **Crew** | 👷 | Equipo Operativo | Limitado: Solicitud de agenda (validada), asignación de tareas, impresión de documentos. |
| **Cliente** | 👤 | Usuario Público | Ventas: Embudo de captación, consulta de servicios (RAG) y agendamiento comercial. | | **Cliente** | 👤 | Usuario Público | Ventas: Embudo de captación, consulta de servicios (RAG) y agendamiento comercial. |
### 2. Motor de Flujos Conversacionales
Toda la lógica de conversación del bot es impulsada por un motor de flujos genérico. En lugar de tener conversaciones codificadas, el bot interpreta definiciones de un archivo central `flows.json`.
* **`main.py`**: Contiene un `universal_handler` que captura todas las interacciones del usuario.
* **`flow_engine.py`**: Es el cerebro. Consulta el estado actual del usuario en la base de datos, lee el `flows.json` para determinar el siguiente paso y maneja la lógica de la conversación.
* **`flows.json`**: Un archivo JSON que define cada pregunta, botón y acción para todos los flujos de conversación, separados por rol. Esto permite modificar o añadir nuevas conversaciones sin cambiar el código principal.
--- ---
## 🛠️ Arquitectura Técnica ## 🛠️ Arquitectura Técnica
El sistema sigue un flujo modular: El sistema sigue un flujo modular:
1. **Input**: Telegram (Texto o Audio). 1. **Input**: Telegram (Texto, Audio, Documentos, Botones).
2. **STT**: Whisper (Conversión de Audio a Texto). 2. **Transcripción**: `transcription.py` (Whisper) convierte voz a texto.
3. **Router**: Verificación de ID contra la base de datos de usuarios. 3. **Router**: `universal_handler` en `main.py` enruta la entrada al `FlowEngine`.
4. **Cerebro (LLM)**: OpenAI (Fase 1) / Google Gemini (Fase 2). 4. **Estado**: El `FlowEngine` consulta la tabla `conversations` en la base de datos para saber si el usuario está en medio de un flujo.
5. **Tools**: 5. **Lógica**: El `FlowEngine` utiliza `flows.json` para procesar la entrada, recoger datos y determinar el siguiente paso.
* **Vikunja API**: Lectura/Escritura de tareas con filtrado de privacidad. 6. **Resolución**: Una vez que un flujo se completa, `main.py` ejecuta la acción final (la "resolución") llamando al módulo correspondiente.
* **Google Calendar API**: Gestión de tiempos y reglas de disponibilidad. 7. **Módulos de Acción (Tools)**:
* **SMTP/IMAP**: Comunicación bidireccional con impresoras. * **`vikunja.py`**: API asíncrona para leer/escribir tareas y proyectos.
* **NFC Gen**: Codificación Base64 para tags físicos. * **`calendar.py`**: API para crear eventos en Google Calendar.
* **`mailer.py`**: Envío de correos (SMTP) para el flujo de impresión.
--- * **`llm_engine.py`**: Análisis RAG para el embudo de ventas.
## 📋 Flujos de Trabajo (Features)
### 1. 👑 Gestión Admin (Proyectos & Identidad)
* **Proyectos (Vikunja)**:
* Resumen inteligente de estatus de proyectos.
* Comandos naturales: *"Marca el proyecto de web como terminado y comenta que se envió factura"*.
* **Wizard de Identidad (NFC)**:
* Flujo paso a paso para dar de alta colaboradores.
* Genera JSON de registro y String Base64 listo para escribir en Tags NFC.
* Inputs: Nombre, ID Empleado, Sucursal (Botones), Telegram ID.
### 2. 👷 Gestión Crew (Agenda & Tareas)
* **Solicitud de Tiempo (Wizard)**:
* Solicita espacios de 1 a 4 horas.
* **Reglas de Negocio**:
* No permite fechas > 3 meses a futuro.
* **Gatekeeper**: Verifica Google Calendar. Si hay evento "Privado" del Admin, rechaza automáticamente.
* **Modo Buzón (Vikunja)**:
* Crea tareas asignadas al Admin.
* **Privacidad**: Solo pueden consultar el estatus de tareas creadas por ellos mismos.
### 3. 🖨️ Sistema de Impresión Remota (Print Loop)
* Permite enviar archivos desde Telegram a la impresora física de la oficina.
* **Envío (SMTP)**: El bot envía el documento a un correo designado.
* **Tracking**: El asunto del correo lleva un hash único: `PJ:{uuid}#TID:{telegram_id}`.
* **Confirmación (IMAP Listener)**: Un proceso en background escucha la respuesta de la impresora y notifica al usuario en Telegram.
### 4. 👤 Ventas Automáticas (RAG)
* Identifica usuarios nuevos (no registrados en la DB).
* Captura datos (Lead Magnet).
* Analiza ideas de clientes usando `servicios.json` (Base de conocimiento).
* Ofrece citas de ventas mediante link de Calendly.
--- ---
@@ -75,25 +48,23 @@ El sistema sigue un flujo modular:
### Prerrequisitos ### Prerrequisitos
* Python 3.10+ * Python 3.9+
* Docker y Docker Compose (recomendado)
* Cuenta de Telegram Bot (@BotFather) * Cuenta de Telegram Bot (@BotFather)
* Instancia de Vikunja (Self-hosted) * Instancia de Vikunja (Self-hosted)
* Cuenta de Servicio Google Cloud (Calendar API) * Cuenta de Servicio Google Cloud (Calendar API)
* Servidor de Correo (SMTP/IMAP) * Servidor de Correo (SMTP/IMAP)
### 1. Clonar y Entorno Virtual ### 1. Clonar y Entorno
```bash ```bash
git clone https://github.com/marcogll/talia_bot_mg.git git clone https://github.com/marcogll/talia_bot_mg.git
cd talia_bot_mg cd talia_bot_mg
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install -r requirements.txt
``` ```
### 2. Variables de Entorno (`.env`) ### 2. Variables de Entorno (`.env`)
Crea un archivo `.env` en la raíz con la siguiente estructura: Crea un archivo `.env` en la raíz del proyecto a partir de `.env.example` y rellena las siguientes variables:
```env ```env
# --- TELEGRAM & SECURITY --- # --- TELEGRAM & SECURITY ---
@@ -102,26 +73,33 @@ ADMIN_ID=tu_telegram_id
# --- AI CORE --- # --- AI CORE ---
OPENAI_API_KEY=sk-... OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-3.5-turbo
# --- INTEGRACIONES --- # --- INTEGRACIONES ---
VIKUNJA_API_URL=https://tuservidor.com/api/v1 VIKUNJA_BASE_URL=https://tu_vikunja.com/api/v1
VIKUNJA_TOKEN=tu_token_vikunja VIKUNJA_TOKEN=tu_token_vikunja
GOOGLE_CREDENTIALS_PATH=./data/credentials.json VIKUNJA_INBOX_PROJECT_ID=el_id_de_tu_proyecto_bandeja_de_entrada
GOOGLE_SERVICE_ACCOUNT_FILE=google_key.json
CALENDAR_ID=tu_id_de_google_calendar
# --- PRINT SERVICE --- # --- PRINT SERVICE ---
SMTP_SERVER=smtp.hostinger.com SMTP_SERVER=smtp.tuservidor.com
SMTP_PORT=465 SMTP_PORT=587
SMTP_USER=print.service@vanityexperience.mx SMTP_USER=tu_usuario_smtp
SMTP_PASS=tu_password_seguro SMTP_PASSWORD=tu_password_smtp
IMAP_SERVER=imap.hostinger.com IMAP_SERVER=imap.tuservidor.com
IMAP_USER=tu_usuario_imap
IMAP_PASSWORD=tu_password_imap
PRINTER_EMAIL=el_email_de_la_impresora
``` ```
### 3. Estructura de Datos ### 3. Ejecutar con Docker
Asegúrate de tener los archivos base en `talia_bot/data/`: La forma más sencilla de levantar el bot es con Docker Compose:
* `servicios.json`: Catálogo de servicios para el RAG de ventas.
* `credentials.json`: Credenciales de Google Cloud. ```bash
* `users.db`: Base de datos SQLite. docker-compose up --build
```
--- ---
@@ -130,22 +108,24 @@ Asegúrate de tener los archivos base en `talia_bot/data/`:
```text ```text
talia_bot_mg/ talia_bot_mg/
├── talia_bot/ ├── talia_bot/
│ ├── main.py # Entry Point y Router de Identidad │ ├── main.py # Entry Point y Universal Handler
│ ├── db.py # Gestión de la base de datos │ ├── db.py # Gestión de la base de datos (SQLite)
│ ├── config.py # Carga de variables de entorno │ ├── config.py # Carga de variables de entorno
│ ├── modules/ │ ├── modules/
│ │ ├── identity.py # Lógica de Roles y Permisos │ │ ├── flow_engine.py # El cerebro que procesa los flujos
│ │ ├── llm_engine.py # Cliente OpenAI/Gemini │ │ ├── vikunja.py # API Manager asíncrono para Tareas
│ │ ├── vikunja.py # API Manager para Tareas │ │ ├── calendar.py # Lógica de Google Calendar
│ │ ├── calendar.py # Google Calendar Logic & Rules │ │ ├── llm_engine.py # Cliente OpenAI (Whisper y GPT)
│ │ ├── printer.py # SMTP/IMAP Loop │ │ ├── transcription.py # Lógica de transcripción de audio
│ │ ── sales_rag.py # Lógica de Ventas y Servicios │ │ ── mailer.py # Módulo para envío de correos (SMTP)
│ │ └── ... # Otros módulos de soporte
│ └── data/ │ └── data/
│ ├── servicios.json # Base de conocimiento │ ├── flows.json # ¡IMPORTANTE! Define todas las conversaciones
│ ├── credentials.json # Credenciales de Google │ ├── services.json # Base de conocimiento para ventas
│ └── users.db # Base de datos de usuarios │ └── users.db # Base de datos de usuarios
├── .env # Tus variables de entorno (NO subir a Git)
├── .env.example # Plantilla de variables de entorno ├── .env.example # Plantilla de variables de entorno
├── requirements.txt # Dependencias ├── requirements.txt # Dependencias de Python
├── Dockerfile # Configuración del contenedor ├── Dockerfile # Configuración del contenedor
└── docker-compose.yml # Orquestador de Docker └── docker-compose.yml # Orquestador de Docker
``` ```
@@ -154,12 +134,13 @@ talia_bot_mg/
## 🗓️ Roadmap ## 🗓️ Roadmap
- [ ] Implementar Wizard de creación de Tags NFC (Base64). - [x] **Implementado el Motor de Flujos Conversacionales.**
- [ ] Conectar Loop de Impresión (SMTP/IMAP). - [x] **Integración completa de Vikunja, OpenAI y Google Calendar.**
- [ ] Implementar el loop de confirmación de impresión (IMAP Listener).
- [ ] Mejorar el parsing de fechas y horas con lenguaje natural más avanzado.
- [ ] Migrar de OpenAI a Google Gemini 1.5 Pro. - [ ] Migrar de OpenAI a Google Gemini 1.5 Pro.
- [ ] Implementar soporte para fotos en impresión.
--- ---
Desarrollado por: Marco G. Desarrollado por: Marco G.
Asistente Personalizado v1.0 Asistente Personalizado v2.0 (Arquitectura de Flujos)

View File

@@ -41,9 +41,11 @@ from talia_bot.modules.vikunja import get_projects, add_comment_to_task, update_
from talia_bot.db import setup_database from talia_bot.db import setup_database
from talia_bot.modules.flow_engine import FlowEngine from talia_bot.modules.flow_engine import FlowEngine
from talia_bot.modules.transcription import transcribe_audio from talia_bot.modules.transcription import transcribe_audio
import uuid
from talia_bot.modules.llm_engine import analyze_client_pitch from talia_bot.modules.llm_engine import analyze_client_pitch
from talia_bot.modules.calendar import create_event from talia_bot.modules.calendar import create_event
from talia_bot.modules.mailer import send_email_with_attachment from talia_bot.modules.mailer import send_email_with_attachment
from talia_bot.modules.imap_listener import check_for_confirmation
from talia_bot.config import ADMIN_ID, VIKUNJA_INBOX_PROJECT_ID from talia_bot.config import ADMIN_ID, VIKUNJA_INBOX_PROJECT_ID
from talia_bot.scheduler import schedule_daily_summary from talia_bot.scheduler import schedule_daily_summary
@@ -313,11 +315,21 @@ async def handle_flow_resolution(update: Update, context: ContextTypes.DEFAULT_T
elif resolution_type == "resolution_email_sent": elif resolution_type == "resolution_email_sent":
file_info = collected_data.get("UPLOAD_FILE") file_info = collected_data.get("UPLOAD_FILE")
user_id = update.effective_user.id
if isinstance(file_info, dict): if isinstance(file_info, dict):
file_id = file_info.get("file_id") file_id = file_info.get("file_id")
file_name = file_info.get("file_name") file_name = file_info.get("file_name")
if file_id and file_name: if file_id and file_name:
job_id = str(uuid.uuid4())
subject_data = {
"job_id": job_id,
"telegram_id": user_id,
"filename": file_name
}
subject = f"DATA:{json.dumps(subject_data)}"
file_obj = await context.bot.get_file(file_id) file_obj = await context.bot.get_file(file_id)
file_buffer = io.BytesIO() file_buffer = io.BytesIO()
await file_obj.download_to_memory(file_buffer) await file_obj.download_to_memory(file_buffer)
@@ -326,9 +338,21 @@ async def handle_flow_resolution(update: Update, context: ContextTypes.DEFAULT_T
success = await send_email_with_attachment( success = await send_email_with_attachment(
file_content=file_buffer.getvalue(), file_content=file_buffer.getvalue(),
filename=file_name, filename=file_name,
subject=f"Print Job: {file_name}" subject=subject
) )
if not success:
if success:
final_message = f"Recibido. 📨\n\nTu trabajo de impresión ha sido enviado (Job ID: {job_id}). Te notificaré cuando la impresora confirme que ha sido impreso."
# Esperar y verificar la confirmación
await asyncio.sleep(60) # Espera de 60 segundos
confirmation_data = await asyncio.to_thread(check_for_confirmation, job_id)
if confirmation_data:
await context.bot.send_message(chat_id=user_id, text=f"✅ ¡Éxito! Tu archivo '{file_name}' ha sido impreso correctamente.")
else:
await context.bot.send_message(chat_id=user_id, text=f"⚠️ El trabajo de impresión para '{file_name}' fue enviado, pero no he recibido una confirmación de la impresora. Por favor, verifica la bandeja de la impresora.")
else:
final_message = "❌ Hubo un error al enviar el archivo a la impresora." final_message = "❌ Hubo un error al enviar el archivo a la impresora."
else: else:
final_message = "❌ No se encontró la información del archivo." final_message = "❌ No se encontró la información del archivo."

View File

@@ -0,0 +1,79 @@
# talia_bot/modules/imap_listener.py
import imaplib
import email
import json
import logging
from email.header import decode_header
from talia_bot.config import IMAP_SERVER, IMAP_USER, IMAP_PASSWORD
logger = logging.getLogger(__name__)
def check_for_confirmation(job_id: str):
"""
Checks for a print confirmation email via IMAP.
Returns the parsed data from the email subject if a confirmation is found, else None.
"""
if not all([IMAP_SERVER, IMAP_USER, IMAP_PASSWORD]):
logger.error("IMAP settings are not fully configured.")
return None
try:
mail = imaplib.IMAP4_SSL(IMAP_SERVER)
mail.login(IMAP_USER, IMAP_PASSWORD)
mail.select("inbox")
# Buscar correos no leídos del remitente específico
status, messages = mail.search(None, '(UNSEEN FROM "noreply@print.epsonconnect.com")')
if status != "OK":
logger.error("Failed to search for emails.")
mail.logout()
return None
for num in messages[0].split():
status, data = mail.fetch(num, "(RFC822)")
if status != "OK":
continue
msg = email.message_from_bytes(data[0][1])
# Decodificar el asunto del correo
subject, encoding = decode_header(msg["Subject"])[0]
if isinstance(subject, bytes):
subject = subject.decode(encoding if encoding else "utf-8")
# Buscar la línea que contiene el asunto original
body = ""
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
body = part.get_payload(decode=True).decode()
break
else:
body = msg.get_payload(decode=True).decode()
for line in body.splitlines():
if line.strip().startswith("Subject:"):
original_subject = line.strip()[len("Subject:"):].strip()
# El asunto está encapsulado en `DATA:{...}`
if original_subject.startswith("DATA:"):
try:
json_data_str = original_subject[len("DATA:"):].strip()
job_data = json.loads(json_data_str)
if job_data.get("job_id") == job_id:
logger.info(f"Confirmation found for job_id: {job_id}")
# Marcar el correo como leído
mail.store(num, '+FLAGS', '\\Seen')
mail.logout()
return job_data
except (json.JSONDecodeError, KeyError) as e:
logger.warning(f"Could not parse job data from subject: {original_subject}. Error: {e}")
continue
mail.logout()
return None
except Exception as e:
logger.error(f"Failed to check email via IMAP: {e}")
return None