mirror of
https://github.com/marcogll/talia_bot.git
synced 2026-01-13 13:25:19 +00:00
- Upgrade Python to 3.11 and update dependencies. - Refactor main.py to isolate business logic. - Fix bugs in flow_engine.py and printer.py. - Improve database connection handling. - Standardize error handling. - Verify secret management.
158 lines
5.4 KiB
Python
158 lines
5.4 KiB
Python
# bot/modules/printer.py
|
|
# This module implements the remote printing service: print jobs are submitted by e-mail over SMTP and job status is read from an IMAP inbox.
|
|
|
|
import smtplib
|
|
import imaplib
|
|
import email
|
|
import logging
|
|
import os
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from email.mime.base import MIMEBase
|
|
from email import encoders
|
|
from telegram import Update
|
|
from telegram.ext import ContextTypes
|
|
|
|
from bot.config import (
|
|
SMTP_SERVER,
|
|
SMTP_PORT,
|
|
SMTP_USER,
|
|
SMTP_PASS,
|
|
IMAP_SERVER,
|
|
IMAP_USER,
|
|
IMAP_PASS,
|
|
PRINTER_EMAIL,
|
|
)
|
|
from bot.modules.identity import is_admin
|
|
from bot.modules.file_validation import validate_document
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a document sent to the bot and forward it for printing.

    The document is checked with ``validate_document``; when it is rejected
    the validation message is sent back and nothing else happens. Otherwise
    the file is downloaded into the local ``temp_files`` directory, passed to
    ``send_file_to_printer`` and the text it returns is sent as the reply.
    The downloaded copy is removed afterwards, whether or not sending
    succeeded.

    Args:
        update: Incoming update whose message carries the document.
        context: Handler context; its bot is used to fetch the file.
    """
    document = update.message.document
    user_id = update.effective_user.id

    is_valid, message = validate_document(document)
    if not is_valid:
        await update.message.reply_text(message)
        return

    # The file name comes from the sender: it can be missing and must never be
    # trusted as a path. Keep only the last path component (treating
    # backslashes as separators too) so the download cannot land outside
    # temp_dir, and fall back to a name derived from the file id.
    raw_name = (document.file_name or "").replace("\\", "/")
    file_name = os.path.basename(raw_name)
    if file_name in ("", ".", ".."):
        file_name = f"document_{document.file_id}"

    file = await context.bot.get_file(document.file_id)

    temp_dir = 'temp_files'
    os.makedirs(temp_dir, exist_ok=True)
    file_path = os.path.join(temp_dir, file_name)

    try:
        await file.download_to_drive(file_path)

        response = await send_file_to_printer(file_path, user_id, file_name)
        await update.message.reply_text(response)
    finally:
        # Always clean up the temporary copy, even if download or send failed.
        if os.path.exists(file_path):
            os.remove(file_path)
|
|
|
|
|
|
def _send_print_email(file_path: str, user_id: int, file_name: str) -> None:
    """Build the print-job e-mail with the file attached and send it (blocking)."""
    # Header values must not contain line breaks, otherwise a crafted file
    # name could inject extra headers into the message.
    header_name = file_name.replace("\r", " ").replace("\n", " ")

    msg = MIMEMultipart()
    msg["From"] = SMTP_USER
    msg["To"] = PRINTER_EMAIL
    msg["Subject"] = f"Print Job from {user_id}: {header_name}"

    body = f"Nuevo trabajo de impresión enviado por el usuario {user_id}.\nNombre del archivo: {file_name}"
    msg.attach(MIMEText(body, "plain"))

    part = MIMEBase("application", "octet-stream")
    with open(file_path, "rb") as attachment:
        part.set_payload(attachment.read())
    encoders.encode_base64(part)
    # Passing the name as a header parameter lets the email package quote it
    # (spaces) or RFC 2231-encode it (non-ASCII) instead of emitting a
    # hand-built, possibly malformed header value.
    part.add_header("Content-Disposition", "attachment", filename=header_name)
    msg.attach(part)

    # The context manager closes the SMTP session even when login or
    # sendmail raises.
    with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server:
        server.login(SMTP_USER, SMTP_PASS)
        server.sendmail(SMTP_USER, PRINTER_EMAIL, msg.as_string())


async def send_file_to_printer(file_path: str, user_id: int, file_name: str):
    """Send a file to the printer's e-mail address as an attachment.

    Args:
        file_path: Path of the local file to attach.
        user_id: Id of the requesting user; must pass ``is_admin``.
        file_name: Name shown in the subject, body and attachment header.

    Returns:
        A user-facing status string: a permission error, a configuration
        error, a success confirmation, or a generic failure message. Errors
        while building or sending the e-mail are logged and reported through
        the returned string rather than raised.
    """
    import asyncio  # local import: keeps this block self-contained

    if not is_admin(user_id):
        return "No tienes permiso para usar este comando."

    if not all([SMTP_SERVER, SMTP_PORT, SMTP_USER, SMTP_PASS, PRINTER_EMAIL]):
        logger.error("Faltan una o más variables de entorno SMTP o PRINTER_EMAIL.")
        return "El servicio de impresión no está configurado correctamente."

    try:
        # smtplib and the file read are blocking; run them in a worker thread
        # so the event loop keeps serving other updates meanwhile.
        await asyncio.to_thread(_send_print_email, file_path, user_id, file_name)
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Error al enviar el correo de impresión: %s", e)
        return "Ocurrió un error al enviar el archivo a la impresora. Por favor, inténtalo de nuevo más tarde."

    logger.info(
        "Archivo %s enviado a la impresora (%s) por el usuario %s.",
        file_name,
        PRINTER_EMAIL,
        user_id,
    )
    return f"Tu archivo '{file_name}' ha sido enviado a la impresora. Recibirás una notificación cuando el estado del trabajo cambie."
|
|
|
|
|
|
def _read_print_statuses() -> str:
    """Read up to ten unseen inbox messages and summarise them by subject (blocking)."""
    from email.errors import HeaderParseError
    from email.header import decode_header, make_header

    # Keyword looked for in the lower-cased subject -> label used in the report.
    labels = (
        ("completed", "Trabajo de impresión completado"),
        ("failed", "Trabajo de impresión fallido"),
        ("received", "Trabajo de impresión recibido"),
    )

    # The context manager logs out on every exit path, including the early
    # returns below and any exception.
    with imaplib.IMAP4_SSL(IMAP_SERVER) as mail:
        mail.login(IMAP_USER, IMAP_PASS)
        mail.select("inbox")

        status, messages = mail.search(None, "UNSEEN")
        if status != "OK":
            return "No se pudieron buscar los correos."

        email_ids = messages[0].split()
        if not email_ids:
            return "No hay actualizaciones de estado de impresión."

        statuses = []
        # Only the last 10 unseen e-mails are processed.
        for e_id in email_ids[-10:]:
            _, msg_data = mail.fetch(e_id, "(RFC822)")
            for response_part in msg_data:
                if not isinstance(response_part, tuple):
                    continue
                msg = email.message_from_bytes(response_part[1])

                raw_subject = msg["subject"]
                if raw_subject is None:
                    # A message without a Subject header must not abort the
                    # whole status check.
                    subject = "(sin asunto)"
                else:
                    try:
                        # Decode RFC 2047 encoded words so keywords match.
                        subject = str(make_header(decode_header(raw_subject)))
                    except (HeaderParseError, LookupError, UnicodeError):
                        subject = str(raw_subject)

                lowered = subject.lower()
                label = next(
                    (text for keyword, text in labels if keyword in lowered),
                    "Nuevo correo",
                )
                statuses.append(f"{label}: {subject}")

            # Mark the email as seen
            mail.store(e_id, "+FLAGS", "\\Seen")

        mail.close()

    if not statuses:
        return "No se encontraron actualizaciones de estado relevantes."

    return "\n".join(statuses)


async def check_print_status(user_id: int):
    """Check the status of print jobs by reading unseen inbox e-mails.

    Args:
        user_id: Id of the requesting user; must pass ``is_admin``.

    Returns:
        A user-facing string: a permission or configuration error, a notice
        that there is nothing new, one line per processed e-mail (classified
        by the keywords "completed", "failed" or "received" in its subject),
        or a generic failure message. IMAP errors are logged and reported
        through the returned string rather than raised.
    """
    import asyncio  # local import: keeps this block self-contained

    if not is_admin(user_id):
        return "No tienes permiso para usar este comando."

    if not all([IMAP_SERVER, IMAP_USER, IMAP_PASS]):
        logger.error("Faltan una o más variables de entorno IMAP.")
        return "El servicio de monitoreo de impresión no está configurado correctamente."

    try:
        # imaplib is blocking; run it in a worker thread so the event loop
        # is not stalled while the mailbox is read.
        return await asyncio.to_thread(_read_print_statuses)
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Error al revisar el estado de la impresión: %s", e)
        return "Ocurrió un error al revisar el estado de la impresión."
|