feat: Implement core application structure, AI extraction, persistence, and Telegram bot modules with updated configuration and dependencies.

This commit is contained in:
Marco Gallegos
2025-12-18 12:15:04 -06:00
parent 7276e480b0
commit 899482580e
45 changed files with 1157 additions and 225 deletions


@@ -1,12 +1,16 @@
-# Telegram
-TELEGRAM_BOT_TOKEN=
+# Telegram Bot Token from BotFather
+TELEGRAM_TOKEN=""
-# OpenAI
-OPENAI_API_KEY=
+# Your OpenAI API Key
+OPENAI_API_KEY=""
-# Google Cloud
-GOOGLE_APPLICATION_CREDENTIALS=
-SPREADSHEET_ID=
+# The ID of the Telegram supergroup where the bot will operate
+SUPERGROUP_ID=""
-# Environment
-ENV=dev
+# Database connection string
+# For SQLite: DATABASE_URL="sqlite:///database.db"
+# For PostgreSQL: DATABASE_URL="postgresql://user:password@host:port/dbname"
+DATABASE_URL="sqlite:///database.db"
+# Log level (e.g., DEBUG, INFO, WARNING, ERROR)
+LOG_LEVEL="INFO"

Dockerfile (new file, 23 lines)

@@ -0,0 +1,23 @@
# Use an official Python runtime as a parent image
FROM python:3.9-slim

# Set the working directory in the container
WORKDIR /app

# Copy the requirements file into the container at /app
COPY requirements.txt .

# Install any needed packages specified in requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code into the container at /app
COPY ./app /app/app

# Make port 80 available to the world outside this container
EXPOSE 80

# Define environment variable
ENV PYTHONPATH=/app

# Run the application when the container launches
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "80"]

README.md (271 lines)

@@ -1,232 +1,79 @@
-# Customizable Expense Management System with Python and Telegram
-
-This project implements a **modular, auditable, and highly customizable expense management system**, built in Python and operated through a Telegram bot. The goal is not just to record expenses but to **understand, classify, and validate** them, with a deliberate balance between explicit rules (human configuration) and AI-assisted inference.
-
-The system records expenses via **free text, voice notes, images (receipts), and PDF documents (invoices)**. AI is used only to interpret unstructured data (OCR, transcription, semantic extraction), while **the business logic lives outside the code**, in CSV and JSON files that the user controls.
-
-The core philosophy is simple but powerful:
-
-> *AI suggests, rules decide, the user confirms.*
-
----
-
-## System Goals
-
-* Remove friction from daily expense logging.
-* Avoid rigid dependencies on hardcoded logic.
-* Maintain traceability and tax control (especially in Mexico).
-* Allow quick adaptation to any business or personal use case.
-* Build a solid base for later accounting automation.
-
----
-
-## Core Features
-
-### Full Customization (Config-Driven)
-
-Expense classification **is not in the code**. Providers, categories, subcategories, keywords, and tax rules are managed through editable CSV and JSON files.
-
-This allows:
-
-* Adjustments without deployments.
-* Use by non-technical people.
-* Different rules per user or company.
-
-### Multimodal Input
-
-The bot accepts expenses via:
-
-* Free-form text.
-* Voice notes (automatic transcription).
-* Receipt photos (OCR).
-* PDF invoices (structured extraction).
-
-Everything converges into a unified expense model.
-
-### Intelligent (but Controlled) AI Processing
-
-AI is used to:
-
-* Read and transcribe unstructured information.
-* Extract provider, date, amount, and line items.
-* Infer context **only when the rules do not apply**.
-
-The final decision prioritizes explicit rules over probabilistic inference.
-
-### Tax Validation (CFDI)
-
-If the system detects an invoice, it:
-
-* Verifies that the receiver RFC matches the configured one.
-* Takes the user's tax regime into account.
-* Flags inconsistencies as observations (it alerts but does not block).
-
-### Confirmation and Audit Flow
-
-No expense enters the final record without passing through confirmation.
-
-The system:
-
-* Presents a summary to the user.
-* Allows natural corrections via chat.
-* Records **what changed, who changed it, and why**.
-
-This guarantees integrity and auditability.
-
----
-
-## Data Architecture: The Configurable Brain
-
-The core of the system is the `config/` folder. This is where **how the bot thinks** is defined.
-
-### 1. User Configuration
-
-**File:** `config/user_config.json`
-
-Defines the user's tax identity and base preferences.
-
-```json
-{
-  "user_name": "Marco Gallegos",
-  "rfc": "GAMM910513CW6",
-  "regimen_fiscal_default": "612 - Persona Física con Actividad Empresarial y Profesional",
-  "moneda_default": "MXN",
-  "pais": "MX",
-  "timezone": "America/Mexico_City"
-}
-```
-
-This file is critical for CFDI validation and data normalization.
-
----
-
-### 2. Provider Database
-
-**File:** `config/providers.csv`
-
-This is the **strongest classification rule in the system**.
-
-```csv
-provider_name,aliases,categoria_principal,subcategoria,tipo_gasto_default
-Amazon,"amazon,amzn,amazon mx",Por Determinar,Compras en Línea,
-Office Depot,"officedepot,office",Administración,Suministros de oficina,negocio
-Uber Eats,"ubereats,uber",Personal,Comida a domicilio,personal
-GoDaddy,"godaddy",Tecnología,Dominios y Hosting,negocio
-Cinepolis,"cinepolis",Personal,Entretenimiento,personal
-```
-
-If a provider matches here, **the AI is not consulted**.
-
----
-
-### 3. Item Keywords
-
-**File:** `config/keywords.csv`
-
-Used mainly for generic providers.
-
-```csv
-keyword,categoria_principal,subcategoria,tipo_gasto_default
-monitor,Tecnología,Equipo de Cómputo,negocio
-croquetas,Personal,Mascotas,personal
-hosting,Tecnología,Dominios y Hosting,negocio
-libro,Educación,Libros y Material,negocio
-```
-
-Enables classification by ticket content, not just by store.
-
----
-
-## AI Agents: Clear Roles, Limited Responsibilities
-
-The system uses two conceptual agents, each with strict limits.
-
-### 1. The Analyst (Initial Processing)
-
-Responsible for turning a raw input into a structured expense.
-
-Logical flow:
-
-1. **Data extraction** (OCR, transcription, parsing).
-2. **Matching against providers.csv** (highest priority).
-3. **Matching against keywords.csv** if the provider is generic.
-4. **AI inference** only if there were no matches.
-5. **Basic tax validation** (RFC and regime).
-6. **Confidence calculation** (rules > AI).
-
-The result is a provisional expense, never a definitive one.
-
----
-
-### 2. The Auditor (Confirmation and Corrections)
-
-Activated after the user's response.
-
-Functions:
-
-* Confirm records without changes.
-* Apply explicit corrections.
-* Record full traceability.
-
-Audit example:
-
-```
-AUDIT: User changed amount from 150.00 to 180.00 (2025-01-14)
-```
-
-Nothing is silently overwritten.
-
----
-
-## Technologies Used
-
-* **Language:** Python 3.10+
-* **Web API:** FastAPI (Telegram webhook)
-* **Bot:** python-telegram-bot
-* **AI:** OpenAI API
-* **OCR / Parsing:** via AI
-* **Storage:** Google Sheets (via google-api-python-client)
-* **Local data:** CSV / JSON
-* **Validation:** Pydantic
-
----
-
-## Project Structure
-
-```
-/expense-tracker-python
-│── .env
-│── requirements.txt
-│── /config
-│   ├── user_config.json
-│   ├── providers.csv
-│   ├── keywords.csv
-│   └── google_credentials.json
-│── /src
-│   ├── main.py            # FastAPI + Telegram webhook
-│   ├── data_models.py     # Pydantic models
-│   │
-│   ├── /modules
-│   │   ├── ai_agents.py      # Analyst & Auditor
-│   │   ├── config_loader.py  # Loads and validates CSV/JSON
-│   │   ├── input_handler.py  # Text, voice, image, PDF
-│   │   └── data_manager.py   # Google Sheets / storage
-│   │
-│   └── /prompts
-│       ├── analyst_prompt.txt
-│       └── auditor_prompt.txt
-```
-
----
-
-## Design Principles
-
-* Configuration > Code
-* Explicit rules > Probabilistic inference
-* Mandatory human confirmation
-* Auditing over blind automation
-* AI as a tool, not an authority
-
-This project is designed to grow toward automated accounting, tax reporting, and financial automation without losing human control.
+# Telegram Expenses Bot
+
+A bot to track expenses via Telegram messages, using AI for data extraction.
+
+## Project Structure
+
+This project follows a modular, service-oriented architecture.
+
+- **/app**: Main application source code.
+  - **/ai**: AI models, prompts, and logic.
+  - **/audit**: Logging and raw data storage for traceability.
+  - **/ingestion**: Handlers for different input types (text, image, audio).
+  - **/integrations**: Connections to external services.
+  - **/modules**: Telegram command handlers.
+  - **/persistence**: Database models and data access layer.
+  - **/preprocessing**: Data cleaning and normalization.
+  - **/schema**: Pydantic data models.
+  - **main.py**: FastAPI application entry point.
+  - **router.py**: Main workflow orchestrator.
+  - **config.py**: Configuration loader.
+- **/raw_storage**: (Created automatically) Stores original uploaded files.
+- **Dockerfile**: Defines the container for the application.
+- **docker-compose.yml**: Orchestrates the application and database services.
+- **requirements.txt**: Python dependencies.
+- **.env.example**: Example environment variables.
+
+## How to Run
+
+1. **Set up environment variables:**
+
+   ```bash
+   cp .env.example .env
+   ```
+
+   Fill in the values in the `.env` file (Telegram token, OpenAI key, etc.).
+
+2. **Build and run with Docker Compose:**
+
+   ```bash
+   docker-compose up --build
+   ```
+
+3. **Access the API:**
+
+   The API will be available at `http://localhost:8000`. The interactive documentation can be found at `http://localhost:8000/docs`.
+
+## Running the Telegram Bot
+
+This setup provides the backend API. To connect it to Telegram, you have two main options:
+
+1. **Webhook**: Set a webhook with Telegram to point to your deployed API's `/webhook/telegram` endpoint. This is the recommended production approach.
+2. **Polling**: Modify the application to use polling instead of a webhook. This involves creating a separate script or modifying `main.py` to start the `python-telegram-bot` `Application` and add the handlers from the `modules` directory. This is simpler for local development.
+
+### Example: Adding Polling for Development
+
+You could add this to a new file, `run_bot.py`, in the root directory:
+
+```python
+from telegram.ext import Application, CommandHandler, MessageHandler, filters
+
+from app.config import config
+from app.modules import start, upload, status, search, admin
+
+
+def main() -> None:
+    """Start the bot."""
+    application = Application.builder().token(config.TELEGRAM_TOKEN).build()
+
+    # Add command handlers
+    application.add_handler(CommandHandler("start", start.start))
+    application.add_handler(CommandHandler("status", status.status))
+    application.add_handler(CommandHandler("search", search.search))
+    application.add_handler(CommandHandler("admin", admin.admin_command))
+
+    # Add message handler
+    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, upload.handle_message))
+
+    # Run the bot
+    application.run_polling()
+
+
+if __name__ == "__main__":
+    main()
+```
+
+You would then run `python run_bot.py` locally.
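For the webhook route, the README's option 1 amounts to one call to Telegram's `setWebhook` method. A minimal sketch of building that call (the endpoint path matches `main.py`; the helper name is ours, and a production version should URL-encode the `url` parameter):

```python
def build_set_webhook_url(bot_token: str, public_url: str) -> str:
    """Build the Telegram Bot API request that registers a webhook.

    `public_url` is the HTTPS address where the FastAPI app serves
    the /webhook/telegram endpoint defined in app/main.py.
    """
    return (
        f"https://api.telegram.org/bot{bot_token}/setWebhook"
        f"?url={public_url}/webhook/telegram"
    )
```

You would issue a GET/POST to the resulting URL once after each deploy; Telegram then pushes every Update to your endpoint.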

app/__init__.py (new file, empty)

app/ai/__init__.py (new file, empty)

app/ai/classifier.py (new file, 42 lines)

@@ -0,0 +1,42 @@
"""
AI-powered classification and confidence scoring.
"""
import openai
import json
import logging
from typing import Dict, Any
from app.config import config
from app.ai.prompts import AUDITOR_PROMPT
from app.schema.base import ProvisionalExpense
# Configure the OpenAI client
openai.api_key = config.OPENAI_API_KEY
logger = logging.getLogger(__name__)
def classify_and_audit(expense: ProvisionalExpense) -> ProvisionalExpense:
"""
Uses an AI model to audit an extracted expense, providing a confidence
score and notes. This is a placeholder for a more complex classification
and validation logic.
Args:
expense: A ProvisionalExpense object with extracted data.
Returns:
The same ProvisionalExpense object, updated with the audit findings.
"""
logger.info(f"Starting AI audit for expense: {expense.extracted_data.description}")
# For now, this is a placeholder. A real implementation would
# call an AI model like in the extractor.
# For demonstration, we'll just assign a high confidence score.
expense.confidence_score = 0.95
expense.validation_notes.append("AI audit placeholder: auto-approved.")
expense.processing_method = "ai_inference" # Assume AI was used
logger.info("AI audit placeholder complete.")
return expense

app/ai/confidence.py (new file, 16 lines)

@@ -0,0 +1,16 @@
"""
Functions for calculating confidence scores.
"""
def calculate_confidence(extracted_data: dict) -> float:
"""
Calculates a confidence score based on the quality of the extracted data.
Stub function.
"""
score = 1.0
# Lower score if key fields are missing
if not extracted_data.get("amount"):
score -= 0.5
if not extracted_data.get("description"):
score -= 0.3
return max(0.0, score)

app/ai/extractor.py (new file, 60 lines)

@@ -0,0 +1,60 @@
"""
AI-powered data extraction from raw text.
"""
import openai
import json
import logging
from typing import Dict, Any
from app.config import config
from app.ai.prompts import EXTRACTOR_PROMPT
from app.schema.base import ExtractedExpense
# Configure the OpenAI client
openai.api_key = config.OPENAI_API_KEY
logger = logging.getLogger(__name__)
def extract_expense_data(text: str) -> ExtractedExpense:
"""
Uses an AI model to extract structured expense data from a raw text string.
Args:
text: The raw text from user input, OCR, or transcription.
Returns:
An ExtractedExpense object with the data found by the AI.
"""
logger.info(f"Starting AI extraction for text: '{text[:100]}...'")
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo", # Or another suitable model
messages=[
{"role": "system", "content": EXTRACTOR_PROMPT},
{"role": "user", "content": text}
],
temperature=0.0,
response_format={"type": "json_object"}
)
# The response from OpenAI should be a JSON string in the message content
json_response = response.choices[0].message['content']
extracted_data = json.loads(json_response)
logger.info(f"AI extraction successful. Raw JSON: {extracted_data}")
# Add the original text to the model for audit purposes
extracted_data['raw_text'] = text
return ExtractedExpense(**extracted_data)
except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON from AI response: {e}")
# Return a model with only the raw text for manual review
return ExtractedExpense(raw_text=text)
except Exception as e:
logger.error(f"An unexpected error occurred during AI extraction: {e}")
# Return a model with only the raw text
return ExtractedExpense(raw_text=text)

app/ai/prompts.py (new file, 56 lines)

@@ -0,0 +1,56 @@
"""
Version-controlled prompts for AI agents.
"""
# Prompt for the "Extractor" AI agent, which pulls structured data from raw text.
EXTRACTOR_PROMPT = """
You are a highly specialized AI assistant for expense tracking. Your task is to extract structured information from a given text. The text is a user's expense entry.
From the text, extract the following fields:
- "amount": The numeric value of the expense.
- "currency": The currency code (e.g., USD, EUR, CLP). If not specified, assume 'EUR'.
- "description": A brief description of what the expense was for.
- "date": The date of the expense in YYYY-MM-DD format. If not specified, use today's date.
- "category": The category of the expense (e.g., Food, Transport, Shopping, Rent, Utilities). If you cannot determine it, use 'Other'.
Respond ONLY with a valid JSON object containing these fields. Do not add any explanation or conversational text.
Example Text: "lunch with colleagues today, 25.50 eur"
Example JSON:
{
"amount": 25.50,
"currency": "EUR",
"description": "Lunch with colleagues",
"date": "2025-12-18",
"category": "Food"
}
"""
# Prompt for a "Classifier" or "Auditor" agent, which could validate the extraction.
# This is a placeholder for a potential future agent.
AUDITOR_PROMPT = """
You are an auditing AI. Your task is to review an expense record and determine its validity and compliance.
For the given JSON of an expense, check the following:
- Is the amount reasonable?
- Is the description clear?
- Is the category appropriate?
Based on your analysis, provide a "confidence_score" between 0.0 and 1.0 and a brief "audit_notes" string.
Respond ONLY with a valid JSON object.
Example Input JSON:
{
"amount": 25.50,
"currency": "EUR",
"description": "Lunch with colleagues",
"date": "2025-12-18",
"category": "Food"
}
Example Output JSON:
{
"confidence_score": 0.95,
"audit_notes": "The expense seems valid and well-categorized."
}
"""

app/audit/__init__.py (new file, empty)

app/audit/logs.py (new file, 28 lines)

@@ -0,0 +1,28 @@
"""
Logging configuration and handlers.
"""
import logging
import sys
from app.config import config
def setup_logging():
"""
Sets up a centralized logging configuration for the application.
"""
log_level = config.LOG_LEVEL.upper()
# Remove any existing handlers
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
logging.basicConfig(
level=log_level,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
stream=sys.stdout
)
logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING)
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
logger.info(f"Logging configured with level {log_level}")

app/audit/raw_storage.py (new file, 38 lines)

@@ -0,0 +1,38 @@
"""
Handles storage of raw, original input files for audit purposes.
"""
import logging
import os
from uuid import uuid4
logger = logging.getLogger(__name__)
# A simple file-based storage. In production, you'd use S3 or a similar service.
RAW_STORAGE_PATH = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "raw_storage")
os.makedirs(RAW_STORAGE_PATH, exist_ok=True)
def save_raw_input(data: bytes, input_type: str) -> str:
"""
Saves the original input data to a file.
Args:
data: The raw bytes of the input.
input_type: The type of input (e.g., 'image', 'audio').
Returns:
The path to the saved file.
"""
try:
file_extension = input_type # e.g., 'jpg', 'mp3'
file_name = f"{uuid4()}.{file_extension}"
file_path = os.path.join(RAW_STORAGE_PATH, file_name)
with open(file_path, "wb") as f:
f.write(data)
logger.info(f"Saved raw input to {file_path}")
return file_path
except Exception as e:
logger.error(f"Failed to save raw input: {e}")
return ""

app/config.py (new file, 36 lines)

@@ -0,0 +1,36 @@
"""
Configuration loader.
Loads environment variables from a .env file and makes them available as a Config object.
"""
import os
from dotenv import load_dotenv
# Load environment variables from .env file in the project root
# Note: The path is relative to the file's location in the final `app` directory
dotenv_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env')
if os.path.exists(dotenv_path):
load_dotenv(dotenv_path)
class Config:
"""
Holds the application's configuration.
"""
# Telegram Bot Token
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
# OpenAI API Key
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Supergroup ID for the bot
SUPERGROUP_ID = os.getenv("SUPERGROUP_ID")
# Database URL (e.g., "sqlite:///expenses.db")
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///../database.db")
# Log level
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
# Create a single instance of the configuration
config = Config()


app/ingestion/audio.py (new file, 29 lines)

@@ -0,0 +1,29 @@
"""
Handles processing of audio inputs (e.g., voice memos).
"""
import logging
logger = logging.getLogger(__name__)
def process_audio_input(audio_data: bytes) -> str:
"""
Placeholder for audio input processing.
This will eventually involve Speech-to-Text (STT) transcription.
Args:
audio_data: The raw bytes of the audio file.
Returns:
The transcribed text, or an empty string if failed.
"""
logger.info("Processing audio input (stub).")
# In a real implementation, you would use a library like Whisper or a cloud service.
# For example:
# try:
# result = openai.Audio.transcribe("whisper-1", io.BytesIO(audio_data))
# return result['text']
# except Exception as e:
# logger.error(f"Audio transcription failed: {e}")
# return ""
return "Sample transcription from voice memo."

app/ingestion/document.py (new file, 31 lines)

@@ -0,0 +1,31 @@
"""
Handles processing of document inputs (e.g., PDFs, Word docs).
"""
import logging
logger = logging.getLogger(__name__)
def process_document_input(doc_data: bytes) -> str:
"""
Placeholder for document input processing.
This will eventually involve text extraction from files like PDFs.
Args:
doc_data: The raw bytes of the document file.
Returns:
The extracted text, or an empty string if failed.
"""
logger.info("Processing document input (stub).")
# In a real implementation, you would use a library like PyMuPDF for PDFs.
# For example:
# try:
# import fitz # PyMuPDF
# with fitz.open(stream=doc_data, filetype="pdf") as doc:
# text = "".join(page.get_text() for page in doc)
# return text
# except Exception as e:
# logger.error(f"PDF processing failed: {e}")
# return ""
return "Sample text extracted from PDF document."

app/ingestion/image.py (new file, 29 lines)

@@ -0,0 +1,29 @@
"""
Handles processing of image inputs (e.g., receipts).
"""
import logging
logger = logging.getLogger(__name__)
def process_image_input(image_data: bytes) -> str:
"""
Placeholder for image input processing.
This will eventually involve OCR (Optical Character Recognition).
Args:
image_data: The raw bytes of the image file.
Returns:
The extracted text from the image, or an empty string if failed.
"""
logger.info("Processing image input (stub).")
# In a real implementation, you would use a library like Tesseract or a cloud service.
# For example:
# try:
# text = pytesseract.image_to_string(Image.open(io.BytesIO(image_data)))
# return text
# except Exception as e:
# logger.error(f"OCR processing failed: {e}")
# return ""
return "Sample text extracted from receipt image."

app/ingestion/text.py (new file, 24 lines)

@@ -0,0 +1,24 @@
"""
Handles processing of raw text inputs.
"""
import logging
logger = logging.getLogger(__name__)
def process_text_input(text: str) -> str:
"""
Takes raw text, normalizes it, and prepares it for AI extraction.
In the future, this could include more complex preprocessing like
language detection or PII removal.
Args:
text: The raw input text.
Returns:
The processed text.
"""
logger.info("Processing text input.")
# For now, normalization is simple. It will be moved to the preprocessing module.
normalized_text = text.lower().strip()
return normalized_text


@@ -0,0 +1,23 @@
"""
Functions for exporting data to other formats or systems (e.g., CSV, Google Sheets).
"""
import csv
import io
from typing import List
from app.schema.base import FinalExpense
def export_to_csv(expenses: List[FinalExpense]) -> str:
"""
Exports a list of expenses to a CSV formatted string.
"""
output = io.StringIO()
writer = csv.writer(output)
# Write header
writer.writerow(FinalExpense.__fields__.keys())
# Write data
for expense in expenses:
writer.writerow(expense.dict().values())
return output.getvalue()
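The header/row pairing above relies on the Pydantic model exposing its fields in a stable declaration order. The same pattern can be illustrated self-contained with a stand-in dataclass (the real `FinalExpense` lives in `app/schema/base.py`, which is not part of this hunk):

```python
import csv
import io
from dataclasses import astuple, dataclass, fields
from typing import List


@dataclass
class ExpenseRow:
    # Stand-in for FinalExpense; the real model is a Pydantic class.
    amount: float
    currency: str
    description: str


def rows_to_csv(rows: List[ExpenseRow]) -> str:
    """Write a header from the field names, then one row per expense."""
    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(f.name for f in fields(ExpenseRow))  # header
    for row in rows:
        writer.writerow(astuple(row))  # values in declaration order
    return output.getvalue()
```

Because both the header and the values come from the same field order, columns and values stay aligned without any manual mapping.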


@@ -0,0 +1,21 @@
"""
Client for sending data to external webhook URLs.
"""
import httpx
import logging
logger = logging.getLogger(__name__)
async def send_to_webhook(url: str, data: dict):
"""
Sends a POST request with JSON data to a specified webhook URL.
"""
try:
async with httpx.AsyncClient() as client:
response = await client.post(url, json=data)
response.raise_for_status()
logger.info(f"Successfully sent data to webhook {url}")
return True
except httpx.RequestError as e:
logger.error(f"Failed to send data to webhook {url}: {e}")
return False

app/main.py (new file, 84 lines)

@@ -0,0 +1,84 @@
"""
Application entry point.
Initializes the FastAPI application, sets up logging, database,
and defines the main API endpoints.
"""
import logging
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
# It's crucial to set up the config before other imports
from app.config import config
# Now, set up logging based on the config
logging.basicConfig(level=config.LOG_LEVEL.upper())
logger = logging.getLogger(__name__)
# Import other components
from app.schema.base import RawInput
from app.router import process_expense_input
from app.persistence import repositories, db
# Create database tables on startup
# This is simple, but for production, you'd use migrations (e.g., Alembic)
repositories.create_tables()
# Initialize the FastAPI app
app = FastAPI(
title="Telegram Expenses Bot API",
description="Processes and manages expense data from various sources.",
version="1.0.0"
)
@app.on_event("startup")
async def startup_event():
logger.info("Application startup complete.")
logger.info(f"Log level is set to: {config.LOG_LEVEL.upper()}")
@app.get("/", tags=["Status"])
async def root():
"""Health check endpoint."""
return {"message": "Telegram Expenses Bot API is running."}
@app.post("/webhook/telegram", tags=["Webhooks"])
async def process_telegram_update(request: dict):
"""
This endpoint would receive updates directly from a Telegram webhook.
It needs to be implemented to parse the Telegram Update object and
convert it into our internal RawInput model.
"""
logger.info(f"Received Telegram update: {request}")
# TODO: Implement a parser for the Telegram Update object.
# For now, this is a placeholder.
return {"status": "received", "message": "Telegram webhook handler not fully implemented."}
@app.post("/process-expense", tags=["Processing"])
async def process_expense(raw_input: RawInput, db_session: Session = Depends(db.get_db)):
"""
Receives raw expense data, processes it through the full pipeline,
and returns the result.
"""
logger.info(f"Received raw input for processing: {raw_input.dict()}")
try:
result = process_expense_input(db=db_session, raw_input=raw_input)
if result:
return {"status": "success", "expense_id": result.id}
else:
# This could happen if confidence is low or an error occurred
raise HTTPException(
status_code=400,
detail="Failed to process expense. It may require manual review or had invalid data."
)
except ValueError as e:
logger.error(f"Validation error: {e}")
raise HTTPException(status_code=422, detail=str(e))
except Exception as e:
logger.critical(f"An unexpected error occurred in the processing pipeline: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="An internal server error occurred.")
# To run this app:
# uvicorn app.main:app --reload
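The `/webhook/telegram` handler above leaves the Update parsing as a TODO. A sketch of what that parser could look like, assuming the `RawInput` fields used elsewhere in this commit (`user_id`, `type`, `data`) and handling only text messages (Telegram nests the sender under `message.from` and the text under `message.text`):

```python
from typing import Optional


def parse_telegram_update(update: dict) -> Optional[dict]:
    """Extract the fields RawInput needs from a Telegram Update payload.

    Returns None for updates this sketch doesn't handle
    (no message, or a message without text).
    """
    message = update.get("message")
    if not message or "text" not in message:
        return None
    return {
        "user_id": str(message.get("from", {}).get("id", "")),
        "type": "text",
        "data": message["text"],
    }
```

The endpoint would call this on the incoming `request` dict and hand the result to `RawInput(**parsed)` before invoking `process_expense_input`.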

app/modules/__init__.py (new file, empty)

app/modules/admin.py (new file, 10 lines)

@@ -0,0 +1,10 @@
"""
Handlers for admin-only commands.
"""
from telegram import Update
from telegram.ext import ContextTypes
async def admin_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handles an admin-specific command (stub)."""
# You would add a permission check here
await update.message.reply_text("Admin command is not yet implemented.")

app/modules/search.py (new file, 9 lines)

@@ -0,0 +1,9 @@
"""
Handler for the /search command.
"""
from telegram import Update
from telegram.ext import ContextTypes
async def search(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Searches the expense database (stub)."""
await update.message.reply_text("Search command is not yet implemented.")

app/modules/start.py (new file, 14 lines)

@@ -0,0 +1,14 @@
"""
Handler for the /start command.
"""
from telegram import Update
from telegram.ext import ContextTypes
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Sends a welcome message when the /start command is issued."""
user = update.effective_user
await update.message.reply_html(
rf"Hi {user.mention_html()}! Welcome to the Expense Bot. "
"Send me a message with an expense (e.g., 'lunch 25 eur') "
"or forward a voice message or receipt image.",
)

app/modules/status.py (new file, 9 lines)

@@ -0,0 +1,9 @@
"""
Handler for the /status command.
"""
from telegram import Update
from telegram.ext import ContextTypes
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Shows the status of the last processed expense (stub)."""
await update.message.reply_text("Status command is not yet implemented.")

app/modules/upload.py (new file, 48 lines)

@@ -0,0 +1,48 @@
"""
Handler for receiving and processing user messages (text, audio, images).
"""
from telegram import Update
from telegram.ext import ContextTypes
import logging
from app.schema.base import RawInput
# This is a simplified integration. In a real app, you would likely
# have a queue or a more robust way to trigger the processing pipeline.
from app.router import process_expense_input
from app.persistence.db import get_db
logger = logging.getLogger(__name__)
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""
Handles regular messages and triggers the expense processing pipeline.
"""
user_id = str(update.effective_user.id)
# This is a very simplified example.
# A real implementation needs to handle files, voice, etc.
if update.message.text:
raw_input = RawInput(
user_id=user_id,
type="text",
data=update.message.text
)
try:
# Get a DB session
db_session = next(get_db())
# Run the processing pipeline
result = process_expense_input(db=db_session, raw_input=raw_input)
if result:
await update.message.reply_text(f"Expense saved successfully! ID: {result.id}")
else:
await update.message.reply_text("I couldn't fully process that. It might need manual review.")
except Exception as e:
logger.error(f"Error handling message: {e}", exc_info=True)
await update.message.reply_text("Sorry, an error occurred while processing your request.")
else:
await update.message.reply_text("I can currently only process text messages.")

app/permissions.py (new file, 20 lines)

@@ -0,0 +1,20 @@
"""
Handles user permissions and access control.
Defines who is allowed to perform certain actions, such as uploading
or querying expense data.
"""
from app.config import config
def is_user_allowed(user_id: str) -> bool:
"""
Checks if a given user ID is allowed to use the bot.
For now, this is a stub. A real implementation could check against
a database of users or a predefined list in the config.
"""
# For example, you could have a comma-separated list of allowed IDs
# ALLOWED_USERS = config.ALLOWED_USER_IDS.split(',')
# return user_id in ALLOWED_USERS
return True # Allow all users for now
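The commented-out allowlist idea can be made concrete. A sketch assuming a hypothetical `ALLOWED_USER_IDS` environment variable (not defined in the committed `config.py`), made tolerant of whitespace and empty entries:

```python
from typing import Set


def parse_allowed_ids(raw: str) -> Set[str]:
    """Turn a comma-separated id list ('12, 34,,56') into a clean set."""
    return {part.strip() for part in raw.split(",") if part.strip()}


def is_allowed(user_id: str, raw_allowlist: str) -> bool:
    """An empty allowlist means 'allow everyone', matching the current stub."""
    allowed = parse_allowed_ids(raw_allowlist)
    return not allowed or user_id in allowed
```

`is_user_allowed` could then delegate to `is_allowed(user_id, os.getenv("ALLOWED_USER_IDS", ""))`.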


app/persistence/db.py (new file, 46 lines)

@@ -0,0 +1,46 @@
"""
Database connection and session management.
"""
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import logging
from app.config import config
logger = logging.getLogger(__name__)
try:
# The 'check_same_thread' argument is specific to SQLite.
engine_args = {"check_same_thread": False} if config.DATABASE_URL.startswith("sqlite") else {}
engine = create_engine(
config.DATABASE_URL,
connect_args=engine_args
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
logger.info("Database engine created successfully.")
except Exception as e:
logger.critical(f"Failed to connect to the database: {e}")
# Exit or handle the critical error appropriately
engine = None
SessionLocal = None
Base = None
def get_db():
"""
Dependency for FastAPI routes to get a DB session.
"""
if SessionLocal is None:
raise Exception("Database is not configured. Cannot create session.")
db = SessionLocal()
try:
yield db
finally:
db.close()


@@ -0,0 +1,66 @@
"""
Data access layer for persistence.
Contains functions to interact with the database.
"""
from sqlalchemy import Column, Integer, String, Float, Date, DateTime, Text
from sqlalchemy.orm import Session
import logging
from app.persistence.db import Base, engine
from app.schema.base import FinalExpense
logger = logging.getLogger(__name__)
# --- Database ORM Model ---
class ExpenseDB(Base):
__tablename__ = "expenses"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(String, index=True, nullable=False)
provider_name = Column(String, nullable=False)
amount = Column(Float, nullable=False)
currency = Column(String(3), nullable=False)
expense_date = Column(Date, nullable=False)
description = Column(Text, nullable=True)
category = Column(String, nullable=False)
subcategory = Column(String, nullable=True)
expense_type = Column(String, nullable=False)
confirmed_at = Column(DateTime, nullable=False)
initial_processing_method = Column(String)
def create_tables():
"""
Creates all database tables defined by models inheriting from Base.
"""
if engine:
logger.info("Creating database tables if they don't exist...")
Base.metadata.create_all(bind=engine)
logger.info("Tables created successfully.")
else:
logger.error("Cannot create tables, database engine is not available.")
# --- Repository Functions ---
def save_final_expense(db: Session, expense: FinalExpense) -> ExpenseDB:
"""
Saves a user-confirmed expense to the database.
Args:
db: The database session.
expense: The FinalExpense object to save.
Returns:
The created ExpenseDB object.
"""
logger.info(f"Saving final expense for user {expense.user_id} to the database.")
# FinalExpense carries audit fields (confirmed_by, audit_log, status) that
# have no matching columns on ExpenseDB, so exclude them before unpacking.
db_expense = ExpenseDB(**expense.dict(exclude={"confirmed_by", "audit_log", "status"}))
db.add(db_expense)
db.commit()
db.refresh(db_expense)
logger.info(f"Successfully saved expense with ID {db_expense.id}.")
return db_expense


@@ -0,0 +1,11 @@
"""
Language detection functions.
"""
def detect_language(text: str) -> str:
"""
Detects the language of a given text.
Stub function.
"""
# In a real app, use a library like 'langdetect' or 'google-cloud-translate'
return "en" # Assume English for now
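The stub could be upgraded as the comment suggests. A sketch using `langdetect` (an optional dependency — it appears only commented-out in requirements.txt), falling back to English when the library is missing or detection fails on short or empty text:

```python
def detect_language(text: str) -> str:
    """Best-effort language detection; falls back to English."""
    try:
        from langdetect import detect  # optional dependency, may not be installed
        return detect(text)
    except Exception:
        # ImportError, or langdetect's own error on empty/ambiguous input.
        return "en"
```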


@@ -0,0 +1,11 @@
"""
Text normalization functions.
"""
def normalize_text(text: str) -> str:
"""
Normalizes a string by converting it to lowercase and stripping whitespace.
"""
if not text:
return ""
return text.lower().strip()
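For Spanish provider names, lowercasing alone leaves "Café" and "Cafe" distinct. A sketch of a stronger normalizer that also collapses internal whitespace and strips accents via NFKD decomposition (stdlib only; whether accent-folding is desirable depends on how categories are matched):

```python
import unicodedata

def normalize_text(text: str) -> str:
    """Lowercase, trim, collapse whitespace, and strip accents (NFKD fold)."""
    if not text:
        return ""
    # Lowercase and collapse runs of whitespace into single spaces.
    text = " ".join(text.lower().split())
    # Decompose accented characters, then drop the combining marks.
    decomposed = unicodedata.normalize("NFKD", text)
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))
```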


@@ -0,0 +1,13 @@
"""
Data validation functions.
"""
def is_valid_expense(data: dict) -> bool:
"""
Validates if the extracted data for an expense is plausible.
Stub function.
"""
# Example validation: if an amount is present, it must be positive.
# data.get() avoids a KeyError, and the None check avoids comparing None <= 0.
amount = data.get("amount")
if amount is not None and amount <= 0:
return False
return True
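A stricter variant might also require an amount and sanity-check the currency code and date. A sketch (the specific thresholds and required fields are illustrative choices, not taken from the source):

```python
from datetime import date

def is_valid_expense(data: dict) -> bool:
    """Plausibility checks for extracted expense data (illustrative rules)."""
    amount = data.get("amount")
    if amount is None or amount <= 0:
        return False  # this variant treats a missing amount as invalid
    currency = data.get("currency")
    if currency is not None and len(currency) != 3:
        return False  # ISO 4217 codes are three letters
    expense_date = data.get("expense_date")
    if expense_date is not None and expense_date > date.today():
        return False  # expenses should not be dated in the future
    return True
```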

85 app/router.py Normal file

@@ -0,0 +1,85 @@
"""
Main application router.
Orchestrates the entire expense processing workflow, from input to persistence.
"""
import logging
from typing import Optional
from sqlalchemy.orm import Session
from app.schema.base import RawInput, ProvisionalExpense, FinalExpense
from app.ingestion import text, image, audio, document
from app.ai import extractor, classifier
from app.persistence import repositories
from app.persistence.repositories import ExpenseDB
logger = logging.getLogger(__name__)
def process_expense_input(db: Session, raw_input: RawInput) -> Optional[ExpenseDB]:
"""
Full pipeline for processing a raw input.
1. Ingestion: Convert input (text, image, etc.) to raw text.
2. AI Extraction: Parse the raw text into structured data.
3. AI Classification/Audit: Validate and categorize the expense.
4. Persistence: Save the final, confirmed expense to the database.
"""
logger.info(f"Router processing input for user {raw_input.user_id} of type {raw_input.input_type}")
# 1. Ingestion
raw_text = ""
if raw_input.input_type == "text":
raw_text = text.process_text_input(raw_input.data)
elif raw_input.input_type == "image":
# In a real app, data would be bytes, not a string path
raw_text = image.process_image_input(raw_input.data.encode())
elif raw_input.input_type in ("audio", "voice"):  # the schema calls this type 'voice'
raw_text = audio.process_audio_input(raw_input.data.encode())
elif raw_input.input_type in ("document", "pdf"):  # the schema calls this type 'pdf'
raw_text = document.process_document_input(raw_input.data.encode())
else:
raise ValueError(f"Unsupported input type: {raw_input.input_type}")
if not raw_text:
logger.error("Ingestion phase resulted in empty text. Aborting.")
# We might want to return a specific status here
return None
# 2. AI Extraction
extracted_data = extractor.extract_expense_data(raw_text)
if not extracted_data.amount or not extracted_data.description:
logger.error("AI extraction failed to find key details. Aborting.")
return None
# 3. AI Classification & Confirmation (simplified)
# In a real bot, you would present this to the user for confirmation.
provisional_expense = ProvisionalExpense(
user_id=raw_input.user_id,
extracted_data=extracted_data,
confidence_score=0.0,  # Will be set by classifier
processing_method="pending"  # Required field; overwritten by the classifier
)
audited_expense = classifier.classify_and_audit(provisional_expense)
# For now, we auto-confirm if confidence is high.
if audited_expense.confidence_score > 0.7:
from datetime import date  # local import; fall back to today when no date was extracted
final_expense = FinalExpense(
user_id=audited_expense.user_id,
provider_name=audited_expense.extracted_data.provider_name or audited_expense.extracted_data.description,
amount=audited_expense.extracted_data.amount,
currency=audited_expense.extracted_data.currency,
expense_date=audited_expense.extracted_data.expense_date or date.today(),
description=audited_expense.extracted_data.description,
category=audited_expense.category,
expense_type="personal",  # Default
initial_processing_method=audited_expense.processing_method,
confirmed_by="auto-confirm"
)
# 4. Persistence
db_record = repositories.save_final_expense(db, final_expense)
logger.info(f"Successfully processed and saved expense ID {db_record.id}")
return db_record
else:
logger.warning(f"Expense for user {raw_input.user_id} has low confidence. Awaiting manual confirmation.")
# Here you would store the provisional expense and notify the user
return None
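The four-stage flow above — ingest, extract, classify, then persist behind a confidence gate — can be sketched end to end with plain functions standing in for the real ingestion/AI/persistence modules (every name and heuristic below is illustrative, not the app's actual logic):

```python
saved = []  # stands in for the database

def ingest(raw: str) -> str:
    return raw.strip()

def extract(text: str) -> dict:
    # Pretend the AI found the first numeric token as the amount.
    amount = next(
        (float(tok) for tok in text.split() if tok.replace(".", "").isdigit()),
        None,
    )
    return {"amount": amount, "description": text}

def classify(expense: dict) -> dict:
    desc = expense["description"].lower()
    expense["category"] = "Food" if "taco" in desc else "Other"
    expense["confidence"] = 0.9 if expense["amount"] else 0.1
    return expense

def process(raw: str):
    expense = classify(extract(ingest(raw)))
    if expense["confidence"] > 0.7:  # same auto-confirm gate as the router
        saved.append(expense)
        return expense
    return None  # low confidence: would await manual confirmation

process("tacos 120.50")
```

A message with no recognizable amount falls through the gate and returns None, mirroring the router's low-confidence branch.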

26 app/scheduler.py Normal file

@@ -0,0 +1,26 @@
"""
Handles background jobs, retries, and scheduled tasks.
For example, this could be used for:
- Retrying failed API calls.
- Sending daily or weekly expense summaries.
- Cleaning up old raw files.
"""
import logging
logger = logging.getLogger(__name__)
def schedule_daily_summary():
"""
Placeholder for a function that would be run on a schedule
by a library like APScheduler or Celery.
"""
logger.info("Scheduler: Running daily summary job (stub).")
# Wiring this up would typically happen at application startup, for example:
#
# from apscheduler.schedulers.background import BackgroundScheduler
#
# scheduler = BackgroundScheduler()
# scheduler.add_job(schedule_daily_summary, 'cron', hour=8)
# scheduler.start()

0 app/schema/__init__.py Normal file

80 app/schema/base.py Normal file

@@ -0,0 +1,80 @@
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime, date
from enum import Enum
class ExpenseStatus(str, Enum):
"""
Defines the explicit states an expense can be in throughout its lifecycle.
"""
RECEIVED = "RECEIVED"
ANALYZED = "ANALYZED"
AWAITING_CONFIRMATION = "AWAITING_CONFIRMATION"
CONFIRMED = "CONFIRMED"
CORRECTED = "CORRECTED"
STORED = "STORED"
class RawInput(BaseModel):
"""
Represents the raw data received from the input source (e.g., n8n).
"""
user_id: str
input_type: str = Field(..., alias="type", description="The type of input, e.g., 'text', 'voice', 'image', 'pdf'")
data: str
class ExtractedExpense(BaseModel):
"""
Represents an expense after initial data extraction (e.g., from OCR or transcription).
Fields are mostly optional as extraction may not be perfect.
"""
provider_name: Optional[str] = None
amount: Optional[float] = None
currency: Optional[str] = "MXN"
expense_date: Optional[date] = None
description: Optional[str] = None
raw_text: str
class ProvisionalExpense(BaseModel):
"""
Represents a fully processed but unconfirmed expense.
This is the state before the user validates the data.
"""
user_id: str
extracted_data: ExtractedExpense
# Classified fields
category: Optional[str] = "Por Determinar"
subcategory: Optional[str] = None
expense_type: Optional[str] = Field(None, alias="tipo_gasto_default", description="e.g., 'personal' or 'negocio'")
# Metadata
confidence_score: float
processing_method: str = Field(..., description="How the expense was classified, e.g., 'provider_match', 'keyword_match', 'ai_inference'")
validation_notes: List[str] = []
status: ExpenseStatus = ExpenseStatus.AWAITING_CONFIRMATION
timestamp: datetime = Field(default_factory=datetime.now)
class FinalExpense(BaseModel):
"""
Represents a final, user-confirmed expense record.
This is the data that will be stored permanently.
"""
user_id: str
provider_name: str
amount: float
currency: str
expense_date: date
description: Optional[str] = None
category: str
subcategory: Optional[str] = None
expense_type: str
# Audit trail
initial_processing_method: str
confirmed_by: str
confirmed_at: datetime = Field(default_factory=datetime.now)
audit_log: List[str] = []
status: ExpenseStatus = ExpenseStatus.CONFIRMED
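`ExpenseStatus` names the lifecycle states but nothing in the code above enforces an order. The intended flow can be expressed as a small transition table — a sketch in which the allowed transitions are an assumption (in particular, that CORRECTED loops back to confirmation):

```python
from enum import Enum

class ExpenseStatus(str, Enum):
    RECEIVED = "RECEIVED"
    ANALYZED = "ANALYZED"
    AWAITING_CONFIRMATION = "AWAITING_CONFIRMATION"
    CONFIRMED = "CONFIRMED"
    CORRECTED = "CORRECTED"
    STORED = "STORED"

# Hypothetical allowed transitions; not enforced anywhere in the app.
TRANSITIONS = {
    ExpenseStatus.RECEIVED: {ExpenseStatus.ANALYZED},
    ExpenseStatus.ANALYZED: {ExpenseStatus.AWAITING_CONFIRMATION},
    ExpenseStatus.AWAITING_CONFIRMATION: {ExpenseStatus.CONFIRMED, ExpenseStatus.CORRECTED},
    ExpenseStatus.CORRECTED: {ExpenseStatus.AWAITING_CONFIRMATION},
    ExpenseStatus.CONFIRMED: {ExpenseStatus.STORED},
    ExpenseStatus.STORED: set(),  # terminal state
}

def can_transition(src: ExpenseStatus, dst: ExpenseStatus) -> bool:
    """Return True if moving from src to dst is an allowed lifecycle step."""
    return dst in TRANSITIONS[src]
```

A guard like this could live in the router so an expense cannot, say, jump from RECEIVED straight to STORED.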

14 app/schema/freeform.py Normal file

@@ -0,0 +1,14 @@
"""
Pydantic schemas for unstructured or freeform text entries.
"""
from pydantic import BaseModel
from datetime import datetime
class FreeformEntry(BaseModel):
"""
Represents a piece of text that could not be structured into an expense.
"""
user_id: str
text: str
timestamp: datetime
notes: str = "Could not be automatically categorized."

10 app/schema/receipt.py Normal file

@@ -0,0 +1,10 @@
"""
Pydantic schemas for structured receipts.
"""
from app.schema.base import FinalExpense
class Receipt(FinalExpense):
"""
A specialized expense model for receipts, could include line items in the future.
"""
pass

18 app/schema/report.py Normal file

@@ -0,0 +1,18 @@
"""
Pydantic schemas for reports or summaries.
"""
from pydantic import BaseModel
from typing import List
from datetime import date
class ExpenseReport(BaseModel):
"""
Represents a summary or report of multiple expenses.
"""
report_name: str
start_date: date
end_date: date
total_amount: float
expense_count: int
# In a real app, you'd link to the actual expense models
expenses: List[int]

29 docker-compose.yml Normal file

@@ -0,0 +1,29 @@
version: '3.8'
services:
app:
build: .
ports:
- "8000:80"
volumes:
- ./app:/app/app
- ./database.db:/app/database.db # Mount the SQLite DB file
env_file:
- .env
# depends_on:  # Uncomment together with the optional db service below
#   - db
# Optional PostgreSQL service
# db:
# image: postgres:13
# volumes:
# - postgres_data:/var/lib/postgresql/data/
# environment:
# - POSTGRES_USER=${DB_USER}
# - POSTGRES_PASSWORD=${DB_PASSWORD}
# - POSTGRES_DB=${DB_NAME}
# ports:
# - "5432:5432"
# volumes:
# postgres_data:


@@ -1,7 +1,29 @@
# Web Framework
fastapi
uvicorn
# Pydantic (used by FastAPI)
pydantic
# Environment variables
python-dotenv
# AI
openai
# Database
sqlalchemy
psycopg2-binary # For PostgreSQL, optional
alembic # For database migrations, optional
# Telegram Bot
python-telegram-bot
# HTTP Client
httpx
# For image/audio/pdf processing (examples, uncomment as needed)
# Pillow
# pytesseract
# PyMuPDF
# langdetect