From 338108d7b7408d91c6c8cfb5170c0ecd6aa8004b Mon Sep 17 00:00:00 2001 From: Marco Gallegos Date: Sat, 20 Dec 2025 10:31:17 -0600 Subject: [PATCH] =?UTF-8?q?feat:=20Implementar=20finalizaci=C3=B3n=20del?= =?UTF-8?q?=20flujo=20/horario?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Este commit introduce la lógica para procesar y guardar los datos recopilados por el flujo de conversación . Cambios: 1. **Nueva tabla de base de datos**: Se ha añadido una nueva tabla a la base de datos para almacenar los horarios de trabajo de los usuarios. 2. **Nuevo modelo SQLAlchemy**: Se ha creado el modelo en . 3. **Nuevo **: Se ha creado un nuevo módulo para centralizar la lógica de finalización de los flujos de conversación. * La función determina qué acción tomar en función del flujo que ha finalizado. * La función se encarga de: * Enviar los datos del horario al . * Guardar (o actualizar si ya existe) el horario en la nueva tabla . 4. **Actualización de **: El constructor de flujos ahora llama a cuando una conversación llega a su fin, conectando la lógica de conversación con la de procesamiento de datos. --- db/init/init.sql | 19 ++++ models/vanity_attendance_models.py | 21 +++++ modules/finalizer.py | 137 +++++++++++++++++++++++++++++ modules/flow_builder.py | 69 ++++++++------- 4 files changed, 212 insertions(+), 34 deletions(-) create mode 100644 modules/finalizer.py diff --git a/db/init/init.sql b/db/init/init.sql index 444c571..7e4e720 100644 --- a/db/init/init.sql +++ b/db/init/init.sql @@ -130,3 +130,22 @@ CREATE TABLE IF NOT EXISTS horario_empleadas ( hora_salida_teorica TIME, FOREIGN KEY (numero_empleado) REFERENCES vanity_hr.data_empleadas(numero_empleado) ); + +CREATE TABLE IF NOT EXISTS horarios_configurados ( + id INT AUTO_INCREMENT PRIMARY KEY, + telegram_id BIGINT NOT NULL, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, + short_name VARCHAR(100), + monday_in TIME, + monday_out TIME, + tuesday_in TIME, + tuesday_out TIME, + wednesday_in TIME, + wednesday_out TIME, + thursday_in TIME, + thursday_out TIME, + friday_in TIME, + friday_out TIME, + saturday_in TIME, + saturday_out TIME +); diff --git a/models/vanity_attendance_models.py b/models/vanity_attendance_models.py index 050295f..9fca8a6 100644 --- a/models/vanity_attendance_models.py +++ b/models/vanity_attendance_models.py @@ -30,3 +30,24 @@ class HorarioEmpleadas(Base): hora_entrada_teorica = Column(Time) hora_salida_teorica = Column(Time) empleada = relationship("DataEmpleadas", backref="horario_empleadas") + +class HorariosConfigurados(Base): + __tablename__ = 'horarios_configurados' + __table_args__ = {'schema': 'vanity_attendance'} + + id = Column(Integer, primary_key=True, autoincrement=True) + telegram_id = Column(BigInteger, nullable=False) + timestamp = Column(Time) + short_name = Column(String(100)) + monday_in = Column(Time) + monday_out = Column(Time) + tuesday_in = Column(Time) + tuesday_out = Column(Time) + wednesday_in = Column(Time) + wednesday_out = Column(Time) + thursday_in = Column(Time) + thursday_out = Column(Time) + friday_in = Column(Time) + friday_out = Column(Time) + saturday_in = Column(Time) + saturday_out = Column(Time) diff --git a/modules/finalizer.py b/modules/finalizer.py new file mode 100644 index 0000000..affcb8a --- /dev/null +++ b/modules/finalizer.py @@ -0,0 +1,137 @@ +import os +import json +import logging +import requests +from datetime import datetime +from sqlalchemy.orm import sessionmaker +from modules.database import get_engine +from models.vanity_attendance_models import HorariosConfigurados + +def _send_webhook(url: str, payload: dict): + """Sends a POST request to a webhook.""" + if not url: + logging.warning("No webhook URL provided.") + return False + try: + headers = {"Content-Type": "application/json"} + res = requests.post(url, json=payload, headers=headers, timeout=20) + res.raise_for_status() + logging.info(f"Webhook sent successfully to: {url}") + return True + except Exception as e: + logging.error(f"Error sending webhook to {url}: {e}") + return False + +def _convert_to_time(time_str: str): + """Converts a string like '10:00 AM' to a datetime.time object.""" + if not time_str or not isinstance(time_str, str): + return None + try: + # Handle 'Todo el día' or other non-time strings + if ":" not in time_str: + return None + return datetime.strptime(time_str, '%I:%M %p').time() + except ValueError: + logging.warning(f"Could not parse time string: {time_str}") + return None + +def _finalize_horario(telegram_id: int, data: dict): + """Finalizes the 'horario' flow.""" + logging.info(f"Finalizing 'horario' flow for telegram_id: {telegram_id}") + + # 1. Prepare data + schedule_data = { + "telegram_id": telegram_id, + "short_name": data.get("SHORT_NAME"), + "monday_in": _convert_to_time(data.get("MONDAY_IN")), + "monday_out": _convert_to_time(data.get("MONDAY_OUT")), + "tuesday_in": _convert_to_time(data.get("TUESDAY_IN")), + "tuesday_out": _convert_to_time(data.get("TUESDAY_OUT")), + "wednesday_in": _convert_to_time(data.get("WEDNESDAY_IN")), + "wednesday_out": _convert_to_time(data.get("WEDNESDAY_OUT")), + "thursday_in": _convert_to_time(data.get("THURSDAY_IN")), + "thursday_out": _convert_to_time(data.get("THURSDAY_OUT")), + "friday_in": _convert_to_time(data.get("FRIDAY_IN")), + "friday_out": _convert_to_time(data.get("FRIDAY_OUT")), + "saturday_in": _convert_to_time(data.get("SATURDAY_IN")), + "saturday_out": _convert_to_time("6:00 PM"), # Hardcoded as per flow + } + + # 2. Send to webhook + webhook_url = os.getenv("WEBHOOK_SCHEDULE") + if webhook_url: + # Create a JSON-serializable payload + json_payload = {k: (v.isoformat() if isinstance(v, datetime.time) else v) for k, v in schedule_data.items()} + json_payload["timestamp"] = datetime.now().isoformat() + _send_webhook(webhook_url, json_payload) + + # 3. Save to database + engine = get_engine() + Session = sessionmaker(bind=engine) + session = Session() + try: + # Upsert logic: Check if a record for this telegram_id already exists + existing_schedule = session.query(HorariosConfigurados).filter_by(telegram_id=telegram_id).first() + if existing_schedule: + # Update existing record + for key, value in schedule_data.items(): + setattr(existing_schedule, key, value) + existing_schedule.timestamp = datetime.now() + logging.info(f"Updating existing schedule for telegram_id: {telegram_id}") + else: + # Create new record + new_schedule = HorariosConfigurados(**schedule_data) + session.add(new_schedule) + logging.info(f"Creating new schedule for telegram_id: {telegram_id}") + + session.commit() + return True + except Exception as e: + logging.error(f"Database error in _finalize_horario: {e}") + session.rollback() + return False + finally: + session.close() + + +# Mapping of flow names to finalization functions +FINALIZATION_MAP = { + "horario": _finalize_horario, + # Add other flows here, e.g., "onboarding": _finalize_onboarding +} + + +async def finalize_flow(update, context): + """Generic function to finalize a conversation flow.""" + flow_name = context.user_data.get("flow_name") + telegram_id = update.effective_user.id + + if not flow_name: + logging.error("finalize_flow called without a flow_name in user_data.") + return + + finalizer_func = FINALIZATION_MAP.get(flow_name) + if not finalizer_func: + logging.warning(f"No finalizer function found for flow: {flow_name}") + await update.message.reply_text("Flujo completado (sin acción final definida).") + return + + # The final answer needs to be saved first + current_state_key = context.user_data.get("current_state") + if current_state_key: + flow_definition_path = os.path.join("conv-flows", f"{flow_name}.json") + with open(flow_definition_path, 'r') as f: + flow = json.load(f) + current_step = next((step for step in flow["steps"] if step["state"] == current_state_key), None) + if current_step: + variable_name = current_step.get("variable") + if variable_name: + context.user_data[variable_name] = update.message.text + + + success = finalizer_func(telegram_id, context.user_data) + + if success: + await update.message.reply_text("¡Horario guardado con éxito! 👍") + else: + await update.message.reply_text("Ocurrió un error al guardar tu horario. Por favor, contacta a un administrador.") diff --git a/modules/flow_builder.py b/modules/flow_builder.py index 3fefb2f..27f41da 100644 --- a/modules/flow_builder.py +++ b/modules/flow_builder.py @@ -11,72 +11,62 @@ from telegram.ext import ( filters, ) -# Assuming finalization logic will be handled elsewhere for now -# from .onboarding import finalizar, cancelar +from .finalizer import finalize_flow -# A simple end state for now -async def end(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - await update.message.reply_text("Flow ended.") +async def end_cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + await update.message.reply_text("Flujo cancelado.") return ConversationHandler.END async def generic_callback(update: Update, context: ContextTypes.DEFAULT_TYPE, flow: dict): current_state_key = context.user_data.get("current_state", 0) - # Find the current step in the flow current_step = next((step for step in flow["steps"] if step["state"] == current_state_key), None) if not current_step: await update.message.reply_text("Hubo un error en el flujo. Por favor, inicia de nuevo.") return ConversationHandler.END - # Save the answer user_answer = update.message.text variable_name = current_step.get("variable") if variable_name: context.user_data[variable_name] = user_answer - # Determine the next state next_state_key = None if "next_steps" in current_step: for condition in current_step["next_steps"]: - if condition["value"] == user_answer: + if condition.get("value") == user_answer: next_state_key = condition["go_to"] break - elif condition["value"] == "default": + elif condition.get("value") == "default": next_state_key = condition["go_to"] elif "next_step" in current_step: next_state_key = current_step["next_step"] if next_state_key is None: - # If no next step is defined, end the conversation - return await end(update, context) + return await end_cancel(update, context) + + if next_state_key == -1: + await finalize_flow(update, context) + return ConversationHandler.END - # Find the next step next_step = next((step for step in flow["steps"] if step["state"] == next_state_key), None) if not next_step: - # If the next step is the end of the conversation - if next_state_key == -1: - # Here we would call the generic "finalizar" function - # For now, just end it - await update.message.reply_text("Has completado el flujo. ¡Gracias!") - # return await finalizar(update, context) - return ConversationHandler.END - else: - await update.message.reply_text("Error: No se encontró el siguiente paso del flujo.") - return ConversationHandler.END + await update.message.reply_text("Error: No se encontró el siguiente paso del flujo.") + return ConversationHandler.END - # Ask the next question reply_markup = ReplyKeyboardRemove() if next_step.get("type") == "keyboard" and "options" in next_step: + # Create a 2D array for the keyboard + options = next_step["options"] + keyboard = [options[i:i+2] for i in range(0, len(options), 2)] reply_markup = ReplyKeyboardMarkup( - [next_step["options"][i:i+3] for i in range(0, len(next_step["options"]), 3)], + keyboard, one_time_keyboard=True, resize_keyboard=True ) await update.message.reply_text(next_step["question"], reply_markup=reply_markup) - # Update the current state context.user_data["current_state"] = next_state_key return next_state_key @@ -85,14 +75,15 @@ async def start_flow(update: Update, context: ContextTypes.DEFAULT_TYPE, flow: d context.user_data.clear() context.user_data["flow_name"] = flow["flow_name"] - # Start with the first step first_step = flow["steps"][0] context.user_data["current_state"] = first_step["state"] reply_markup = ReplyKeyboardRemove() if first_step.get("type") == "keyboard" and "options" in first_step: + options = first_step["options"] + keyboard = [options[i:i+2] for i in range(0, len(options), 2)] reply_markup = ReplyKeyboardMarkup( - [first_step["options"][i:i+3] for i in range(0, len(first_step["options"]), 3)], + keyboard, one_time_keyboard=True, resize_keyboard=True ) @@ -102,25 +93,35 @@ async def start_flow(update: Update, context: ContextTypes.DEFAULT_TYPE, flow: d def create_handler(flow: dict): states = {} - for step in flow["steps"]: + all_states = sorted(list(set([step["state"] for step in flow["steps"]]))) + + for state_key in all_states: + # Skip the end state + if state_key == -1: + continue + callback = partial(generic_callback, flow=flow) - states[step["state"]] = [MessageHandler(filters.TEXT & ~filters.COMMAND, callback)] + states[state_key] = [MessageHandler(filters.TEXT & ~filters.COMMAND, callback)] - # The entry point should be a command with the same name as the flow entry_point = CommandHandler(flow["flow_name"], partial(start_flow, flow=flow)) return ConversationHandler( entry_points=[entry_point], states=states, - fallbacks=[CommandHandler("cancelar", end)], # Replace with generic cancel + fallbacks=[CommandHandler("cancelar", end_cancel)], allow_reentry=True, ) def load_flows(): flow_handlers = [] - for filename in os.listdir("conv-flows"): + flow_dir = "conv-flows" + if not os.path.isdir(flow_dir): + logging.warning(f"Directory not found: {flow_dir}") + return flow_handlers + + for filename in os.listdir(flow_dir): if filename.endswith(".json"): - filepath = os.path.join("conv-flows", filename) + filepath = os.path.join(flow_dir, filename) with open(filepath, "r", encoding="utf-8") as f: try: flow_definition = json.load(f)