feat: Implement deterministic expense matching using configurable providers and keywords, integrating it into the processing pipeline.

Author: Marco Gallegos
Date: 2025-12-18 12:25:48 -06:00
parent 899482580e
commit 519a5ad705
9 changed files with 338 additions and 136 deletions

.env.example

@@ -9,8 +9,14 @@ SUPERGROUP_ID=""
 # Database connection string
 # For SQLite: DATABASE_URL="sqlite:///database.db"
-# For PostgreSQL: DATABASE_URL="postgresql://user:password@host:port/dbname"
-DATABASE_URL="sqlite:///database.db"
+# For MySQL: DATABASE_URL="mysql+pymysql://user:password@db:3306/expenses"
+DATABASE_URL="mysql+pymysql://user:password@db:3306/expenses"
+
+# MySQL specific (for Docker)
+MYSQL_ROOT_PASSWORD="root_password"
+MYSQL_DATABASE="expenses"
+MYSQL_USER="user"
+MYSQL_PASSWORD="password"

 # Log level (e.g., DEBUG, INFO, WARNING, ERROR)
 LOG_LEVEL="INFO"
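For context, a minimal sketch of the Python-side loader for these variables; the real `app/config.py` is not shown in this commit, but the `config.TELEGRAM_TOKEN` usage elsewhere in the repo suggests this shape:

```python
import os

# Illustrative app/config.py-style loader; attribute names follow the
# `config.TELEGRAM_TOKEN` usage seen in the README's old polling example.
class Config:
    TELEGRAM_TOKEN = os.environ.get("TELEGRAM_TOKEN", "")
    DATABASE_URL = os.environ.get("DATABASE_URL", "sqlite:///database.db")
    LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")

config = Config()
```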

README.md

@@ -1,79 +1,81 @@
 # Telegram Expenses Bot
-A bot to track expenses via Telegram messages, using AI for data extraction.
+A modular, AI-powered bot to track and manage expenses via Telegram. It uses LLMs to extract structured data from text, images, and audio, and persists it for easy reporting.
+
+## Key Features
+- 🤖 **AI Extraction**: Automatically parses amount, currency, description, and date from natural language.
+- 🖼️ **Multimodal**: Supports text, images (receipts), and audio (voice notes) - *in progress*.
+- 📊 **Structured Storage**: Saves data to a database with support for exporting to CSV/Google Sheets.
+- 🛡️ **Audit Trail**: Keeps track of raw inputs and AI confidence scores for reliability.
+- 🐳 **Dockerized**: Easy deployment using Docker and Docker Compose.
+
 ## Project Structure
-This project follows a modular, service-oriented architecture.
-- **/app**: Main application source code.
-  - **/ai**: AI models, prompts, and logic.
+The project has transitioned to a more robust, service-oriented architecture located in the `/app` directory.
+- **/app**: Core application logic.
+  - **/ai**: LLM integration, prompts, and extraction logic.
   - **/audit**: Logging and raw data storage for traceability.
-  - **/ingestion**: Handlers for different input types (text, image, audio).
-  - **/integrations**: Connections to external services.
-  - **/modules**: Telegram command handlers.
-  - **/persistence**: Database models and data access layer.
-  - **/preprocessing**: Data cleaning and normalization.
-  - **/schema**: Pydantic data models.
-  - **main.py**: FastAPI application entry point.
-  - **router.py**: Main workflow orchestrator.
-  - **config.py**: Configuration loader.
-- **/raw_storage**: (Created automatically) Stores original uploaded files.
-- **Dockerfile**: Defines the container for the application.
-- **docker-compose.yml**: Orchestrates the application and database services.
-- **requirements.txt**: Python dependencies.
-- **.env.example**: Example environment variables.
+  - **/ingestion**: Handlers for different input types (text, image, audio, document).
+  - **/integrations**: External services (e.g., exporters, webhook clients).
+  - **/modules**: Telegram bot command handlers (`/start`, `/status`, etc.).
+  - **/persistence**: Database models and repositories (SQLAlchemy).
+  - **/preprocessing**: Data cleaning, validation, and language detection.
+  - **/schema**: Pydantic models for data validation and API documentation.
+  - **main.py**: FastAPI entry point and webhook handlers.
+  - **router.py**: Orchestrates the processing pipeline.
+- **/config**: Static configuration files (keywords, providers).
+- **/src**: Legacy/initial implementation (Phases 1 & 2).
+- **tasks.md**: Detailed project roadmap and progress tracker.

-## How to Run
-1. **Set up environment variables:**
-```bash
-cp .env.example .env
-```
-Fill in the values in the `.env` file (Telegram token, OpenAI key, etc.).
-2. **Build and run with Docker Compose:**
+## How It Works (Workflow)
+1. **Input**: The user sends a message to the Telegram bot (text, image, or voice).
+2. **Ingestion**: The bot receives the update and passes it to the `/app/ingestion` layer to extract raw text.
+3. **Routing**: `router.py` takes the raw text and coordinates the next steps.
+4. **Extraction**: `/app/ai/extractor.py` uses OpenAI's GPT models to parse the text into a structured `ExtractedExpense`.
+5. **Audit & Classify**: `/app/ai/classifier.py` assigns categories and a confidence score.
+6. **Persistence**: If confidence is high, the expense is automatically saved via `/app/persistence/repositories.py`. If low, it awaits manual confirmation.
+
+## Project Status
+Current Phase: **Phase 3/4 - Intelligence & Processing**
+- [x] **Phase 1: Infrastructure**: FastAPI, Docker, and basic input handling.
+- [x] **Phase 2: Data Models**: Explicit expense states and Pydantic schemas.
+- [/] **Phase 3: Logic**: Configuration loaders and provider matching (In Progress).
+- [/] **Phase 4: AI Analyst**: Multimodal extraction and confidence scoring (In Progress).
+
+## Setup & Development
+### 1. Environment Variables
+Copy `.env.example` to `.env` and fill in your credentials:
 ```bash
+TELEGRAM_TOKEN=your_bot_token
+OPENAI_API_KEY=your_openai_key
+DATABASE_URL=mysql+pymysql://user:password@db:3306/expenses
+# MySQL specific (for Docker)
+MYSQL_ROOT_PASSWORD=root_password
+MYSQL_DATABASE=expenses
+MYSQL_USER=user
+MYSQL_PASSWORD=password
+```
+### 2. Run with Docker
+```bash
 docker-compose up --build
 ```
-3. **Access the API:**
-The API will be available at `http://localhost:8000`. The interactive documentation can be found at `http://localhost:8000/docs`.
-
-## Running the Telegram Bot
-This setup provides the backend API. To connect it to Telegram, you have two main options:
-1. **Webhook**: Set a webhook with Telegram to point to your deployed API's `/webhook/telegram` endpoint. This is the recommended production approach.
-2. **Polling**: Modify the application to use polling instead of a webhook. This involves creating a separate script or modifying `main.py` to start the `python-telegram-bot` `Application` and add the handlers from the `modules` directory. This is simpler for local development.
-
-### Example: Adding Polling for Development
-You could add this to a new file, `run_bot.py`, in the root directory:
-```python
-import asyncio
-from telegram.ext import Application, CommandHandler, MessageHandler, filters
-
-from app.config import config
-from app.modules import start, upload, status, search, admin
-
-def main() -> None:
-    """Start the bot."""
-    application = Application.builder().token(config.TELEGRAM_TOKEN).build()
-
-    # Add command handlers
-    application.add_handler(CommandHandler("start", start.start))
-    application.add_handler(CommandHandler("status", status.status))
-    application.add_handler(CommandHandler("search", search.search))
-    application.add_handler(CommandHandler("admin", admin.admin_command))
-
-    # Add message handler
-    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, upload.handle_message))
-
-    # Run the bot
-    application.run_polling()
-
-if __name__ == "__main__":
-    main()
-```
-You would then run `python run_bot.py` locally.
+### 3. Local Development (FastAPI)
+```bash
+pip install -r requirements.txt
+uvicorn app.main:app --reload
+```
+
+### 4. Running the Bot (Polling)
+For local testing without webhooks, you can run a polling script that uses the handlers in `app/modules`.
+
+---
+*Maintained by Marco Gallegos*
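As a quick smoke test of the webhook flow the README describes, one can post a minimal Telegram-style update to the local endpoint. The path comes from the README; the payload mimics the public Telegram Bot API `Update` shape, and the expense text is made up:

```bash
curl -X POST http://localhost:8000/webhook/telegram \
  -H "Content-Type: application/json" \
  -d '{
        "update_id": 1,
        "message": {
          "message_id": 10,
          "date": 1700000000,
          "chat": {"id": 12345, "type": "private"},
          "from": {"id": 12345, "is_bot": false, "first_name": "Test"},
          "text": "Tacos 150 MXN"
        }
      }'
```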

app/preprocessing/config_loader.py

@@ -0,0 +1,61 @@
"""
Configuration loader for providers and keywords.
"""
import csv
import os
import logging
from typing import List, Dict, Any
logger = logging.getLogger(__name__)
# Paths to configuration files
BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
PROVIDERS_PATH = os.path.join(BASE_DIR, 'config', 'providers.csv')
KEYWORDS_PATH = os.path.join(BASE_DIR, 'config', 'keywords.csv')
def load_providers() -> List[Dict[str, Any]]:
"""
Loads the providers configuration from CSV.
"""
providers = []
if not os.path.exists(PROVIDERS_PATH):
logger.warning(f"Providers file not found at {PROVIDERS_PATH}")
return providers
try:
with open(PROVIDERS_PATH, mode='r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
# Process aliases into a list
if 'aliases' in row and row['aliases']:
row['aliases'] = [a.strip().lower() for a in row['aliases'].split(',')]
else:
row['aliases'] = []
providers.append(row)
logger.info(f"Loaded {len(providers)} providers from {PROVIDERS_PATH}")
except Exception as e:
logger.error(f"Error loading providers: {e}")
return providers
def load_keywords() -> List[Dict[str, Any]]:
"""
Loads the keywords configuration from CSV.
"""
keywords = []
if not os.path.exists(KEYWORDS_PATH):
logger.warning(f"Keywords file not found at {KEYWORDS_PATH}")
return keywords
try:
with open(KEYWORDS_PATH, mode='r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
if 'keyword' in row:
row['keyword'] = row['keyword'].strip().lower()
keywords.append(row)
logger.info(f"Loaded {len(keywords)} keywords from {KEYWORDS_PATH}")
except Exception as e:
logger.error(f"Error loading keywords: {e}")
return keywords
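For reference, a sketch of the two CSV files this loader expects. The column names are inferred from the fields read here and in `matcher.py` (`provider_name`, `aliases`, `keyword`, `categoria_principal`, `subcategoria`, `tipo_gasto_default`); the sample rows are illustrative only. Multi-valued `aliases` cells must be quoted so their embedded commas survive `csv.DictReader`:

`config/providers.csv`:
```csv
provider_name,aliases,categoria_principal,subcategoria,tipo_gasto_default
Uber Eats,"uber eats, ubereats",Comida,Delivery,personal
Office Depot,"office depot, odp",Oficina,Papeleria,negocio
```

`config/keywords.csv`:
```csv
keyword,categoria_principal,subcategoria,tipo_gasto_default
croquetas,Mascotas,Alimento,personal
monitor,Oficina,Equipo,negocio
```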

app/preprocessing/matcher.py

@@ -0,0 +1,90 @@
"""
Matching logic for providers and keywords.
"""
import logging
from typing import Optional, Dict, Any
from app.preprocessing.config_loader import load_providers, load_keywords
logger = logging.getLogger(__name__)
# Global cache for configuration
_PROVIDERS = None
_KEYWORDS = None
def get_config():
"""
Returns the loaded configuration, using cache if available.
"""
global _PROVIDERS, _KEYWORDS
if _PROVIDERS is None:
_PROVIDERS = load_providers()
if _KEYWORDS is None:
_KEYWORDS = load_keywords()
return _PROVIDERS, _KEYWORDS
def match_provider(description: str) -> Optional[Dict[str, Any]]:
"""
Searches for a provider name or alias in the description.
"""
providers, _ = get_config()
desc_lower = description.lower()
for p in providers:
name = p.get('provider_name', '').lower()
aliases = p.get('aliases', [])
# Check name
if name and name in desc_lower:
return p
# Check aliases
for alias in aliases:
if alias and alias in desc_lower:
return p
return None
def match_keywords(description: str) -> Optional[Dict[str, Any]]:
"""
Searches for keywords in the description.
"""
_, keywords = get_config()
desc_lower = description.lower()
for k in keywords:
keyword = k.get('keyword', '').lower()
if keyword and keyword in desc_lower:
return k
return None
def get_metadata_from_match(description: str) -> Dict[str, Any]:
"""
Attempts to find metadata (category, subcategory, etc.) for a description.
Priority: Provider Match > Keyword Match.
"""
# 1. Try Provider Match
provider = match_provider(description)
if provider:
logger.info(f"Matched provider: {provider['provider_name']}")
return {
"category": provider.get('categoria_principal'),
"subcategory": provider.get('subcategoria'),
"expense_type": provider.get('tipo_gasto_default'),
"match_type": "provider",
"matched_name": provider['provider_name']
}
# 2. Try Keyword Match
keyword = match_keywords(description)
if keyword:
logger.info(f"Matched keyword: {keyword['keyword']}")
return {
"category": keyword.get('categoria_principal'),
"subcategory": keyword.get('subcategoria'),
"expense_type": keyword.get('tipo_gasto_default'),
"match_type": "keyword",
"matched_name": keyword['keyword']
}
return {}
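One caveat with the module-level cache above: once loaded, edits to the CSVs are invisible until the process restarts. If hot-reloading is wanted (tasks.md calls for "carga dinámica"), a helper along these lines would suffice; `reload_config` is a hypothetical addition, not part of this commit:

```python
def reload_config() -> None:
    """Drop the cached config so the next get_config() re-reads the CSVs."""
    global _PROVIDERS, _KEYWORDS
    _PROVIDERS = None
    _KEYWORDS = None
```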

app/router.py

@@ -8,6 +8,7 @@ import logging
 from app.schema.base import RawInput, ProvisionalExpense, FinalExpense, ExpenseStatus
 from app.ingestion import text, image, audio, document
 from app.ai import extractor, classifier
+from app.preprocessing import matcher
 from app.persistence import repositories
 from sqlalchemy.orm import Session
@@ -59,18 +60,22 @@ def process_expense_input(db: Session, raw_input: RawInput) -> FinalExpense:
     audited_expense = classifier.classify_and_audit(provisional_expense)

+    # 3.5 Deterministic Matching (Phase 3)
+    # Enrich data with categories from providers/keywords if available
+    match_metadata = matcher.get_metadata_from_match(extracted_data.description)
+
     # For now, we auto-confirm if confidence is high.
     if audited_expense.confidence_score > 0.7:
         final_expense = FinalExpense(
             user_id=audited_expense.user_id,
-            provider_name=audited_expense.extracted_data.description,  # Simplified mapping
+            provider_name=match_metadata.get("matched_name") or audited_expense.extracted_data.description,
             amount=audited_expense.extracted_data.amount,
             currency=audited_expense.extracted_data.currency,
             expense_date=audited_expense.extracted_data.expense_date,
             description=audited_expense.extracted_data.description,
-            category=audited_expense.category,
+            category=match_metadata.get("category") or audited_expense.category,
-            expense_type="personal",  # Default
+            expense_type=match_metadata.get("expense_type") or "personal",
-            initial_processing_method=audited_expense.processing_method,
+            initial_processing_method=match_metadata.get("match_type") or audited_expense.processing_method,
             confirmed_by="auto-confirm"
         )
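Note how the enrichment falls back: `get_metadata_from_match` returns an empty dict on no match, so each `match_metadata.get(...)` is `None` and the `or` keeps the AI-derived value, while a deterministic hit overrides the classifier. A tiny illustration (category values hypothetical):

```python
# No deterministic match: {}.get(...) is None, so the AI value survives the `or`.
match_metadata = {}
category = match_metadata.get("category") or "Comida"    # -> "Comida"

# Provider hit: CSV metadata takes precedence over the classifier's guess.
match_metadata = {"category": "Oficina", "match_type": "provider"}
category = match_metadata.get("category") or "Comida"    # -> "Oficina"
```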

docker-compose.yml

@@ -1,4 +1,4 @@
-version: '3.8'
+version: "3.8"
 services:
   app:
@@ -7,23 +7,23 @@ services:
       - "8000:80"
     volumes:
       - ./app:/app/app
-      - ./database.db:/app/database.db # Mount the SQLite DB file
     env_file:
      - .env
     depends_on:
-      - db # Optional: if you switch to a managed DB like Postgres
+      - db
-  # Optional PostgreSQL service
-  # db:
-  #   image: postgres:13
-  #   volumes:
-  #     - postgres_data:/var/lib/postgresql/data/
-  #   environment:
-  #     - POSTGRES_USER=${DB_USER}
-  #     - POSTGRES_PASSWORD=${DB_PASSWORD}
-  #     - POSTGRES_DB=${DB_NAME}
-  #   ports:
-  #     - "5432:5432"
+  db:
+    image: mysql:8.0
+    restart: always
+    environment:
+      MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD:-root_password}
+      MYSQL_DATABASE: ${MYSQL_DATABASE:-expenses}
+      MYSQL_USER: ${MYSQL_USER:-user}
+      MYSQL_PASSWORD: ${MYSQL_PASSWORD:-password}
+    ports:
+      - "3306:3306"
+    volumes:
+      - mysql_data:/var/lib/mysql

-# volumes:
-#   postgres_data:
+volumes:
+  mysql_data:
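One caveat: `depends_on` only orders container startup; it does not wait for MySQL to accept connections, so the app can race the database on first boot. A hedged sketch of a common mitigation, assuming a Compose version that honors `condition: service_healthy` (not part of this commit):

```yaml
# Hypothetical hardening: gate the app on MySQL readiness.
services:
  db:
    healthcheck:
      test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-u", "root", "-p${MYSQL_ROOT_PASSWORD}"]
      interval: 5s
      retries: 10
  app:
    depends_on:
      db:
        condition: service_healthy
```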

requirements.txt

@@ -13,6 +13,8 @@ openai
 # Database
 sqlalchemy
+pymysql  # For MySQL support
+cryptography  # Required for some MySQL auth methods
 psycopg2-binary  # For PostgreSQL, optional
 alembic  # For database migrations, optional
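For context on the two new pins: PyMySQL needs `cryptography` for MySQL 8's default `caching_sha2_password` authentication. A minimal sketch of how the `mysql+pymysql` URL plugs into SQLAlchemy (names illustrative; the project's real wiring lives in `app/persistence`):

```python
import os
from sqlalchemy import create_engine, text

# Illustrative only; app/persistence owns the real engine/session setup.
engine = create_engine(
    os.environ.get("DATABASE_URL", "mysql+pymysql://user:password@db:3306/expenses"),
    pool_pre_ping=True,  # transparently replace connections MySQL has dropped
)

with engine.connect() as conn:
    print(conn.execute(text("SELECT 1")).scalar())  # -> 1 if the DB is reachable
```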

tasks.md

@@ -13,20 +13,19 @@ Guiding principle:
 **Objective:** Receive expense data and leave it ready for processing.
 - [x] **1.1 Project Bootstrap**
-  - [x] Create folder structure per the README.
-  - [x] Set up virtual environment.
-  - [x] Install dependencies.
-  - [x] FastAPI running correctly.
-- [x] **1.2 Environment Variables**
-  - [x] Define `.env.example` with the required variables.
-- [x] **1.3 Webhook and Data Input**
-  - **NOTE:** The approach has changed. Instead of a direct Telegram webhook, **n8n** handles data reception. The application exposes a generic `/process-expense` endpoint for this purpose.
-  - [x] `/process-expense` endpoint implemented in FastAPI.
-  - [x] The endpoint receives and logs the payload.
-- [x] **1.4 Input Handler**
-  - [x] Implement `input_handler.py`.
-  - [x] Normalize text.
-  - [x] Implement stubs for voice, image, and PDF.
+  - [x] Create modular folder structure in `/app`.
+  - [x] Set up virtual environment and `requirements.txt`.
+  - [x] Dockerization with `docker-compose.yml`.
+- [x] **1.2 Configuration and Database**
+  - [x] Define `.env.example` with variables for OpenAI, Telegram, and MySQL.
+  - [x] Configure a **MySQL 8.0** service in Docker.
+  - [x] Implement `app/config.py` to load variables.
+- [x] **1.3 Data Input (Multimodal)**
+  - [x] `/process-expense` endpoint for external integration.
+  - [x] `/webhook/telegram` endpoint for direct reception.
+  - [x] Implement initial ingestion modules (`text.py`, `image.py`, `audio.py`).
+- [x] **1.4 Initial Orchestration**
+  - [x] Implement `router.py` to coordinate the pipeline.
 ---
@@ -35,71 +34,72 @@ Guiding principle:
 **Objective:** Have absolute clarity about what an expense is and what state it lives in.
 - [x] **2.1 Pydantic Models**
-  - [x] Create models: `RawInput`, `ExtractedExpense`, `ProvisionalExpense`, `FinalExpense`.
+  - [x] Create models in `app/schema/base.py`: `RawInput`, `ExtractedExpense`, `ProvisionalExpense`, `FinalExpense`.
 - [x] **2.2 Expense States**
-  - [x] Define explicit states: `RECEIVED`, `ANALYZED`, `AWAITING_CONFIRMATION`, `CONFIRMED`, `CORRECTED`, `STORED`.
+  - [x] Define `ExpenseStatus` (RECEIVED, ANALYZED, CONFIRMED, etc.).
+- [x] **2.3 SQL Persistence**
+  - [x] Implement SQLAlchemy models and repositories in `app/persistence`.
 ---
-## Phase 3 - Configuration as Logic
+## Phase 3 - Configuration and Business Logic
 **Objective:** Move deterministic intelligence out of the code.
-- [ ] **3.1 Configuration Loader**
-  - [ ] Implement `config_loader.py`.
+- [/] **3.1 Configuration Loader**
+  - [ ] Implement dynamic loading of `config/providers.csv` and `keywords.csv`.
 - [ ] **3.2 Provider Matching**
   - [ ] Implement matching by name and aliases.
-- [ ] **3.3 Keyword Matching**
-  - [ ] Implement keyword search in descriptions.
+- [ ] **3.3 Keyword Classification**
+  - [ ] Implement keyword search in descriptions for automatic categorization.
 ---
 ## Phase 4 - The Analyst (Intelligent Processing)
-**Objective:** Turn raw text into a structured provisional expense.
+**Objective:** Turn raw text into a structured provisional expense using AI.
-- [ ] **4.1 Multimodal Extraction (Complete)**
-  - [ ] Voice -> AI transcription.
-  - [ ] Image -> AI OCR.
-  - [ ] PDF -> semi-structured extraction.
-- [ ] **4.2 Cascading Classification**
-  - [ ] Implement the pipeline: Providers -> Keywords -> AI.
-- [ ] **4.3 Basic Tax Validation**
-  - [ ] Implement CFDI detection and RFC validation.
-- [ ] **4.4 Confidence Score**
-  - [ ] Compute and persist the analysis confidence score.
+- [/] **4.1 Multimodal Extraction (Complete)**
+  - [x] Text -> extraction with GPT.
+  - [ ] Voice -> transcription (Whisper/OpenAI).
+  - [ ] Image -> OCR + extraction.
+- [ ] **4.2 Validation and Confidence Score**
+  - [ ] Implement `app/ai/confidence.py` to evaluate extraction quality.
+- [ ] **4.3 Duplicate Detection**
+  - [ ] Avoid recording the same expense twice.
 ---
-## Phase 5 - Interaction and Auditing
-**Objective:** Ensure human control and traceability.
-- [ ] **5.1 Confirmation Message**
-  - [ ] Send the user a summary of the processed expense.
+## Phase 5 - User Interaction
+**Objective:** Ensure human control and corrections.
+- [ ] **5.1 Telegram Confirmation Flow**
+  - [ ] Send "Confirm" / "Edit" buttons after processing an expense.
 - [ ] **5.2 Correction Parsing**
-  - [ ] Implement the ability to accept corrections in natural language.
-- [ ] **5.3 The Auditor**
-  - [ ] Implement the "Auditor" agent to record every change.
+  - [ ] Ability to correct specific fields via text messages.
+- [ ] **5.3 Query Commands**
+  - [ ] Implement functional `/status` and `/search`.
 ---
-## Phase 6 - Persistence and Closure
-**Objective:** Store final data safely and cleanly.
-- [ ] **6.1 Google Sheets**
-  - [ ] Implement writing data to Google Sheets.
-- [ ] **6.2 Cleanup of Temporary States**
-  - [ ] Ensure temporary data is cleaned up after processing.
+## Phase 6 - Export and Closure
+**Objective:** Make the data easy to use outside the system.
+- [ ] **6.1 Export to CSV/Excel**
+  - [x] Implement a basic CSV exporter.
+- [ ] **6.2 Google Sheets Integration (Optional)**
+  - [ ] Automatic synchronization of confirmed expenses.
 ---
-## Phase 7 - Hardening and Future-Proofing
-**Objective:** Strengthen the system and prepare it to scale.
-- [ ] **7.1 Logs and Errors**
-  - [ ] Implement structured logs and robust error handling.
-- [ ] **7.2 Preparing to Scale**
-  - [ ] Design the system to support multiple users in the future.
+## Phase 7 - Hardening
+**Objective:** Stability and production readiness.
+- [ ] **7.1 Robust Error Handling**
+  - [ ] Retries on AI API calls.
+  - [ ] System alerts.
+- [ ] **7.2 Audit Logs**
+  - [ ] Detailed record of who changed what and when.

verify_matcher.py (new file)

@@ -0,0 +1,36 @@
"""
Verification script for matching logic.
"""
import sys
import os
# Add project root to path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from app.preprocessing.matcher import get_metadata_from_match
def test_match(description: str):
print(f"\nTesting: '{description}'")
metadata = get_metadata_from_match(description)
if metadata:
print(f" Match Found!")
print(f" Type: {metadata.get('match_type')}")
print(f" Name: {metadata.get('matched_name')}")
print(f" Category: {metadata.get('category')}")
print(f" Subcategory: {metadata.get('subcategory')}")
print(f" Expense Type: {metadata.get('expense_type')}")
else:
print(" No match found.")
if __name__ == "__main__":
# Test providers
test_match("Lunch at Amazon")
test_match("Uber Eats dinner")
test_match("Office Depot supplies")
# Test keywords
test_match("New monitor for work")
test_match("Croquetas for the dog")
# Test no match
test_match("Random expense")