feat: Complete sprint 2 tasks

- Upgrade Python to 3.11 and update dependencies.
- Refactor main.py to isolate business logic.
- Fix bugs in flow_engine.py and printer.py.
- Improve database connection handling.
- Standardize error handling.
- Verify secret management.
This commit is contained in:
google-labs-jules[bot]
2025-12-22 20:55:55 +00:00
parent 2c4aafe080
commit 4da52dd972
8 changed files with 312 additions and 269 deletions

138
bot/modules/dispatcher.py Normal file
View File

@@ -0,0 +1,138 @@
# bot/modules/dispatcher.py
# This module is responsible for dispatching user interactions to the correct handlers.
import logging
import asyncio
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ContextTypes
from bot.modules.onboarding import get_admin_secondary_menu
from bot.modules.agenda import get_agenda
from bot.modules.citas import request_appointment
from bot.modules.equipo import view_requests_status
from bot.modules.aprobaciones import view_pending, handle_approval_action
from bot.modules.admin import get_system_status
from bot.modules.vikunja import get_projects_list, get_tasks_list
logger = logging.getLogger(__name__)
async def send_step_message(update: Update, step: dict):
"""Helper to send a message for a flow step, including options if available."""
text = step["question"]
reply_markup = None
options = []
if "options" in step and step["options"]:
options = step["options"]
elif "input_type" in step:
if step["input_type"] == "dynamic_keyboard_vikunja_projects":
projects = get_projects_list()
options = [p.get('title', 'Unknown') for p in projects]
elif step["input_type"] == "dynamic_keyboard_vikunja_tasks":
tasks = get_tasks_list(1)
options = [t.get('title', 'Unknown') for t in tasks]
if options:
keyboard = []
row = []
for option in options:
cb_data = str(option)[:64]
row.append(InlineKeyboardButton(str(option), callback_data=cb_data))
if len(row) >= 2:
keyboard.append(row)
row = []
if row:
keyboard.append(row)
reply_markup = InlineKeyboardMarkup(keyboard)
if update.callback_query:
await update.callback_query.edit_message_text(text=text, reply_markup=reply_markup)
else:
await update.message.reply_text(text=text, reply_markup=reply_markup)
async def button_dispatcher(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""
Handles button clicks and dispatches them to the appropriate handlers.
"""
query = update.callback_query
await query.answer()
logger.info(f"El despachador recibió una consulta: {query.data}")
response_text = "Acción no reconocida."
reply_markup = None
simple_handlers = {
'view_agenda': get_agenda,
'view_requests_status': view_requests_status,
'schedule_appointment': request_appointment,
'view_system_status': get_system_status,
'manage_users': lambda: "Función de gestión de usuarios no implementada.",
}
complex_handlers = {
'admin_menu': get_admin_secondary_menu,
'view_pending': view_pending,
}
try:
if query.data in simple_handlers:
handler = simple_handlers[query.data]
logger.info(f"Ejecutando simple_handler para: {query.data}")
if asyncio.iscoroutinefunction(handler):
response_text = await handler()
else:
response_text = handler()
elif query.data in complex_handlers:
handler = complex_handlers[query.data]
logger.info(f"Ejecutando complex_handler para: {query.data}")
if asyncio.iscoroutinefunction(handler):
response_text, reply_markup = await handler()
else:
response_text, reply_markup = handler()
elif query.data.startswith(('approve:', 'reject:')):
logger.info(f"Ejecutando acción de aprobación: {query.data}")
response_text = handle_approval_action(query.data)
else:
flow_engine = context.bot_data["flow_engine"]
flow_to_start = next((flow for flow in flow_engine.flows if flow.get("trigger_button") == query.data), None)
if flow_to_start:
logger.info(f"Iniciando flujo: {flow_to_start['id']}")
initial_step = flow_engine.start_flow(update.effective_user.id, flow_to_start["id"])
if initial_step:
await send_step_message(update, initial_step)
else:
logger.error("No se pudo iniciar el flujo (paso inicial vacío).")
return
state = flow_engine.get_conversation_state(update.effective_user.id)
if state:
logger.info(f"Procesando paso de flujo para usuario {update.effective_user.id}. Data: {query.data}")
result = flow_engine.handle_response(update.effective_user.id, query.data)
if result["status"] == "in_progress":
logger.info("Flujo en progreso, enviando siguiente paso.")
await send_step_message(update, result["step"])
elif result["status"] == "complete":
logger.info("Flujo completado.")
if "sales_pitch" in result:
await query.edit_message_text(result["sales_pitch"])
elif "nfc_tag" in result:
await query.edit_message_text(result["nfc_tag"], parse_mode='Markdown')
else:
await query.edit_message_text("Gracias por completar el flujo.")
elif result["status"] == "error":
logger.error(f"Error en el flujo: {result['message']}")
await query.edit_message_text(f"Error: {result['message']}")
return
logger.warning(f"Consulta no manejada por el despachador: {query.data}")
await query.edit_message_text(text=response_text)
return
except Exception as exc:
logger.exception(f"Error al procesar la acción {query.data}: {exc}")
response_text = "❌ Ocurrió un error al procesar tu solicitud. Intenta de nuevo."
reply_markup = None
await query.edit_message_text(text=response_text, reply_markup=reply_markup, parse_mode='Markdown')

View File

@@ -52,24 +52,27 @@ class FlowEngine:
def get_conversation_state(self, user_id):
"""Gets the current conversation state for a user from the database."""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT flow_id, current_step_id, collected_data FROM conversations WHERE user_id = ?", (user_id,))
state = cursor.fetchone()
conn.close()
if state:
return {
"flow_id": state['flow_id'],
"current_step_id": state['current_step_id'],
"collected_data": json.loads(state['collected_data']) if state['collected_data'] else {}
}
return None
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT flow_id, current_step_id, collected_data FROM conversations WHERE user_id = ?", (user_id,))
state = cursor.fetchone()
if state:
return {
"flow_id": state['flow_id'],
"current_step_id": state['current_step_id'],
"collected_data": json.loads(state['collected_data']) if state['collected_data'] else {}
}
return None
except sqlite3.Error as e:
logger.error(f"Database error in get_conversation_state: {e}")
return None
def start_flow(self, user_id, flow_id):
"""Starts a new flow for a user."""
flow = self.get_flow(flow_id)
if not flow or 'steps' not in flow or not flow['steps']:
logger.error(f"Flow '{flow_id}' is invalid or has no steps.")
if not flow or 'steps' not in flow or not isinstance(flow['steps'], list) or not flow['steps']:
logger.error(f"Flow '{flow_id}' is invalid, has no steps, or steps is not a non-empty list.")
return None
initial_step = flow['steps'][0]
@@ -78,14 +81,16 @@ class FlowEngine:
def update_conversation_state(self, user_id, flow_id, step_id, collected_data):
"""Creates or updates the conversation state in the database."""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO conversations (user_id, flow_id, current_step_id, collected_data)
VALUES (?, ?, ?, ?)
""", (user_id, flow_id, step_id, json.dumps(collected_data)))
conn.commit()
conn.close()
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO conversations (user_id, flow_id, current_step_id, collected_data)
VALUES (?, ?, ?, ?)
""", (user_id, flow_id, step_id, json.dumps(collected_data)))
conn.commit()
except sqlite3.Error as e:
logger.error(f"Database error in update_conversation_state: {e}")
def handle_response(self, user_id, response_data):
"""
@@ -147,8 +152,10 @@ class FlowEngine:
def end_flow(self, user_id):
"""Ends a flow for a user by deleting their conversation state."""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM conversations WHERE user_id = ?", (user_id,))
conn.commit()
conn.close()
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM conversations WHERE user_id = ?", (user_id,))
conn.commit()
except sqlite3.Error as e:
logger.error(f"Database error in end_flow: {e}")

View File

@@ -0,0 +1,57 @@
# bot/modules/message_handler.py
# This module handles the processing of text and voice messages.
import logging
import os
from telegram import Update
from telegram.ext import ContextTypes
from bot.modules.transcription import transcribe_audio
from bot.modules.dispatcher import send_step_message
logger = logging.getLogger(__name__)
async def text_and_voice_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handles text and voice messages for the flow engine."""
user_id = update.effective_user.id
flow_engine = context.bot_data["flow_engine"]
state = flow_engine.get_conversation_state(user_id)
if not state:
return
user_response = update.message.text
if update.message.voice:
voice = update.message.voice
temp_dir = 'temp_files'
os.makedirs(temp_dir, exist_ok=True)
file_path = os.path.join(temp_dir, f"{voice.file_id}.ogg")
try:
voice_file = await context.bot.get_file(voice.file_id)
await voice_file.download_to_drive(file_path)
logger.info(f"Voice message saved to {file_path}")
user_response = transcribe_audio(file_path)
logger.info(f"Transcription result: '{user_response}'")
except Exception as e:
logger.error(f"Error during voice transcription: {e}")
user_response = "Error al procesar el mensaje de voz."
finally:
if os.path.exists(file_path):
os.remove(file_path)
result = flow_engine.handle_response(user_id, user_response)
if result["status"] == "in_progress":
await send_step_message(update, result["step"])
elif result["status"] == "complete":
if "sales_pitch" in result:
await update.message.reply_text(result["sales_pitch"])
elif "nfc_tag" in result:
await update.message.reply_text(result["nfc_tag"], parse_mode='Markdown')
else:
await update.message.reply_text("Gracias por completar el flujo.")
elif result["status"] == "error":
await update.message.reply_text(result["message"])

View File

@@ -5,10 +5,13 @@ import smtplib
import imaplib
import email
import logging
import os
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
from telegram import Update
from telegram.ext import ContextTypes
from bot.config import (
SMTP_SERVER,
@@ -21,9 +24,36 @@ from bot.config import (
PRINTER_EMAIL,
)
from bot.modules.identity import is_admin
from bot.modules.file_validation import validate_document
logger = logging.getLogger(__name__)
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handles documents sent to the bot for printing."""
document = update.message.document
user_id = update.effective_user.id
is_valid, message = validate_document(document)
if not is_valid:
await update.message.reply_text(message)
return
file = await context.bot.get_file(document.file_id)
temp_dir = 'temp_files'
os.makedirs(temp_dir, exist_ok=True)
file_path = os.path.join(temp_dir, document.file_name)
try:
await file.download_to_drive(file_path)
response = await send_file_to_printer(file_path, user_id, document.file_name)
await update.message.reply_text(response)
finally:
if os.path.exists(file_path):
os.remove(file_path)
async def send_file_to_printer(file_path: str, user_id: int, file_name: str):
"""
Sends a file to the printer via email.
@@ -93,6 +123,9 @@ async def check_print_status(user_id: int):
if not email_ids:
return "No hay actualizaciones de estado de impresión."
# Fetch only the last 10 unseen emails
email_ids = email_ids[-10:]
statuses = []
for e_id in email_ids:
_, msg_data = mail.fetch(e_id, "(RFC822)")
@@ -108,7 +141,10 @@ async def check_print_status(user_id: int):
statuses.append(f"Trabajo de impresión recibido: {msg['subject']}")
else:
statuses.append(f"Nuevo correo: {msg['subject']}")
# Mark the email as seen
mail.store(e_id, "+FLAGS", "\\Seen")
mail.close()
mail.logout()
if not statuses: