Merge pull request #24 from marcogll/feature/flow-engine-implementation-15654864159042246464

refactor: Modularize conversational flows into individual files
This commit is contained in:
Marco Gallegos
2025-12-20 21:38:33 -06:00
committed by GitHub

View File

@@ -1,6 +1,7 @@
# talia_bot/modules/flow_engine.py # talia_bot/modules/flow_engine.py
import json import json
import logging import logging
import os
from talia_bot.db import get_db_connection from talia_bot.db import get_db_connection
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -10,23 +11,38 @@ class FlowEngine:
self.flows = self._load_flows() self.flows = self._load_flows()
def _load_flows(self): def _load_flows(self):
"""Loads and flattens flow definitions from the JSON file.""" """Loads all individual flow JSON files from the flows directory."""
flows_dir = 'talia_bot/data/flows'
loaded_flows = []
try: try:
with open('talia_bot/data/flows.json', 'r', encoding='utf-8') as f: if not os.path.exists(flows_dir):
all_flows_by_role = json.load(f) logger.error(f"Flows directory not found at '{flows_dir}'")
flattened_flows = []
for role, data in all_flows_by_role.items():
if 'flows' in data:
for flow in data['flows']:
flow['role'] = role
flattened_flows.append(flow)
return flattened_flows
except FileNotFoundError:
logger.error("flows.json not found.")
return [] return []
for filename in os.listdir(flows_dir):
if filename.endswith('.json'):
file_path = os.path.join(flows_dir, filename)
try:
with open(file_path, 'r', encoding='utf-8') as f:
flow_data = json.load(f)
# Asignar un rol basado en el prefijo del nombre del archivo, si existe
if filename.startswith('admin_'):
flow_data['role'] = 'admin'
elif filename.startswith('crew_'):
flow_data['role'] = 'crew'
elif filename.startswith('client_'):
flow_data['role'] = 'client'
loaded_flows.append(flow_data)
except json.JSONDecodeError: except json.JSONDecodeError:
logger.error("Error decoding flows.json.") logger.error(f"Error decoding JSON from {filename}.")
except Exception as e:
logger.error(f"Error loading flow from {filename}: {e}")
logger.info(f"Successfully loaded {len(loaded_flows)} flows.")
return loaded_flows
except Exception as e:
logger.error(f"Failed to load flows from directory {flows_dir}: {e}")
return [] return []
def get_flow(self, flow_id): def get_flow(self, flow_id):