From b0e720965369d3c183ec6edbe52c8d83ec2b0e4a Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Sat, 20 Dec 2025 22:55:50 +0000 Subject: [PATCH] feat: implement JSON-driven conversational flow engine Replaces hardcoded ConversationHandlers with a generic flow engine that reads conversation definitions from talia_bot/data/flows.json. - Adds a 'conversations' table to the database to persist user state, making flows robust against restarts. - Implements a central 'universal_handler' in main.py to process all user inputs (text, voice, callbacks, documents) through the new engine. - Refactors Vikunja, LLM, and Calendar modules to be asynchronous and support the new architecture. - Adds a new 'transcription' module for OpenAI Whisper and a 'mailer' module for the print flow. - Implements the full logic for all specified user flows, including project/task management, calendar blocking, idea capture (with branching logic), and the RAG-based client sales funnel. - Cleans up legacy code and handlers. --- requirements.txt | 1 + talia_bot/config.py | 5 +- talia_bot/db.py | 14 +- talia_bot/main.py | 359 ++++++++++++++++++++++++++--- talia_bot/modules/flow_engine.py | 134 +++++++++++ talia_bot/modules/llm_engine.py | 60 +++-- talia_bot/modules/mailer.py | 56 +++++ talia_bot/modules/transcription.py | 37 +++ talia_bot/modules/vikunja.py | 260 +++++++++------------ 9 files changed, 725 insertions(+), 201 deletions(-) create mode 100644 talia_bot/modules/flow_engine.py create mode 100644 talia_bot/modules/mailer.py create mode 100644 talia_bot/modules/transcription.py diff --git a/requirements.txt b/requirements.txt index dcd523e..32db824 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ google-auth-oauthlib openai pytz python-dotenv +python-dateutil diff --git a/talia_bot/config.py b/talia_bot/config.py index 6056ad0..5b2a18d 100644 --- a/talia_bot/config.py +++ b/talia_bot/config.py @@ -29,8 +29,9 @@ N8N_WEBHOOK_URL = os.getenv("N8N_WEBHOOK_URL") N8N_TEST_WEBHOOK_URL = os.getenv("N8N_TEST_WEBHOOK_URL") # Configuración de Vikunja -VIKUNJA_API_URL = os.getenv("VIKUNJA_API_URL", "https://tasks.soul23.cloud/api/v1") -VIKUNJA_API_TOKEN = os.getenv("VIKUNJA_API_TOKEN") +VIKUNJA_API_URL = os.getenv("VIKUNJA_BASE_URL") +VIKUNJA_API_TOKEN = os.getenv("VIKUNJA_TOKEN") +VIKUNJA_INBOX_PROJECT_ID = os.getenv("VIKUNJA_INBOX_PROJECT_ID") # Llave de la API de OpenAI para usar modelos de lenguaje (como GPT) OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") diff --git a/talia_bot/db.py b/talia_bot/db.py index a4fe64c..3072622 100644 --- a/talia_bot/db.py +++ b/talia_bot/db.py @@ -32,8 +32,20 @@ def setup_database(): ) """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS conversations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + flow_id TEXT NOT NULL, + current_step_id INTEGER NOT NULL, + collected_data TEXT, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (user_id) REFERENCES users (telegram_id) + ) + """) + conn.commit() - logger.info("Database setup complete. 'users' table is ready.") + logger.info("Database setup complete. 'users' and 'conversations' tables are ready.") except sqlite3.Error as e: logger.error(f"Database error during setup: {e}") finally: diff --git a/talia_bot/main.py b/talia_bot/main.py index 8a453aa..d7faed1 100644 --- a/talia_bot/main.py +++ b/talia_bot/main.py @@ -34,9 +34,17 @@ from talia_bot.modules.aprobaciones import view_pending, handle_approval_action from talia_bot.modules.servicios import get_service_info from talia_bot.modules.admin import get_system_status from talia_bot.modules.debug import print_handler -from talia_bot.modules.create_tag import create_tag_conv_handler -from talia_bot.modules.vikunja import vikunja_conv_handler +import json +from telegram import InlineKeyboardButton, InlineKeyboardMarkup +import io +from talia_bot.modules.vikunja import get_projects, add_comment_to_task, update_task_status, get_project_tasks, create_task from talia_bot.db import setup_database +from talia_bot.modules.flow_engine import FlowEngine +from talia_bot.modules.transcription import transcribe_audio +from talia_bot.modules.llm_engine import analyze_client_pitch +from talia_bot.modules.calendar import create_event +from talia_bot.modules.mailer import send_email_with_attachment +from talia_bot.config import ADMIN_ID, VIKUNJA_INBOX_PROJECT_ID from talia_bot.scheduler import schedule_daily_summary @@ -46,6 +54,304 @@ logging.basicConfig( ) logger = logging.getLogger(__name__) +# Instanciamos el motor de flujos +flow_engine = FlowEngine() + +async def send_step_message(update: Update, context: ContextTypes.DEFAULT_TYPE, step: dict, collected_data: dict = None): + """ + Envía el mensaje de un paso del flujo, construyendo el teclado dinámicamente. + """ + keyboard = [] + input_type = step.get("input_type") + collected_data = collected_data or {} + + if input_type == "keyboard" and "options" in step: + for option in step["options"]: + keyboard.append([InlineKeyboardButton(option, callback_data=option)]) + elif input_type == "dynamic_keyboard_vikunja": + projects = await get_projects() + if projects: + for project in projects: + keyboard.append([InlineKeyboardButton(project['title'], callback_data=f"project_{project['id']}")]) + else: + await update.effective_message.reply_text("No se pudieron cargar los proyectos de Vikunja.") + return + elif input_type == "dynamic_keyboard_vikunja_tasks": + project_id_str = collected_data.get('PROJECT_SELECT', '').split('_')[-1] + if project_id_str.isdigit(): + project_id = int(project_id_str) + tasks = await get_project_tasks(project_id) + if tasks: + for task in tasks: + keyboard.append([InlineKeyboardButton(task['title'], callback_data=f"task_{task['id']}")]) + else: + await update.effective_message.reply_text("Este proyecto no tiene tareas. Puedes añadir una o seleccionar otro proyecto.") + # Aquí podríamos opcionalmente terminar el flujo o devolver al paso anterior. + return + else: + await update.effective_message.reply_text("Error: No se pudo identificar el proyecto para buscar tareas.") + return + + reply_markup = InlineKeyboardMarkup(keyboard) if keyboard else None + + # Si la actualización es de un botón, edita el mensaje. Si no, envía uno nuevo. + if update.callback_query: + await update.callback_query.edit_message_text( + text=step["question"], reply_markup=reply_markup, parse_mode='Markdown' + ) + else: + await update.message.reply_text( + text=step["question"], reply_markup=reply_markup, parse_mode='Markdown' + ) + +async def universal_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """ + Handler universal que gestiona todos los flujos de conversación. + """ + user_id = update.effective_user.id + user_role = get_user_role(user_id) + + state = flow_engine.get_conversation_state(user_id) + + if state: + response_data = None + if update.callback_query: + response_data = update.callback_query.data + await update.callback_query.answer() + elif update.message and update.message.text: + response_data = update.message.text + elif update.message and update.message.voice: + voice_file = await update.message.voice.get_file() + file_buffer = io.BytesIO() + await voice_file.download_to_memory(file_buffer) + file_buffer.seek(0) + file_buffer.name = "voice_message.oga" + + await update.message.reply_text("Transcribiendo audio... ⏳") + response_data = await transcribe_audio(file_buffer) + if response_data is None: + await update.message.reply_text("Lo siento, no pude entender el audio. ¿Podrías intentarlo de nuevo?") + return + elif update.message and update.message.document: + # Guardamos la información del archivo para el paso de resolución + response_data = { + "file_id": update.message.document.file_id, + "file_name": update.message.document.file_name, + } + + if response_data: + result = flow_engine.handle_response(user_id, response_data) + + if result.get("status") == "in_progress": + # Pasamos los datos recolectados para que el siguiente paso los pueda usar si es necesario + current_state = flow_engine.get_conversation_state(user_id) + await send_step_message(update, context, result["step"], current_state.get("collected_data")) + elif result.get("status") == "complete": + await handle_flow_resolution(update, context, result) + elif result.get("status") == "error": + await update.effective_message.reply_text(f"Error: {result.get('message', 'Ocurrió un error.')}") + return + + trigger = None + is_callback = False + if update.callback_query: + trigger = update.callback_query.data + is_callback = True + await update.callback_query.answer() + elif update.message and update.message.text: + trigger = update.message.text + + # Flujo automático para clientes + if not trigger and user_role == 'client' and not state: + flow_to_start = next((f for f in flow_engine.flows if f.get("trigger_automatic")), None) + if flow_to_start: + logger.info(f"Starting automatic flow '{flow_to_start['id']}' for client {user_id}") + initial_step = flow_engine.start_flow(user_id, flow_to_start['id']) + if initial_step: + await send_step_message(update, context, initial_step) + return + + if trigger: + for flow in flow_engine.flows: + if trigger == flow.get('trigger_button') or trigger == flow.get('trigger_command'): + logger.info(f"Starting flow '{flow['id']}' for user {user_id} via trigger '{trigger}'") + initial_step = flow_engine.start_flow(user_id, flow['id']) + if initial_step: + await send_step_message(update, context, initial_step) + return + + # Si ninguna acción de flujo se disparó y es un callback, podría ser una acción del menú principal + if is_callback: + logger.info(f"Callback '{trigger}' no fue manejado por el motor de flujos. Pasando al dispatcher legado.") + await button_dispatcher(update, context) + + +async def handle_flow_resolution(update: Update, context: ContextTypes.DEFAULT_TYPE, result: dict): + """ + Maneja la acción final de un flujo completado. + """ + resolution_step = result.get("resolution") + collected_data = result.get("data", {}) + + if not resolution_step: + logger.info(f"Flujo completado sin paso de resolución. Datos: {collected_data}") + final_message = "Proceso completado. ✅" + if update.callback_query: + await update.callback_query.edit_message_text(final_message) + else: + await update.effective_message.reply_text(final_message) + return + + resolution_type = resolution_step.get("input_type") + final_message = resolution_step.get("question", "Hecho. ✅") + + logger.info(f"Resolviendo flujo con tipo '{resolution_type}' y datos: {collected_data}") + + # Lógica de resolución + if resolution_type == "resolution_api_success": + action = collected_data.get("ACTION_TYPE") + task_id_str = collected_data.get("TASK_SELECT", "").split('_')[-1] + update_content = collected_data.get("UPDATE_CONTENT") + + if task_id_str.isdigit(): + task_id = int(task_id_str) + if action == "💬 Agregar Comentario": + await add_comment_to_task(task_id=task_id, comment=update_content) + elif action == "🔄 Actualizar Estatus": + await update_task_status(task_id=task_id, status_text=update_content) + elif action == "✅ Marcar Completado": + await update_task_status(task_id=task_id, is_done=True) + + elif resolution_type == "resolution_notify_admin": + admin_id = context.bot_data.get("ADMIN_ID", ADMIN_ID) # Obtener ADMIN_ID de config + if admin_id: + user_info = ( + f"✨ **Nueva Solicitud de Onboarding** ✨\n\n" + f"Un nuevo candidato ha completado el formulario:\n\n" + f"👤 **Nombre:** {collected_data.get('ONBOARD_START', 'N/A')}\n" + f"🏢 **Base:** {collected_data.get('ONBOARD_ORIGIN', 'N/A')}\n" + f"📧 **Email:** {collected_data.get('ONBOARD_EMAIL', 'N/A')}\n" + f"📱 **Teléfono:** {collected_data.get('ONBOARD_PHONE', 'N/A')}\n\n" + f"Por favor, revisa y añade al usuario al sistema si es aprobado." + ) + await context.bot.send_message(chat_id=admin_id, text=user_info, parse_mode='Markdown') + + elif resolution_type == "rag_analysis_resolution": + pitch = collected_data.get("IDEA_PITCH") + display_name = update.effective_user.full_name + final_message = await analyze_client_pitch(pitch, display_name) + + elif resolution_type == "resolution_event_created": + from dateutil.parser import parse + from datetime import datetime, timedelta + + date_str = collected_data.get("BLOCK_DATE", "Hoy") + time_str = collected_data.get("BLOCK_TIME", "") + title = collected_data.get("BLOCK_TITLE", "Bloqueado por Talia") + + try: + # Interpretar la fecha + if date_str.lower() == 'hoy': + start_date = datetime.now() + elif date_str.lower() == 'mañana': + start_date = datetime.now() + timedelta(days=1) + else: + start_date = parse(date_str) + + # Interpretar el rango de tiempo + time_parts = [part.strip() for part in time_str.replace('a', '-').split('-')] + start_time_obj = parse(time_parts[0]) + end_time_obj = parse(time_parts[1]) + + start_time = start_date.replace(hour=start_time_obj.hour, minute=start_time_obj.minute, second=0, microsecond=0) + end_time = start_date.replace(hour=end_time_obj.hour, minute=end_time_obj.minute, second=0, microsecond=0) + + except (ValueError, IndexError): + final_message = "❌ Formato de fecha u hora no reconocido. Por favor, usa algo como 'Hoy', 'Mañana', o '10am - 11am'." + if update.callback_query: + await update.callback_query.edit_message_text(final_message) + else: + await update.effective_message.reply_text(final_message) + return + + event = await asyncio.to_thread( + create_event, + summary=title, + start_time=start_time, + end_time=end_time, + attendees=[] # Añadir asistentes si fuera necesario + ) + if not event: + final_message = "❌ Hubo un error al crear el evento en el calendario." + + elif resolution_type == "resolution_saved": + idea_action = collected_data.get("IDEA_ACTION") + idea_content = collected_data.get('IDEA_CONTENT', 'N/A') + + if idea_action == "✅ Crear Tarea": + if VIKUNJA_INBOX_PROJECT_ID: + new_task = await create_task( + project_id=int(VIKUNJA_INBOX_PROJECT_ID), + title=idea_content + ) + if new_task: + final_message = "Tarea creada exitosamente en tu bandeja de entrada de Vikunja." + else: + final_message = "❌ Hubo un error al crear la tarea en Vikunja." + else: + final_message = "❌ Error: El ID del proyecto de bandeja de entrada de Vikunja no está configurado." + + elif idea_action == "📓 Guardar Nota": + admin_id = ADMIN_ID + idea_category = collected_data.get('IDEA_CATEGORY', 'N/A') + message = ( + f"🧠 **Nueva Idea Capturada (Guardada como Nota)** 🧠\n\n" + f"**Categoría:** {idea_category}\n\n" + f"**Contenido:**\n{idea_content}" + ) + await context.bot.send_message(chat_id=admin_id, text=message, parse_mode='Markdown') + + elif resolution_type == "resolution_email_sent": + file_info = collected_data.get("UPLOAD_FILE") + if isinstance(file_info, dict): + file_id = file_info.get("file_id") + file_name = file_info.get("file_name") + + if file_id and file_name: + file_obj = await context.bot.get_file(file_id) + file_buffer = io.BytesIO() + await file_obj.download_to_memory(file_buffer) + file_buffer.seek(0) + + success = await send_email_with_attachment( + file_content=file_buffer.getvalue(), + filename=file_name, + subject=f"Print Job: {file_name}" + ) + if not success: + final_message = "❌ Hubo un error al enviar el archivo a la impresora." + else: + final_message = "❌ No se encontró la información del archivo." + else: + final_message = "❌ Error en el formato de los datos del archivo." + + elif resolution_type == "system_output_nfc": + # Lógica para devolver un JSON con los datos para el tag NFC + nfc_data = { + "name": collected_data.get("WIZARD_START"), + "employee_id": collected_data.get("NUM_EMP"), + "branch": collected_data.get("SUCURSAL"), + "telegram_id": collected_data.get("TELEGRAM_ID"), + } + final_message = f"```json\n{json.dumps(nfc_data, indent=2)}\n```" + + # Enviar el mensaje de confirmación final + if update.callback_query: + await update.callback_query.edit_message_text(final_message) + else: + await update.effective_message.reply_text(final_message) + + async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """ Se ejecuta cuando el usuario escribe /start. @@ -56,20 +362,17 @@ async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: logger.info(f"Usuario {chat_id} inició conversación con el rol: {user_role}") - # Obtenemos el texto y los botones de bienvenida desde el módulo de onboarding response_text, reply_markup = onboarding_handle_start(user_role) - # Respondemos al usuario - await update.message.reply_text(response_text, reply_markup=reply_markup) + await update.message.reply_text(response_text, reply_markup=reply_markup, parse_mode='Markdown') async def button_dispatcher(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """ - Esta función maneja los clics en los botones del menú. - Dependiendo de qué botón se presione, ejecuta una acción diferente. + Dispatcher legado para manejar botones que no inician flujos. """ query = update.callback_query - await query.answer() - logger.info(f"El despachador recibió una consulta: {query.data}") + # No se necesita await query.answer() aquí porque ya se llamó en universal_handler + logger.info(f"Dispatcher legado manejando consulta: {query.data}") response_text = "Acción no reconocida." reply_markup = None @@ -91,34 +394,32 @@ async def button_dispatcher(update: Update, context: ContextTypes.DEFAULT_TYPE) try: if query.data in simple_handlers: handler = simple_handlers[query.data] - logger.info(f"Ejecutando simple_handler para: {query.data}") if asyncio.iscoroutinefunction(handler): response_text = await handler() else: response_text = handler() elif query.data in complex_handlers: handler = complex_handlers[query.data] - logger.info(f"Ejecutando complex_handler para: {query.data}") if asyncio.iscoroutinefunction(handler): response_text, reply_markup = await handler() else: response_text, reply_markup = handler() elif query.data.startswith(('approve:', 'reject:')): - logger.info(f"Ejecutando acción de aprobación: {query.data}") response_text = handle_approval_action(query.data) elif query.data == 'start_create_tag': response_text = "Para crear un tag, por favor usa el comando /create_tag." else: - logger.warning(f"Consulta no manejada por el despachador: {query.data}") - await query.edit_message_text(text=response_text) + # Si llega aquí, es una acción que ni el motor ni el dispatcher conocen. + await query.edit_message_text(text=f"Lo siento, la acción '{query.data}' no se reconoce.") return except Exception as exc: - logger.exception(f"Error al procesar la acción {query.data}: {exc}") - response_text = "❌ Ocurrió un error al procesar tu solicitud. Intenta de nuevo." + logger.exception(f"Error al procesar la acción {query.data} en el dispatcher legado: {exc}") + response_text = "❌ Ocurrió un error al procesar tu solicitud." reply_markup = None await query.edit_message_text(text=response_text, reply_markup=reply_markup, parse_mode='Markdown') + def main() -> None: """Función principal que arranca el bot.""" if not TELEGRAM_BOT_TOKEN: @@ -130,25 +431,19 @@ def main() -> None: application = Application.builder().token(TELEGRAM_BOT_TOKEN).build() schedule_daily_summary(application) - # El orden de los handlers es crucial para que las conversaciones funcionen. - application.add_handler(create_tag_conv_handler()) - application.add_handler(vikunja_conv_handler()) - - conv_handler = ConversationHandler( - entry_points=[CallbackQueryHandler(propose_activity_start, pattern='^propose_activity$')], - states={ - DESCRIPTION: [MessageHandler(filters.TEXT & ~filters.COMMAND, get_description)], - DURATION: [MessageHandler(filters.TEXT & ~filters.COMMAND, get_duration)], - }, - fallbacks=[CommandHandler('cancel', cancel_proposal)], - per_message=False - ) - application.add_handler(conv_handler) - + # Handlers principales application.add_handler(CommandHandler("start", start)) application.add_handler(CommandHandler("print", print_handler)) - application.add_handler(CallbackQueryHandler(button_dispatcher)) + # El handler universal para flujos (prioridad 0) + application.add_handler(CallbackQueryHandler(universal_handler), group=0) + + # El dispatcher legado se mantiene para callbacks no manejados por el motor de flujos (prioridad 1) + # Nota: La lógica de paso ahora está dentro del universal_handler + application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, universal_handler), group=0) + application.add_handler(MessageHandler(filters.VOICE, universal_handler), group=0) + application.add_handler(MessageHandler(filters.Document.ALL, universal_handler), group=0) + logger.info("Iniciando Talía Bot...") application.run_polling() diff --git a/talia_bot/modules/flow_engine.py b/talia_bot/modules/flow_engine.py new file mode 100644 index 0000000..f67a3c9 --- /dev/null +++ b/talia_bot/modules/flow_engine.py @@ -0,0 +1,134 @@ +# talia_bot/modules/flow_engine.py +import json +import logging +from talia_bot.db import get_db_connection + +logger = logging.getLogger(__name__) + +class FlowEngine: + def __init__(self): + self.flows = self._load_flows() + + def _load_flows(self): + """Loads and flattens flow definitions from the JSON file.""" + try: + with open('talia_bot/data/flows.json', 'r', encoding='utf-8') as f: + all_flows_by_role = json.load(f) + + flattened_flows = [] + for role, data in all_flows_by_role.items(): + if 'flows' in data: + for flow in data['flows']: + flow['role'] = role + flattened_flows.append(flow) + return flattened_flows + except FileNotFoundError: + logger.error("flows.json not found.") + return [] + except json.JSONDecodeError: + logger.error("Error decoding flows.json.") + return [] + + def get_flow(self, flow_id): + """Retrieves a specific flow by its ID.""" + for flow in self.flows: + if flow['id'] == flow_id: + return flow + return None + + def get_conversation_state(self, user_id): + """Gets the current conversation state for a user from the database.""" + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute("SELECT flow_id, current_step_id, collected_data FROM conversations WHERE user_id = ?", (user_id,)) + state = cursor.fetchone() + conn.close() + if state: + return { + "flow_id": state['flow_id'], + "current_step_id": state['current_step_id'], + "collected_data": json.loads(state['collected_data']) if state['collected_data'] else {} + } + return None + + def start_flow(self, user_id, flow_id): + """Starts a new flow for a user.""" + flow = self.get_flow(flow_id) + if not flow: + return None + + initial_step = flow['steps'][0] + self.update_conversation_state(user_id, flow_id, initial_step['step_id'], {}) + return initial_step + + def update_conversation_state(self, user_id, flow_id, step_id, collected_data): + """Creates or updates the conversation state in the database.""" + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute(""" + INSERT OR REPLACE INTO conversations (user_id, flow_id, current_step_id, collected_data) + VALUES (?, ?, ?, ?) + """, (user_id, flow_id, step_id, json.dumps(collected_data))) + conn.commit() + conn.close() + + def handle_response(self, user_id, response_data): + """ + Handles a user's response, saves the data, and returns the next action. + Returns a dictionary with the status and the next step or final data. + """ + state = self.get_conversation_state(user_id) + if not state: + return {"status": "error", "message": "No conversation state found."} + + flow = self.get_flow(state['flow_id']) + if not flow: + return {"status": "error", "message": f"Flow '{state['flow_id']}' not found."} + + current_step = next((step for step in flow['steps'] if step['step_id'] == state['current_step_id']), None) + if not current_step: + self.end_flow(user_id) + return {"status": "error", "message": "Current step not found in flow."} + + # Save the user's response using the meaningful variable name + if 'variable' in current_step: + variable_name = current_step['variable'] + state['collected_data'][variable_name] = response_data + else: + logger.warning(f"Step {current_step['step_id']} in flow {flow['id']} has no 'variable' defined.") + state['collected_data'][f"step_{current_step['step_id']}_response"] = response_data + + next_step_id = state['current_step_id'] + 1 + next_step = next((step for step in flow['steps'] if step['step_id'] == next_step_id), None) + + if next_step: + # Check if the next step is a resolution step, which ends the data collection + if next_step.get('input_type', '').startswith('resolution_'): + logger.info(f"Flow {state['flow_id']} reached resolution for user {user_id}.") + self.end_flow(user_id) + return { + "status": "complete", + "resolution": next_step, + "data": state['collected_data'] + } + else: + # It's a regular step, so update state and return it + self.update_conversation_state(user_id, state['flow_id'], next_step_id, state['collected_data']) + return {"status": "in_progress", "step": next_step} + else: + # No more steps, the flow is complete + logger.info(f"Flow {state['flow_id']} ended for user {user_id}. Data: {state['collected_data']}") + self.end_flow(user_id) + return { + "status": "complete", + "resolution": None, + "data": state['collected_data'] + } + + def end_flow(self, user_id): + """Ends a flow for a user by deleting their conversation state.""" + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute("DELETE FROM conversations WHERE user_id = ?", (user_id,)) + conn.commit() + conn.close() diff --git a/talia_bot/modules/llm_engine.py b/talia_bot/modules/llm_engine.py index c146d2b..d1bb1ae 100644 --- a/talia_bot/modules/llm_engine.py +++ b/talia_bot/modules/llm_engine.py @@ -2,33 +2,67 @@ # Este script se encarga de la comunicación con la inteligencia artificial de OpenAI. import openai +import json +import logging from talia_bot.config import OPENAI_API_KEY, OPENAI_MODEL -def get_smart_response(prompt): +logger = logging.getLogger(__name__) + +async def get_smart_response(prompt: str, system_message: str = "Eres un asistente útil.") -> str: """ - Genera una respuesta inteligente usando la API de OpenAI. - - Parámetros: - - prompt: El texto o pregunta que le enviamos a la IA. + Genera una respuesta inteligente usando la API de OpenAI de forma asíncrona. """ - # Verificamos que tengamos la llave de la API configurada if not OPENAI_API_KEY: + logger.error("OPENAI_API_KEY no está configurada.") return "Error: La llave de la API de OpenAI no está configurada." try: - # Creamos el cliente de OpenAI - client = openai.OpenAI(api_key=OPENAI_API_KEY) + client = openai.AsyncOpenAI(api_key=OPENAI_API_KEY) - # Solicitamos una respuesta al modelo configurado - response = client.chat.completions.create( + response = await client.chat.completions.create( model=OPENAI_MODEL, messages=[ - {"role": "system", "content": "Eres un asistente útil."}, + {"role": "system", "content": system_message}, {"role": "user", "content": prompt}, ], ) - # Devolvemos el contenido de la respuesta limpia (sin espacios extras) return response.choices[0].message.content.strip() except Exception as e: - # Si algo sale mal, devolvemos el error + logger.error(f"Ocurrió un error al comunicarse con OpenAI: {e}") return f"Ocurrió un error al comunicarse con OpenAI: {e}" + +async def analyze_client_pitch(pitch: str, display_name: str) -> str: + """ + Analiza el pitch de un cliente contra una lista de servicios y genera una respuesta de ventas. + """ + try: + with open('talia_bot/data/services.json', 'r', encoding='utf-8') as f: + services = json.load(f) + except (FileNotFoundError, json.JSONDecodeError) as e: + logger.error(f"Error al cargar o decodificar services.json: {e}") + return "Lo siento, estoy teniendo problemas para acceder a nuestra lista de servicios en este momento." + + services_description = "\n".join([f"- {s['service_name']}: {s['description']}" for s in services]) + + system_message = f""" + Eres Talia, la asistente personal de {display_name}. Tu objetivo es actuar como un filtro de ventas inteligente. + Analiza la necesidad del cliente y compárala con la lista de servicios que ofrece {display_name}. + Tu respuesta debe seguir estas reglas estrictamente: + 1. Identifica cuál de los servicios de la lista es el más adecuado para la necesidad del cliente. + 2. Confirma que el proyecto del cliente es interesante y encaja perfectamente con el servicio que identificaste. Menciona el nombre del servicio. + 3. Cierra la conversación de manera profesional y tranquilizadora, indicando que ya has pasado el expediente a {display_name} y que él lo revisará personalmente. + 4. Sé concisa, profesional y amable. No hagas preguntas, solo proporciona la respuesta de cierre. + """ + + prompt = f""" + **Servicios Ofrecidos:** + {services_description} + + **Necesidad del Cliente:** + "{pitch}" + + **Tu Tarea:** + Genera la respuesta de cierre ideal siguiendo las reglas del system prompt. + """ + + return await get_smart_response(prompt, system_message) diff --git a/talia_bot/modules/mailer.py b/talia_bot/modules/mailer.py new file mode 100644 index 0000000..f7e388d --- /dev/null +++ b/talia_bot/modules/mailer.py @@ -0,0 +1,56 @@ +# talia_bot/modules/mailer.py +import smtplib +import ssl +from email.mime.multipart import MIMEMultipart +from email.mime.base import MIMEBase +from email import encoders +import logging +import asyncio + +from talia_bot.config import ( + SMTP_SERVER, SMTP_PORT, SMTP_USER, SMTP_PASSWORD, + IMAP_USER, PRINTER_EMAIL +) + +logger = logging.getLogger(__name__) + +async def send_email_with_attachment(file_content: bytes, filename: str, subject: str): + """ + Sends an email with an attachment using SMTP. + """ + if not all([SMTP_SERVER, SMTP_PORT, SMTP_USER, SMTP_PASSWORD, PRINTER_EMAIL]): + logger.error("SMTP settings are not fully configured.") + return False + + message = MIMEMultipart() + message["From"] = IMAP_USER + message["To"] = PRINTER_EMAIL + message["Subject"] = subject + + part = MIMEBase("application", "octet-stream") + part.set_payload(file_content) + encoders.encode_base64(part) + part.add_header( + "Content-Disposition", + f"attachment; filename= {filename}", + ) + message.attach(part) + text = message.as_string() + + try: + context = ssl.create_default_context() + + # Usamos asyncio.to_thread para correr el código síncrono de smtplib + def _send_mail(): + with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server: + server.starttls(context=context) + server.login(SMTP_USER, SMTP_PASSWORD) + server.sendmail(IMAP_USER, PRINTER_EMAIL, text) + logger.info(f"Email sent to {PRINTER_EMAIL} for printing.") + + await asyncio.to_thread(_send_mail) + return True + + except Exception as e: + logger.error(f"Failed to send email: {e}") + return False diff --git a/talia_bot/modules/transcription.py b/talia_bot/modules/transcription.py new file mode 100644 index 0000000..3f46e8b --- /dev/null +++ b/talia_bot/modules/transcription.py @@ -0,0 +1,37 @@ +# talia_bot/modules/transcription.py +import logging +import openai +from talia_bot.config import OPENAI_API_KEY + +logger = logging.getLogger(__name__) + +async def transcribe_audio(audio_file) -> str | None: + """ + Transcribes an audio file using OpenAI's Whisper model with the modern API call. + + Args: + audio_file: A file-like object containing the audio data with a 'name' attribute. + + Returns: + The transcribed text as a string, or None if transcription fails. + """ + if not OPENAI_API_KEY: + logger.error("Cannot transcribe audio: OPENAI_API_KEY is not configured.") + return None + + try: + client = openai.AsyncOpenAI(api_key=OPENAI_API_KEY) + + transcription = await client.audio.transcriptions.create( + model="whisper-1", + file=audio_file + ) + + logger.info("Successfully transcribed audio.") + return transcription.text + except openai.APIError as e: + logger.error(f"OpenAI API error during transcription: {e}") + return None + except Exception as e: + logger.error(f"An unexpected error occurred during transcription: {e}") + return None diff --git a/talia_bot/modules/vikunja.py b/talia_bot/modules/vikunja.py index 57557ca..5e4feff 100644 --- a/talia_bot/modules/vikunja.py +++ b/talia_bot/modules/vikunja.py @@ -1,27 +1,14 @@ -# app/modules/vikunja.py -# Este módulo maneja la integración con Vikunja para la gestión de tareas. +# talia_bot/modules/vikunja.py +# Este módulo maneja la integración con Vikunja para la gestión de proyectos y tareas. -import requests import logging -from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update -from telegram.ext import ( - ConversationHandler, - CommandHandler, - CallbackQueryHandler, - MessageHandler, - filters, - ContextTypes, -) +import httpx -from config import VIKUNJA_API_URL, VIKUNJA_API_TOKEN -from permissions import is_admin +from talia_bot.config import VIKUNJA_API_URL, VIKUNJA_API_TOKEN # Configuración del logger logger = logging.getLogger(__name__) -# Definición de los estados de la conversación para añadir y editar tareas -SELECTING_ACTION, ADDING_TASK, SELECTING_TASK_TO_EDIT, EDITING_TASK = range(4) - def get_vikunja_headers(): """Devuelve los headers necesarios para la API de Vikunja.""" return { @@ -29,154 +16,121 @@ def get_vikunja_headers(): "Content-Type": "application/json", } -def get_tasks(): +async def get_projects(): """ - Obtiene y formatea la lista de tareas de Vikunja. - Esta función es síncrona y devuelve un string. + Obtiene la lista de proyectos de Vikunja de forma asíncrona. + Devuelve una lista de diccionarios de proyectos o None si hay un error. """ if not VIKUNJA_API_TOKEN: - return "Error: VIKUNJA_API_TOKEN no configurado." + logger.error("VIKUNJA_API_TOKEN no está configurado.") + return None - try: - response = requests.get(f"{VIKUNJA_API_URL}/projects/1/tasks", headers=get_vikunja_headers()) - response.raise_for_status() - tasks = response.json() + async with httpx.AsyncClient() as client: + try: + response = await client.get(f"{VIKUNJA_API_URL}/projects", headers=get_vikunja_headers()) + response.raise_for_status() + return response.json() + except httpx.HTTPStatusError as e: + logger.error(f"Error de HTTP al obtener proyectos de Vikunja: {e.response.status_code} - {e.response.text}") + return None + except Exception as e: + logger.error(f"Error al obtener proyectos de Vikunja: {e}") + return None - if not tasks: - return "No tienes tareas pendientes en Vikunja." - - text = "📋 *Tus Tareas en Vikunja*\n\n" - for task in sorted(tasks, key=lambda t: t.get('id', 0))[:10]: - status = "✅" if task.get('done') else "⏳" - text += f"{status} `{task.get('id')}`: *{task.get('title')}*\n" - return text - except Exception as e: - logger.error(f"Error al obtener tareas de Vikunja: {e}") - return f"Error al conectar con Vikunja: {e}" +async def get_project_tasks(project_id: int): + """ + Obtiene las tareas de un proyecto específico de forma asíncrona. + """ + if not VIKUNJA_API_TOKEN: + logger.error("VIKUNJA_API_TOKEN no está configurado.") + return None -async def vikunja_menu(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - """Muestra el menú principal de acciones de Vikunja.""" - query = update.callback_query - await query.answer() + async with httpx.AsyncClient() as client: + try: + response = await client.get(f"{VIKUNJA_API_URL}/projects/{project_id}/tasks", headers=get_vikunja_headers()) + response.raise_for_status() + return response.json() + except httpx.HTTPStatusError as e: + logger.error(f"Error de HTTP al obtener tareas del proyecto {project_id}: {e.response.status_code}") + return None + except Exception as e: + logger.error(f"Error al obtener tareas del proyecto {project_id}: {e}") + return None - keyboard = [ - [InlineKeyboardButton("Añadir Tarea", callback_data='add_task')], - [InlineKeyboardButton("Editar Tarea", callback_data='edit_task_start')], - [InlineKeyboardButton("Volver", callback_data='cancel')], - ] - reply_markup = InlineKeyboardMarkup(keyboard) +async def add_comment_to_task(task_id: int, comment: str): + """ + Añade un comentario a una tarea específica. + """ + if not VIKUNJA_API_TOKEN: + logger.error("VIKUNJA_API_TOKEN no está configurado.") + return False - tasks_list = get_tasks() - await query.edit_message_text(text=f"{tasks_list}\n\nSelecciona una acción:", reply_markup=reply_markup, parse_mode='Markdown') - return SELECTING_ACTION + async with httpx.AsyncClient() as client: + try: + data = {"comment": comment} + response = await client.post(f"{VIKUNJA_API_URL}/tasks/{task_id}/comments", headers=get_vikunja_headers(), json=data) + response.raise_for_status() + logger.info(f"Comentario añadido a la tarea {task_id}.") + return True + except httpx.HTTPStatusError as e: + logger.error(f"Error de HTTP al añadir comentario a la tarea {task_id}: {e.response.status_code}") + return False + except Exception as e: + logger.error(f"Error al añadir comentario a la tarea {task_id}: {e}") + return False -async def request_task_title(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - """Solicita al usuario el título de la nueva tarea.""" - query = update.callback_query - await query.answer() - await query.edit_message_text("Por favor, introduce el título de la nueva tarea:") - return ADDING_TASK +async def update_task_status(task_id: int, is_done: bool = None, status_text: str = None): + """ + Actualiza una tarea en Vikunja. + - Si `is_done` es un booleano, actualiza el estado de completado. + - Si `status_text` es un string, añade un comentario con ese estado. + """ + if not VIKUNJA_API_TOKEN: + logger.error("VIKUNJA_API_TOKEN no está configurado.") + return False -async def add_task(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - """Añade una nueva tarea a Vikunja.""" - task_title = update.message.text - try: - data = {"title": task_title, "project_id": 1} - response = requests.post(f"{VIKUNJA_API_URL}/tasks", headers=get_vikunja_headers(), json=data) - response.raise_for_status() - await update.message.reply_text(f"✅ Tarea añadida: *{task_title}*", parse_mode='Markdown') - except Exception as e: - logger.error(f"Error al añadir tarea a Vikunja: {e}") - await update.message.reply_text(f"Error al añadir tarea: {e}") + async with httpx.AsyncClient() as client: + try: + if is_done is not None: + data = {"done": is_done} + response = await client.put(f"{VIKUNJA_API_URL}/tasks/{task_id}", headers=get_vikunja_headers(), json=data) + response.raise_for_status() + logger.info(f"Estado de la tarea {task_id} actualizado a {'completado' if is_done else 'pendiente'}.") + return True - return ConversationHandler.END + if status_text: + return await add_comment_to_task(task_id, f"Nuevo estatus: {status_text}") -async def select_task_to_edit(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - """Muestra los botones para seleccionar qué tarea editar.""" - query = update.callback_query - await query.answer() + except httpx.HTTPStatusError as e: + logger.error(f"Error de HTTP al actualizar la tarea {task_id}: {e.response.status_code}") + return False + except Exception as e: + logger.error(f"Error al actualizar la tarea {task_id}: {e}") + return False + return False - try: - response = requests.get(f"{VIKUNJA_API_URL}/projects/1/tasks", headers=get_vikunja_headers()) - response.raise_for_status() - tasks = [task for task in response.json() if not task.get('done')] +async def create_task(project_id: int, title: str, due_date: str = None): + """ + Crea una nueva tarea en un proyecto específico. + """ + if not VIKUNJA_API_TOKEN: + logger.error("VIKUNJA_API_TOKEN no está configurado.") + return None - if not tasks: - await query.edit_message_text("No hay tareas pendientes para editar.") - return ConversationHandler.END + async with httpx.AsyncClient() as client: + try: + data = {"project_id": project_id, "title": title} + if due_date: + data["due_date"] = due_date - keyboard = [] - for task in sorted(tasks, key=lambda t: t.get('id', 0))[:10]: - keyboard.append([InlineKeyboardButton( - f"{task.get('id')}: {task.get('title')}", - callback_data=f"edit_task:{task.get('id')}" - )]) - keyboard.append([InlineKeyboardButton("Cancelar", callback_data='cancel')]) - - reply_markup = InlineKeyboardMarkup(keyboard) - await query.edit_message_text("Selecciona la tarea que quieres editar:", reply_markup=reply_markup) - return SELECTING_TASK_TO_EDIT - except Exception as e: - logger.error(f"Error al obtener tareas para editar: {e}") - await query.edit_message_text("Error al obtener la lista de tareas.") - return ConversationHandler.END - -async def request_new_task_title(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - """Solicita el nuevo título para la tarea seleccionada.""" - query = update.callback_query - await query.answer() - - task_id = query.data.split(':')[1] - context.user_data['task_id_to_edit'] = task_id - - await query.edit_message_text(f"Introduce el nuevo título para la tarea `{task_id}`:", parse_mode='Markdown') - return EDITING_TASK - -async def edit_task(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - """Actualiza el título de una tarea en Vikunja.""" - new_title = update.message.text - task_id = context.user_data.get('task_id_to_edit') - - if not task_id: - await update.message.reply_text("Error: No se encontró el ID de la tarea a editar.") - return ConversationHandler.END - - try: - data = {"title": new_title} - response = requests.put(f"{VIKUNJA_API_URL}/tasks/{task_id}", headers=get_vikunja_headers(), json=data) - response.raise_for_status() - await update.message.reply_text(f"✅ Tarea `{task_id}` actualizada a *{new_title}*", parse_mode='Markdown') - except Exception as e: - logger.error(f"Error al editar la tarea {task_id}: {e}") - await update.message.reply_text("Error al actualizar la tarea.") - finally: - del context.user_data['task_id_to_edit'] - - return ConversationHandler.END - -async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - """Cancela la conversación actual.""" - query = update.callback_query - await query.answer() - await query.edit_message_text("Operación cancelada.") - return ConversationHandler.END - -def vikunja_conv_handler(): - """Crea el ConversationHandler para el flujo de Vikunja.""" - return ConversationHandler( - entry_points=[CallbackQueryHandler(vikunja_menu, pattern='^manage_vikunja$')], - states={ - SELECTING_ACTION: [ - CallbackQueryHandler(request_task_title, pattern='^add_task$'), - CallbackQueryHandler(select_task_to_edit, pattern='^edit_task_start$'), - CallbackQueryHandler(cancel, pattern='^cancel$'), - ], - ADDING_TASK: [MessageHandler(filters.TEXT & ~filters.COMMAND, add_task)], - SELECTING_TASK_TO_EDIT: [ - CallbackQueryHandler(request_new_task_title, pattern=r'^edit_task:\d+$'), - CallbackQueryHandler(cancel, pattern='^cancel$'), - ], - EDITING_TASK: [MessageHandler(filters.TEXT & ~filters.COMMAND, edit_task)], - }, - fallbacks=[CommandHandler('cancel', cancel)], - ) + response = await client.post(f"{VIKUNJA_API_URL}/tasks", headers=get_vikunja_headers(), json=data) + response.raise_for_status() + task = response.json() + logger.info(f"Tarea '{title}' creada en el proyecto {project_id}.") + return task + except httpx.HTTPStatusError as e: + logger.error(f"Error de HTTP al crear la tarea: {e.response.status_code}") + return None + except Exception as e: + logger.error(f"Error al crear la tarea: {e}") + return None