refactor: Migrate bot core and modules from talia_bot to bot directory, update start_bot.sh and Dockerfile, and modify README.md.

This commit is contained in:
Marco Gallegos
2025-12-21 18:00:31 -06:00
parent 6e1a2f0102
commit 13141d6ed3
38 changed files with 112 additions and 86 deletions

20
bot/modules/admin.py Normal file
View File

@@ -0,0 +1,20 @@
# app/modules/admin.py
# Este módulo contiene funciones administrativas para el bot.
# Por ahora, permite ver el estado general del sistema.
def get_system_status():
"""
Devuelve un mensaje con el estado actual del bot y sus conexiones.
Actualmente el mensaje es fijo (hardcoded), pero en el futuro podría
hacer pruebas reales de conexión.
"""
# TODO: Implementar pruebas de estado en tiempo real para un monitoreo exacto.
status_text = (
"📊 *Estado del Sistema*\n\n"
"- *Bot Principal:* Activo ✅\n"
"- *Conexión Telegram API:* Estable ✅\n"
"- *Integración n8n:* Operacional ✅\n"
"- *Google Calendar:* Conectado ✅"
)
return status_text

56
bot/modules/agenda.py Normal file
View File

@@ -0,0 +1,56 @@
# bot/modules/agenda.py
# Este módulo se encarga de manejar las peticiones relacionadas con la agenda.
# Permite obtener y mostrar las actividades programadas para el día.
import datetime
import logging
from bot.modules.calendar import get_events
from bot.config import WORK_GOOGLE_CALENDAR_ID, PERSONAL_GOOGLE_CALENDAR_ID
logger = logging.getLogger(__name__)
async def get_agenda():
"""
Obtiene y muestra la agenda del usuario para el día actual desde Google Calendar.
Diferencia entre eventos de trabajo (visibles) y personales (bloqueos).
"""
try:
logger.info("Obteniendo agenda...")
now = datetime.datetime.now(datetime.timezone.utc)
start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)
end_of_day = start_of_day + datetime.timedelta(days=1)
logger.info(f"Buscando eventos de trabajo en {WORK_GOOGLE_CALENDAR_ID} y personales en {PERSONAL_GOOGLE_CALENDAR_ID}")
# Obtener eventos de trabajo (para mostrar)
work_events = get_events(start_of_day, end_of_day, calendar_id=WORK_GOOGLE_CALENDAR_ID)
# Obtener eventos personales (para comprobar bloqueos, no se muestran)
personal_events = get_events(start_of_day, end_of_day, calendar_id=PERSONAL_GOOGLE_CALENDAR_ID)
if not work_events and not personal_events:
logger.info("No se encontraron eventos de ningún tipo.")
return "📅 *Agenda para Hoy*\n\nTotalmente despejado. No hay eventos de trabajo ni personales."
agenda_text = "📅 *Agenda para Hoy*\n\n"
if not work_events:
agenda_text += "No tienes eventos de trabajo programados para hoy.\n"
else:
for event in work_events:
start = event["start"].get("dateTime", event["start"].get("date"))
if "T" in start:
time_str = start.split("T")[1][:5]
else:
time_str = "Todo el día"
summary = event.get("summary", "(Sin título)")
agenda_text += f"• *{time_str}* - {summary}\n"
if personal_events:
agenda_text += "\n🔒 Tienes tiempo personal bloqueado."
logger.info("Agenda obtenida con éxito.")
return agenda_text
except Exception as e:
logger.error(f"Error al obtener la agenda: {e}")
return "❌ Error al obtener la agenda. Por favor, intenta de nuevo más tarde."

View File

@@ -0,0 +1,69 @@
# app/modules/aprobaciones.py
# Este módulo gestiona el flujo de aprobación para las solicitudes hechas por el equipo.
# Permite ver solicitudes pendientes y aprobarlas o rechazarlas.
# El usuario principal aquí es el "owner" (dueño).
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
def get_approval_menu(request_id):
"""
Crea un menú de botones (teclado en línea) con "Aprobar" y "Rechazar".
Cada botón lleva el ID de la solicitud para saber cuál estamos procesando.
"""
keyboard = [
[
# callback_data es lo que el bot recibe cuando se pulsa el botón
InlineKeyboardButton("✅ Aprobar", callback_data=f'approve:{request_id}'),
InlineKeyboardButton("❌ Rechazar", callback_data=f'reject:{request_id}'),
]
]
return InlineKeyboardMarkup(keyboard)
def view_pending():
"""
Muestra al dueño una lista de solicitudes que esperan su aprobación.
Por ahora usa una lista fija de ejemplo.
"""
# TODO: Obtener solicitudes reales desde una base de datos o servicio externo.
proposals = [
{"id": "prop_001", "desc": "Grabación de proyecto", "duration": 4, "user": "Equipo A"},
{"id": "prop_002", "desc": "Taller de guion", "duration": 2, "user": "Equipo B"},
]
if not proposals:
return "No hay solicitudes pendientes.", None
# Tomamos la primera propuesta para mostrarla
proposal = proposals[0]
text = (
f"⏳ *Nueva Solicitud Pendiente*\n\n"
f"🙋‍♂️ *Solicitante:* {proposal['user']}\n"
f"📝 *Actividad:* {proposal['desc']}\n"
f"⏳ *Duración:* {proposal['duration']} horas"
)
# Adjuntamos los botones de aprobación
reply_markup = get_approval_menu(proposal['id'])
return text, reply_markup
def handle_approval_action(callback_data):
"""
Maneja la respuesta del dueño (clic en aprobar o rechazar).
Separa la acción (approve/reject) del ID de la solicitud.
"""
# callback_data viene como "accion:id", por ejemplo "approve:prop_001"
action, request_id = callback_data.split(':')
if action == 'approve':
# TODO: Guardar en base de datos que fue aprobada y avisar al equipo.
return f"✅ La solicitud *{request_id}* ha sido aprobada."
elif action == 'reject':
# TODO: Guardar en base de datos que fue rechazada y avisar al equipo.
return f"❌ La solicitud *{request_id}* ha sido rechazada."
return "Acción desconocida.", None

146
bot/modules/calendar.py Normal file
View File

@@ -0,0 +1,146 @@
# app/google_calendar.py
# Este script maneja la integración con Google Calendar (Calendario de Google).
# Permite buscar espacios libres y crear eventos.
import datetime
import logging
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from bot.config import GOOGLE_SERVICE_ACCOUNT_FILE, WORK_GOOGLE_CALENDAR_ID
logger = logging.getLogger(__name__)
# Configuración de los permisos (SCOPES) para acceder al calendario
SCOPES = ["https://www.googleapis.com/auth/calendar"]
# Autenticación usando el archivo de cuenta de servicio (Service Account)
creds = service_account.Credentials.from_service_account_file(
GOOGLE_SERVICE_ACCOUNT_FILE, scopes=SCOPES
)
# Construcción del objeto 'service' que nos permite interactuar con la API de Google Calendar
service = build("calendar", "v3", credentials=creds)
def get_available_slots(
start_time, end_time, duration_minutes=30, calendar_id=WORK_GOOGLE_CALENDAR_ID
):
"""
Busca espacios disponibles en el calendario dentro de un rango de tiempo.
Parámetros:
- start_time: Hora de inicio de la búsqueda.
- end_time: Hora de fin de la búsqueda.
- duration_minutes: Cuánto dura cada cita (por defecto 30 min).
- calendar_id: El ID del calendario donde buscar.
"""
try:
# Convertimos las fechas a formato ISO (el que entiende Google)
time_min = start_time.isoformat()
time_max = end_time.isoformat()
# Consultamos a Google qué horas están ocupadas (freebusy)
freebusy_query = {
"timeMin": time_min,
"timeMax": time_max,
"timeZone": "UTC",
"items": [{"id": calendar_id}],
}
freebusy_result = service.freebusy().query(body=freebusy_query).execute()
busy_slots = freebusy_result["calendars"][calendar_id]["busy"]
# Creamos una lista de todos los posibles espacios (slots)
potential_slots = []
current_time = start_time
while current_time + datetime.timedelta(minutes=duration_minutes) <= end_time:
potential_slots.append(
(
current_time,
current_time + datetime.timedelta(minutes=duration_minutes),
)
)
# Avanzamos el tiempo para el siguiente espacio
current_time += datetime.timedelta(minutes=duration_minutes)
# Filtramos los espacios que chocan con horas ocupadas
available_slots = []
for slot_start, slot_end in potential_slots:
is_busy = False
for busy in busy_slots:
busy_start = datetime.datetime.fromisoformat(busy["start"])
busy_end = datetime.datetime.fromisoformat(busy["end"])
# Si el espacio propuesto se cruza con uno ocupado, lo marcamos como ocupado
if max(slot_start, busy_start) < min(slot_end, busy_end):
is_busy = True
break
if not is_busy:
available_slots.append((slot_start, slot_end))
return available_slots
except HttpError as error:
print(f"Ocurrió un error con la API de Google: {error}")
return []
def create_event(summary, start_time, end_time, attendees, calendar_id=WORK_GOOGLE_CALENDAR_ID):
"""
Crea un nuevo evento (cita) en el calendario.
Parámetros:
- summary: Título del evento.
- start_time: Hora de inicio.
- end_time: Hora de fin.
- attendees: Lista de correos electrónicos de los asistentes.
"""
# Definimos la estructura del evento según pide Google
event = {
"summary": summary,
"start": {
"dateTime": start_time.isoformat(),
"timeZone": "UTC",
},
"end": {
"dateTime": end_time.isoformat(),
"timeZone": "UTC",
},
"attendees": [{"email": email} for email in attendees],
}
try:
# Insertamos el evento en el calendario
created_event = (
service.events().insert(calendarId=calendar_id, body=event).execute()
)
return created_event
except HttpError as error:
print(f"Ocurrió un error al crear el evento: {error}")
return None
def get_events(start_time, end_time, calendar_id=WORK_GOOGLE_CALENDAR_ID):
"""
Obtiene la lista de eventos entre dos momentos.
"""
try:
logger.info(f"Llamando a la API de Google Calendar para {calendar_id}")
events_result = (
service.events()
.list(
calendarId=calendar_id,
timeMin=start_time.isoformat(),
timeMax=end_time.isoformat(),
singleEvents=True,
orderBy="startTime",
)
.execute()
)
events = events_result.get("items", [])
logger.info(f"Se obtuvieron {len(events)} eventos de la API.")
return events
except HttpError as error:
logger.error(f"Ocurrió un error al obtener eventos: {error}")
return []
except Exception as e:
logger.error(f"Error inesperado al obtener eventos: {e}")
return []

17
bot/modules/citas.py Normal file
View File

@@ -0,0 +1,17 @@
# app/modules/citas.py
# Este módulo maneja la programación de citas para los clientes.
# Permite a los usuarios obtener un enlace para agendar una reunión.
from bot.config import CALENDLY_LINK
def request_appointment():
"""
Proporciona al usuario un enlace para agendar una cita.
Usa el enlace configurado en las variables de entorno.
"""
response_text = (
"Para agendar una cita, por favor utiliza el siguiente enlace: \n\n"
f"[Agendar Cita]({CALENDLY_LINK})"
)
return response_text

36
bot/modules/debug.py Normal file
View File

@@ -0,0 +1,36 @@
# bot/modules/debug.py
# Este módulo permite a los administradores imprimir los detalles de configuración del bot.
# Es una herramienta útil para depuración (debugging).
from telegram import Update
from telegram.ext import ContextTypes
from bot.modules.identity import is_admin
from bot.config import (
TIMEZONE,
WORK_GOOGLE_CALENDAR_ID,
PERSONAL_GOOGLE_CALENDAR_ID,
N8N_WEBHOOK_URL,
)
async def print_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""
Maneja el comando /print.
Verifica si el usuario es administrador. Si lo es, muestra valores clave
de la configuración (Zona horaria, ID de calendario, Webhook).
"""
chat_id = update.effective_chat.id
# Solo permitimos esto a los administradores
if is_admin(chat_id):
config_details = (
f"**Detalles de Configuración**\n"
f"Zona Horaria: `{TIMEZONE}`\n"
f"Calendario Trabajo: `{WORK_GOOGLE_CALENDAR_ID or 'No definido'}`\n"
f"Calendario Personal: `{PERSONAL_GOOGLE_CALENDAR_ID or 'No definido'}`\n"
f"URL Webhook n8n: `{N8N_WEBHOOK_URL or 'No definido'}`\n"
)
await update.message.reply_text(config_details, parse_mode='Markdown')
else:
# Si no es admin, le avisamos que no tiene permiso
await update.message.reply_text("No tienes autorización para usar este comando.")

12
bot/modules/equipo.py Normal file
View File

@@ -0,0 +1,12 @@
# app/modules/equipo.py
# Este módulo contiene funciones para los miembros autorizados del equipo.
# Incluye un flujo para proponer actividades que el dueño debe aprobar.
def view_requests_status():
"""
Permite a un miembro del equipo ver el estado de sus solicitudes recientes.
Por ahora devuelve un estado de ejemplo fijo.
"""
# TODO: Obtener el estado real desde una base de datos.
return "Aquí está el estado de tus solicitudes recientes:\n\n- Grabación de proyecto (4h): Aprobado\n- Taller de guion (2h): Pendiente"

154
bot/modules/flow_engine.py Normal file
View File

@@ -0,0 +1,154 @@
# bot/modules/flow_engine.py
import json
import logging
import os
from bot.db import get_db_connection
from bot.modules.sales_rag import generate_sales_pitch
from bot.modules.nfc_tag import generate_nfc_tag
logger = logging.getLogger(__name__)
class FlowEngine:
def __init__(self):
self.flows = self._load_flows()
def _load_flows(self):
"""Loads all individual flow JSON files from the flows directory."""
# flows_dir = 'bot/data/flows' # OLD
base_dir = os.path.dirname(os.path.abspath(__file__))
flows_dir = os.path.join(base_dir, '..', 'data', 'flows')
loaded_flows = []
try:
if not os.path.exists(flows_dir):
logger.error(f"Flows directory not found at '{flows_dir}'")
return []
for filename in os.listdir(flows_dir):
if filename.endswith('.json'):
file_path = os.path.join(flows_dir, filename)
try:
with open(file_path, 'r', encoding='utf-8') as f:
flow_data = json.load(f)
if 'role' not in flow_data:
logger.warning(f"Flow {filename} is missing a 'role' key. Skipping.")
continue
loaded_flows.append(flow_data)
except json.JSONDecodeError:
logger.error(f"Error decoding JSON from {filename}.")
except Exception as e:
logger.error(f"Error loading flow from {filename}: {e}")
logger.info(f"Successfully loaded {len(loaded_flows)} flows.")
return loaded_flows
except Exception as e:
logger.error(f"Failed to load flows from directory {flows_dir}: {e}")
return []
def get_flow(self, flow_id):
"""Retrieves a specific flow by its ID."""
return next((flow for flow in self.flows if flow.get('id') == flow_id), None)
def get_conversation_state(self, user_id):
"""Gets the current conversation state for a user from the database."""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT flow_id, current_step_id, collected_data FROM conversations WHERE user_id = ?", (user_id,))
state = cursor.fetchone()
conn.close()
if state:
return {
"flow_id": state['flow_id'],
"current_step_id": state['current_step_id'],
"collected_data": json.loads(state['collected_data']) if state['collected_data'] else {}
}
return None
def start_flow(self, user_id, flow_id):
"""Starts a new flow for a user."""
flow = self.get_flow(flow_id)
if not flow or 'steps' not in flow or not flow['steps']:
logger.error(f"Flow '{flow_id}' is invalid or has no steps.")
return None
initial_step = flow['steps'][0]
self.update_conversation_state(user_id, flow_id, initial_step['step_id'], {})
return initial_step
def update_conversation_state(self, user_id, flow_id, step_id, collected_data):
"""Creates or updates the conversation state in the database."""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO conversations (user_id, flow_id, current_step_id, collected_data)
VALUES (?, ?, ?, ?)
""", (user_id, flow_id, step_id, json.dumps(collected_data)))
conn.commit()
conn.close()
def handle_response(self, user_id, response_data):
"""
Handles a user's response, saves the data, and returns the next action.
"""
state = self.get_conversation_state(user_id)
if not state:
return {"status": "error", "message": "No conversation state found."}
flow = self.get_flow(state['flow_id'])
if not flow:
return {"status": "error", "message": f"Flow '{state['flow_id']}' not found."}
current_step = next((step for step in flow['steps'] if step['step_id'] == state['current_step_id']), None)
if not current_step:
self.end_flow(user_id)
return {"status": "error", "message": "Current step not found in flow."}
# Save the user's response using the 'variable' key from the step definition
variable_name = current_step.get('variable')
if variable_name:
state['collected_data'][variable_name] = response_data
else:
# Fallback for steps without a 'variable' key
logger.warning(f"Step {current_step['step_id']} in flow {flow['id']} has no 'variable' defined. Saving with default key.")
state['collected_data'][f"step_{current_step['step_id']}_response"] = response_data
# Find the index of the current step to determine the next one robustly
steps = flow['steps']
current_step_index = -1
for i, step in enumerate(steps):
if step['step_id'] == state['current_step_id']:
current_step_index = i
break
# Check if there is a next step in the list
if current_step_index != -1 and current_step_index + 1 < len(steps):
next_step = steps[current_step_index + 1]
self.update_conversation_state(user_id, state['flow_id'], next_step['step_id'], state['collected_data'])
return {"status": "in_progress", "step": next_step}
else:
# This is the last step, so the flow is complete
final_data = state['collected_data']
self.end_flow(user_id)
response = {"status": "complete", "flow_id": flow['id'], "data": final_data}
if flow['id'] == 'client_sales_funnel':
user_query = final_data.get('IDEA_PITCH', '')
sales_pitch = generate_sales_pitch(user_query, final_data)
response['sales_pitch'] = sales_pitch
elif flow['id'] == 'admin_create_nfc_tag':
nfc_tag = generate_nfc_tag(final_data)
response['nfc_tag'] = nfc_tag
return response
def end_flow(self, user_id):
"""Ends a flow for a user by deleting their conversation state."""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM conversations WHERE user_id = ?", (user_id,))
conn.commit()
conn.close()

71
bot/modules/identity.py Normal file
View File

@@ -0,0 +1,71 @@
# bot/modules/identity.py
# Este script maneja los roles y permisos de los usuarios.
import logging
from bot.db import get_db_connection
from bot.config import ADMIN_ID
logger = logging.getLogger(__name__)
def add_user(telegram_id, role, name=None, employee_id=None, branch=None):
"""
Añade un nuevo usuario o actualiza el rol de uno existente.
"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO users (telegram_id, role, name, employee_id, branch)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(telegram_id) DO UPDATE SET
role = excluded.role,
name = excluded.name,
employee_id = excluded.employee_id,
branch = excluded.branch
""", (telegram_id, role, name, employee_id, branch))
conn.commit()
logger.info(f"Usuario {telegram_id} añadido/actualizado con el rol {role}.")
return True
except Exception as e:
logger.error(f"Error al añadir/actualizar usuario {telegram_id}: {e}")
return False
finally:
if conn:
conn.close()
def get_user_role(telegram_id):
"""
Determina el rol de un usuario.
Roles: 'admin', 'crew', 'client'.
"""
# El admin principal se define en el .env para el primer arranque
if str(telegram_id) == ADMIN_ID:
return 'admin'
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT role FROM users WHERE telegram_id = ?", (telegram_id,))
user = cursor.fetchone()
if user:
logger.debug(f"Rol encontrado para {telegram_id}: {user['role']}")
return user['role']
else:
# Si no está en la DB, es un cliente nuevo
logger.debug(f"No se encontró rol para {telegram_id}, asignando 'client'.")
return 'client'
except Exception as e:
logger.error(f"Error al obtener el rol para {telegram_id}: {e}")
return 'client' # Fallback seguro
finally:
if conn:
conn.close()
def is_admin(telegram_id):
"""Verifica si un usuario es administrador."""
return get_user_role(telegram_id) == 'admin'
def is_crew(telegram_id):
"""Verifica si un usuario es del equipo (crew) o administrador."""
return get_user_role(telegram_id) in ['admin', 'crew']

56
bot/modules/llm_engine.py Normal file
View File

@@ -0,0 +1,56 @@
# bot/modules/llm_engine.py
# Este script se encarga de la comunicación con la inteligencia artificial de OpenAI.
import openai
from bot.config import OPENAI_API_KEY, OPENAI_MODEL
def get_smart_response(prompt):
"""
Genera una respuesta inteligente usando la API de OpenAI.
Parámetros:
- prompt: El texto o pregunta que le enviamos a la IA.
"""
# Verificamos que tengamos la llave de la API configurada
if not OPENAI_API_KEY:
return "Error: La llave de la API de OpenAI no está configurada."
try:
# Creamos el cliente de OpenAI
client = openai.OpenAI(api_key=OPENAI_API_KEY)
# Solicitamos una respuesta al modelo configurado
response = client.chat.completions.create(
model=OPENAI_MODEL,
messages=[
{"role": "system", "content": "Eres un asistente útil."},
{"role": "user", "content": prompt},
],
)
# Devolvemos el contenido de la respuesta limpia (sin espacios extras)
return response.choices[0].message.content.strip()
except Exception as e:
# Si algo sale mal, devolvemos el error
return f"Ocurrió un error al comunicarse con OpenAI: {e}"
def transcribe_audio(audio_file_path):
"""
Transcribes an audio file using OpenAI's Whisper model.
Parameters:
- audio_file_path: The path to the audio file.
"""
if not OPENAI_API_KEY:
return "Error: OPENAI_API_KEY is not configured."
try:
client = openai.OpenAI(api_key=OPENAI_API_KEY)
with open(audio_file_path, "rb") as audio_file:
transcript = client.audio.transcriptions.create(
model="whisper-1",
file=audio_file
)
return transcript.text
except Exception as e:
return f"Error during audio transcription: {e}"

25
bot/modules/nfc_tag.py Normal file
View File

@@ -0,0 +1,25 @@
# bot/modules/nfc_tag.py
# This module contains the logic for generating NFC tags.
import base64
import json
import logging
logger = logging.getLogger(__name__)
def generate_nfc_tag(collected_data):
"""
Generates a Base64 encoded string from the collected data.
"""
tag_data = {
"name": collected_data.get("EMPLOYEE_NAME"),
"num_emp": collected_data.get("EMPLOYEE_ID"),
"sucursal": collected_data.get("BRANCH"),
"telegram_id": collected_data.get("TELEGRAM_ID"),
}
json_string = json.dumps(tag_data)
base64_bytes = base64.b64encode(json_string.encode("utf-8"))
base64_string = base64_bytes.decode("utf-8")
return f"¡Gracias! Aquí está tu tag en formato Base64:\n\n`{base64_string}`"

65
bot/modules/onboarding.py Normal file
View File

@@ -0,0 +1,65 @@
# bot/modules/onboarding.py
# Este módulo maneja la primera interacción con el usuario (el comando /start).
# Se encarga de mostrar un menú diferente según quién sea el usuario (admin, crew o cliente).
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
def get_admin_menu(flow_engine):
"""Crea el menú de botones principal para los Administradores."""
keyboard = [
[InlineKeyboardButton("👑 Revisar Pendientes", callback_data='view_pending')],
[InlineKeyboardButton("📅 Agenda", callback_data='view_agenda')],
]
# Dynamic buttons from flows
if flow_engine:
for flow in flow_engine.flows:
if flow.get("role") == "admin" and "trigger_button" in flow and "name" in flow:
button = InlineKeyboardButton(flow["name"], callback_data=flow["trigger_button"])
keyboard.append([button])
keyboard.append([InlineKeyboardButton("▶️ Más opciones", callback_data='admin_menu')])
return InlineKeyboardMarkup(keyboard)
def get_admin_secondary_menu():
"""Crea el menú secundario para Administradores."""
text = "Aquí tienes más opciones de administración:"
keyboard = [
[InlineKeyboardButton("📋 Gestionar Tareas (Vikunja)", callback_data='manage_vikunja')],
[InlineKeyboardButton("📊 Estado del sistema", callback_data='view_system_status')],
[InlineKeyboardButton("👥 Gestionar Usuarios", callback_data='manage_users')],
]
reply_markup = InlineKeyboardMarkup(keyboard)
return text, reply_markup
def get_crew_menu():
"""Crea el menú de botones para los Miembros del Equipo."""
keyboard = [
[InlineKeyboardButton("🕒 Proponer actividad", callback_data='propose_activity')],
[InlineKeyboardButton("📄 Ver estatus de solicitudes", callback_data='view_requests_status')],
]
return InlineKeyboardMarkup(keyboard)
def get_client_menu():
"""Crea el menú de botones para los Clientes externos."""
keyboard = [
[InlineKeyboardButton("🗓️ Agendar una cita", callback_data='schedule_appointment')],
[InlineKeyboardButton(" Información de servicios", callback_data='get_service_info')],
]
return InlineKeyboardMarkup(keyboard)
def handle_start(user_role, flow_engine=None):
"""
Decide qué mensaje y qué menú mostrar según el rol del usuario.
"""
welcome_message = "Hola, soy Talía. ¿En qué puedo ayudarte hoy?"
if user_role == "admin":
menu = get_admin_menu(flow_engine)
elif user_role == "crew":
menu = get_crew_menu()
else:
menu = get_client_menu()
return welcome_message, menu

121
bot/modules/printer.py Normal file
View File

@@ -0,0 +1,121 @@
# bot/modules/printer.py
# This module will contain the SMTP/IMAP loop for the remote printing service.
import smtplib
import imaplib
import email
import logging
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
from bot.config import (
SMTP_SERVER,
SMTP_PORT,
SMTP_USER,
SMTP_PASS,
IMAP_SERVER,
IMAP_USER,
IMAP_PASS,
PRINTER_EMAIL,
)
from bot.modules.identity import is_admin
logger = logging.getLogger(__name__)
async def send_file_to_printer(file_path: str, user_id: int, file_name: str):
"""
Sends a file to the printer via email.
"""
if not is_admin(user_id):
return "No tienes permiso para usar este comando."
if not all([SMTP_SERVER, SMTP_PORT, SMTP_USER, SMTP_PASS, PRINTER_EMAIL]):
logger.error("Faltan una o más variables de entorno SMTP o PRINTER_EMAIL.")
return "El servicio de impresión no está configurado correctamente."
try:
msg = MIMEMultipart()
msg["From"] = SMTP_USER
msg["To"] = PRINTER_EMAIL
msg["Subject"] = f"Print Job from {user_id}: {file_name}"
body = f"Nuevo trabajo de impresión enviado por el usuario {user_id}.\nNombre del archivo: {file_name}"
msg.attach(MIMEText(body, "plain"))
with open(file_path, "rb") as attachment:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
f"attachment; filename= {file_name}",
)
msg.attach(part)
server = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT)
server.login(SMTP_USER, SMTP_PASS)
text = msg.as_string()
server.sendmail(SMTP_USER, PRINTER_EMAIL, text)
server.quit()
logger.info(f"Archivo {file_name} enviado a la impresora ({PRINTER_EMAIL}) por el usuario {user_id}.")
return f"Tu archivo '{file_name}' ha sido enviado a la impresora. Recibirás una notificación cuando el estado del trabajo cambie."
except Exception as e:
logger.error(f"Error al enviar el correo de impresión: {e}")
return "Ocurrió un error al enviar el archivo a la impresora. Por favor, inténtalo de nuevo más tarde."
async def check_print_status(user_id: int):
"""
Checks the status of print jobs by reading the inbox.
"""
if not is_admin(user_id):
return "No tienes permiso para usar este comando."
if not all([IMAP_SERVER, IMAP_USER, IMAP_PASS]):
logger.error("Faltan una o más variables de entorno IMAP.")
return "El servicio de monitoreo de impresión no está configurado correctamente."
try:
mail = imaplib.IMAP4_SSL(IMAP_SERVER)
mail.login(IMAP_USER, IMAP_PASS)
mail.select("inbox")
status, messages = mail.search(None, "UNSEEN")
if status != "OK":
return "No se pudieron buscar los correos."
email_ids = messages[0].split()
if not email_ids:
return "No hay actualizaciones de estado de impresión."
statuses = []
for e_id in email_ids:
_, msg_data = mail.fetch(e_id, "(RFC822)")
for response_part in msg_data:
if isinstance(response_part, tuple):
msg = email.message_from_bytes(response_part[1])
subject = msg["subject"].lower()
if "completed" in subject:
statuses.append(f"Trabajo de impresión completado: {msg['subject']}")
elif "failed" in subject:
statuses.append(f"Trabajo de impresión fallido: {msg['subject']}")
elif "received" in subject:
statuses.append(f"Trabajo de impresión recibido: {msg['subject']}")
else:
statuses.append(f"Nuevo correo: {msg['subject']}")
mail.logout()
if not statuses:
return "No se encontraron actualizaciones de estado relevantes."
return "\n".join(statuses)
except Exception as e:
logger.error(f"Error al revisar el estado de la impresión: {e}")
return "Ocurrió un error al revisar el estado de la impresión."

73
bot/modules/sales_rag.py Normal file
View File

@@ -0,0 +1,73 @@
# bot/modules/sales_rag.py
# This module will contain the sales RAG flow for new clients.
import json
import logging
from bot.modules.llm_engine import get_smart_response
logger = logging.getLogger(__name__)
def load_services_data():
"""Loads the services data from the JSON file."""
try:
with open("bot/data/services.json", "r", encoding="utf-8") as f:
return json.load(f)
except FileNotFoundError:
logger.error("El archivo services.json no fue encontrado.")
return []
except json.JSONDecodeError:
logger.error("Error al decodificar el archivo services.json.")
return []
def find_relevant_services(user_query, services):
"""
Finds relevant services based on the user's query.
A simple keyword matching approach is used here.
"""
query = user_query.lower()
relevant_services = []
for service in services:
for keyword in service.get("keywords", []):
if keyword in query:
relevant_services.append(service)
break # Avoid adding the same service multiple times
return relevant_services
def generate_sales_pitch(user_query, collected_data):
"""
Generates a personalized sales pitch using the RAG approach.
"""
services = load_services_data()
relevant_services = find_relevant_services(user_query, services)
if not relevant_services:
logger.warning(f"No se encontraron servicios relevantes para la consulta: '{user_query}'. No se generará respuesta.")
return ("Gracias por tu interés. Sin embargo, con la información proporcionada no he podido identificar "
"servicios específicos que se ajusten a tu necesidad. ¿Podrías describir tu proyecto con otras palabras "
"o dar más detalles sobre lo que buscas?")
context_str = "Según tus necesidades, aquí tienes algunos de nuestros servicios y ejemplos de lo que podemos hacer:\n"
for service in relevant_services:
context_str += f"\n**Servicio:** {service['service_name']}\n"
context_str += f"*Descripción:* {service['description']}\n"
if "work_examples" in service:
context_str += "*Ejemplos de trabajo:*\n"
for example in service["work_examples"]:
context_str += f" - {example}\n"
prompt = (
f"Eres Talía, una asistente de ventas experta y amigable. Un cliente potencial llamado "
f"{collected_data.get('CLIENT_NAME', 'cliente')} del sector "
f"'{collected_data.get('CLIENT_INDUSTRY', 'no especificado')}' "
f"ha descrito su proyecto o necesidad de la siguiente manera: '{user_query}'.\n\n"
"A continuación, se presenta información sobre nuestros servicios que podría ser relevante para ellos:\n"
f"{context_str}\n\n"
"**Tu tarea es generar una respuesta personalizada que:**\n"
"1. Demuestre que has comprendido su necesidad específica.\n"
"2. Conecte de manera clara y directa su proyecto con nuestros servicios, utilizando los ejemplos de trabajo para ilustrar cómo podemos ayudar.\n"
"3. Mantenga un tono profesional, pero cercano y proactivo.\n"
"4. Finalice con una llamada a la acción clara, sugiriendo agendar una breve llamada para explorar la idea más a fondo.\n"
"No te limites a listar los servicios; explica *cómo* se aplican a su caso."
)
return get_smart_response(prompt)

206
bot/modules/vikunja.py Normal file
View File

@@ -0,0 +1,206 @@
# app/modules/vikunja.py
# Este módulo maneja la integración con Vikunja para la gestión de tareas.
import requests
import logging
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.ext import (
ConversationHandler,
CommandHandler,
CallbackQueryHandler,
MessageHandler,
filters,
ContextTypes,
)
from bot.config import VIKUNJA_API_URL, VIKUNJA_API_TOKEN
from bot.modules.identity import is_admin
# Configuración del logger
logger = logging.getLogger(__name__)
# Definición de los estados de la conversación para añadir y editar tareas
SELECTING_ACTION, ADDING_TASK, SELECTING_TASK_TO_EDIT, EDITING_TASK = range(4)
def get_vikunja_headers():
"""Devuelve los headers necesarios para la API de Vikunja."""
return {
"Authorization": f"Bearer {VIKUNJA_API_TOKEN}",
"Content-Type": "application/json",
}
def get_projects_list():
"""Returns a list of projects from Vikunja."""
if not VIKUNJA_API_TOKEN:
return []
try:
response = requests.get(f"{VIKUNJA_API_URL}/projects", headers=get_vikunja_headers())
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"Error fetching projects: {e}")
return []
def get_tasks_list(project_id=1):
"""Returns a list of tasks for a project."""
if not VIKUNJA_API_TOKEN:
return []
try:
response = requests.get(f"{VIKUNJA_API_URL}/projects/{project_id}/tasks", headers=get_vikunja_headers())
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"Error fetching tasks: {e}")
return []
def get_tasks():
"""
Obtiene y formatea la lista de tareas de Vikunja.
Esta función es síncrona y devuelve un string.
"""
if not VIKUNJA_API_TOKEN:
return "Error: VIKUNJA_API_TOKEN no configurado."
try:
response = requests.get(f"{VIKUNJA_API_URL}/projects/1/tasks", headers=get_vikunja_headers())
response.raise_for_status()
tasks = response.json()
if not tasks:
return "No tienes tareas pendientes en Vikunja."
text = "📋 *Tus Tareas en Vikunja*\n\n"
for task in sorted(tasks, key=lambda t: t.get('id', 0))[:10]:
status = "" if task.get('done') else ""
text += f"{status} `{task.get('id')}`: *{task.get('title')}*\n"
return text
except Exception as e:
logger.error(f"Error al obtener tareas de Vikunja: {e}")
return f"Error al conectar con Vikunja: {e}"
async def vikunja_menu(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Muestra el menú principal de acciones de Vikunja."""
query = update.callback_query
await query.answer()
keyboard = [
[InlineKeyboardButton("Añadir Tarea", callback_data='add_task')],
[InlineKeyboardButton("Editar Tarea", callback_data='edit_task_start')],
[InlineKeyboardButton("Volver", callback_data='cancel')],
]
reply_markup = InlineKeyboardMarkup(keyboard)
tasks_list = get_tasks()
await query.edit_message_text(text=f"{tasks_list}\n\nSelecciona una acción:", reply_markup=reply_markup, parse_mode='Markdown')
return SELECTING_ACTION
async def request_task_title(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Solicita al usuario el título de la nueva tarea."""
query = update.callback_query
await query.answer()
await query.edit_message_text("Por favor, introduce el título de la nueva tarea:")
return ADDING_TASK
async def add_task(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Añade una nueva tarea a Vikunja."""
task_title = update.message.text
try:
data = {"title": task_title, "project_id": 1}
response = requests.post(f"{VIKUNJA_API_URL}/tasks", headers=get_vikunja_headers(), json=data)
response.raise_for_status()
await update.message.reply_text(f"✅ Tarea añadida: *{task_title}*", parse_mode='Markdown')
except Exception as e:
logger.error(f"Error al añadir tarea a Vikunja: {e}")
await update.message.reply_text(f"Error al añadir tarea: {e}")
return ConversationHandler.END
async def select_task_to_edit(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Muestra los botones para seleccionar qué tarea editar."""
query = update.callback_query
await query.answer()
try:
response = requests.get(f"{VIKUNJA_API_URL}/projects/1/tasks", headers=get_vikunja_headers())
response.raise_for_status()
tasks = [task for task in response.json() if not task.get('done')]
if not tasks:
await query.edit_message_text("No hay tareas pendientes para editar.")
return ConversationHandler.END
keyboard = []
for task in sorted(tasks, key=lambda t: t.get('id', 0))[:10]:
keyboard.append([InlineKeyboardButton(
f"{task.get('id')}: {task.get('title')}",
callback_data=f"edit_task:{task.get('id')}"
)])
keyboard.append([InlineKeyboardButton("Cancelar", callback_data='cancel')])
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("Selecciona la tarea que quieres editar:", reply_markup=reply_markup)
return SELECTING_TASK_TO_EDIT
except Exception as e:
logger.error(f"Error al obtener tareas para editar: {e}")
await query.edit_message_text("Error al obtener la lista de tareas.")
return ConversationHandler.END
async def request_new_task_title(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Solicita el nuevo título para la tarea seleccionada."""
query = update.callback_query
await query.answer()
task_id = query.data.split(':')[1]
context.user_data['task_id_to_edit'] = task_id
await query.edit_message_text(f"Introduce el nuevo título para la tarea `{task_id}`:", parse_mode='Markdown')
return EDITING_TASK
async def edit_task(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Actualiza el título de una tarea en Vikunja."""
new_title = update.message.text
task_id = context.user_data.get('task_id_to_edit')
if not task_id:
await update.message.reply_text("Error: No se encontró el ID de la tarea a editar.")
return ConversationHandler.END
try:
data = {"title": new_title}
response = requests.put(f"{VIKUNJA_API_URL}/tasks/{task_id}", headers=get_vikunja_headers(), json=data)
response.raise_for_status()
await update.message.reply_text(f"✅ Tarea `{task_id}` actualizada a *{new_title}*", parse_mode='Markdown')
except Exception as e:
logger.error(f"Error al editar la tarea {task_id}: {e}")
await update.message.reply_text("Error al actualizar la tarea.")
finally:
del context.user_data['task_id_to_edit']
return ConversationHandler.END
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Cancela la conversación actual."""
query = update.callback_query
await query.answer()
await query.edit_message_text("Operación cancelada.")
return ConversationHandler.END
def vikunja_conv_handler():
"""Crea el ConversationHandler para el flujo de Vikunja."""
return ConversationHandler(
entry_points=[CallbackQueryHandler(vikunja_menu, pattern='^manage_vikunja$')],
states={
SELECTING_ACTION: [
CallbackQueryHandler(request_task_title, pattern='^add_task$'),
CallbackQueryHandler(select_task_to_edit, pattern='^edit_task_start$'),
CallbackQueryHandler(cancel, pattern='^cancel$'),
],
ADDING_TASK: [MessageHandler(filters.TEXT & ~filters.COMMAND, add_task)],
SELECTING_TASK_TO_EDIT: [
CallbackQueryHandler(request_new_task_title, pattern=r'^edit_task:\d+$'),
CallbackQueryHandler(cancel, pattern='^cancel$'),
],
EDITING_TASK: [MessageHandler(filters.TEXT & ~filters.COMMAND, edit_task)],
},
fallbacks=[CommandHandler('cancel', cancel)],
)