From 519a5ad70595052ed23bdbc50defbcb7ba7757f4 Mon Sep 17 00:00:00 2001
From: Marco Gallegos
Date: Thu, 18 Dec 2025 12:25:48 -0600
Subject: [PATCH] feat: Implement deterministic expense matching using
 configurable providers and keywords, integrating it into the processing
 pipeline.

---
 .env.example                       |  10 ++-
 README.md                          | 128 +++++++++++++++--------------
 app/preprocessing/config_loader.py |  61 ++++++++++++++
 app/preprocessing/matcher.py       |  90 ++++++++++++++++++++
 app/router.py                      |  13 ++-
 docker-compose.yml                 |  32 ++++----
 requirements.txt                   |   2 +
 tasks.md                           | 102 +++++++++++------------
 verify_matcher.py                  |  36 ++++++++
 9 files changed, 338 insertions(+), 136 deletions(-)
 create mode 100644 app/preprocessing/config_loader.py
 create mode 100644 app/preprocessing/matcher.py
 create mode 100644 verify_matcher.py

diff --git a/.env.example b/.env.example
index 1f32bed..b759d51 100644
--- a/.env.example
+++ b/.env.example
@@ -9,8 +9,14 @@ SUPERGROUP_ID=""
 
 # Database connection string
 # For SQLite: DATABASE_URL="sqlite:///database.db"
-# For PostgreSQL: DATABASE_URL="postgresql://user:password@host:port/dbname"
-DATABASE_URL="sqlite:///database.db"
+# For MySQL: DATABASE_URL="mysql+pymysql://user:password@db:3306/expenses"
+DATABASE_URL="mysql+pymysql://user:password@db:3306/expenses"
+
+# MySQL specific (for Docker)
+MYSQL_ROOT_PASSWORD="root_password"
+MYSQL_DATABASE="expenses"
+MYSQL_USER="user"
+MYSQL_PASSWORD="password"
 
 # Log level (e.g., DEBUG, INFO, WARNING, ERROR)
 LOG_LEVEL="INFO"
diff --git a/README.md b/README.md
index 22ef0a6..dd28102 100644
--- a/README.md
+++ b/README.md
@@ -1,79 +1,81 @@
 # Telegram Expenses Bot
 
-A bot to track expenses via Telegram messages, using AI for data extraction.
+A modular, AI-powered bot to track and manage expenses via Telegram. It uses LLMs to extract structured data from text, images, and audio, and persists it for easy reporting.
+
+## Key Features
+
+- 🤖 **AI Extraction**: Automatically parses amount, currency, description, and date from natural language.
+- 🖼️ **Multimodal**: Supports text, images (receipts), and audio (voice notes) - *in progress*.
+- 📊 **Structured Storage**: Saves data to a database, with support for exporting to CSV/Google Sheets.
+- 🛡️ **Audit Trail**: Keeps track of raw inputs and AI confidence scores for reliability.
+- 🐳 **Dockerized**: Easy deployment using Docker and Docker Compose.
 
 ## Project Structure
 
-This project follows a modular, service-oriented architecture.
+The project has transitioned to a more robust, service-oriented architecture located in the `/app` directory.
 
-- **/app**: Main application source code.
-  - **/ai**: AI models, prompts, and logic.
+- **/app**: Core application logic.
+  - **/ai**: LLM integration, prompts, and extraction logic.
   - **/audit**: Logging and raw data storage for traceability.
-  - **/ingestion**: Handlers for different input types (text, image, audio).
-  - **/integrations**: Connections to external services.
-  - **/modules**: Telegram command handlers.
-  - **/persistence**: Database models and data access layer.
-  - **/preprocessing**: Data cleaning and normalization.
-  - **/schema**: Pydantic data models.
-  - **main.py**: FastAPI application entry point.
-  - **router.py**: Main workflow orchestrator.
-  - **config.py**: Configuration loader.
-- **/raw_storage**: (Created automatically) Stores original uploaded files.
-- **Dockerfile**: Defines the container for the application.
-- **docker-compose.yml**: Orchestrates the application and database services.
-- **requirements.txt**: Python dependencies.
-- **.env.example**: Example environment variables.
+  - **/ingestion**: Handlers for different input types (text, image, audio, document).
+  - **/integrations**: External services (e.g., exporters, webhook clients).
+  - **/modules**: Telegram bot command handlers (`/start`, `/status`, etc.).
+  - **/persistence**: Database models and repositories (SQLAlchemy).
+  - **/preprocessing**: Data cleaning, validation, and language detection.
+  - **/schema**: Pydantic models for data validation and API documentation.
+  - **main.py**: FastAPI entry point and webhook handlers.
+  - **router.py**: Orchestrates the processing pipeline.
+- **/config**: Static configuration files (keywords, providers).
+- **/src**: Legacy/initial implementation (Phases 1 and 2).
+- **tasks.md**: Detailed project roadmap and progress tracker.
 
-## How to Run
-
-1. **Set up environment variables:**
-   ```bash
-   cp .env.example .env
-   ```
-   Fill in the values in the `.env` file (Telegram token, OpenAI key, etc.).
+## How It Works (Workflow)
+
+1. **Input**: The user sends a message to the Telegram bot (text, image, or voice).
+2. **Ingestion**: The bot receives the update and passes it to the `/app/ingestion` layer to extract raw text.
+3. **Routing**: `router.py` takes the raw text and coordinates the next steps.
+4. **Extraction**: The `/app/ai/extractor.py` module uses OpenAI's GPT models to parse the text into a structured `ExtractedExpense`.
+5. **Audit & Classify**: The `/app/ai/classifier.py` module assigns categories and a confidence score.
+6. **Persistence**: If confidence is high, the expense is saved automatically via `/app/persistence/repositories.py`; if low, it awaits manual confirmation.
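+
+For illustration, the structured result of step 4 might look like this (the field names follow the `ExtractedExpense` usage in `app/router.py`; the values are invented examples):
+
+```json
+{
+  "amount": 450.0,
+  "currency": "MXN",
+  "description": "Uber Eats dinner",
+  "expense_date": "2025-12-18"
+}
+```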
-
-2. **Build and run with Docker Compose:**
-   ```bash
-   docker-compose up --build
-   ```
-
-3. **Access the API:**
-   The API will be available at `http://localhost:8000`. The interactive documentation can be found at `http://localhost:8000/docs`.
-
-## Running the Telegram Bot
-
-This setup provides the backend API. To connect it to Telegram, you have two main options:
-
-1. **Webhook**: Set a webhook with Telegram to point to your deployed API's `/webhook/telegram` endpoint. This is the recommended production approach.
-2. **Polling**: Modify the application to use polling instead of a webhook. This involves creating a separate script or modifying `main.py` to start the `python-telegram-bot` `Application` and add the handlers from the `modules` directory. This is simpler for local development.
-
-### Example: Adding Polling for Development
-
-You could add this to a new file, `run_bot.py`, in the root directory:
-
-```python
-import asyncio
-from telegram.ext import Application, CommandHandler, MessageHandler, filters
-from app.config import config
-from app.modules import start, upload, status, search, admin
-
-def main() -> None:
-    """Start the bot."""
-    application = Application.builder().token(config.TELEGRAM_TOKEN).build()
-
-    # Add command handlers
-    application.add_handler(CommandHandler("start", start.start))
-    application.add_handler(CommandHandler("status", status.status))
-    application.add_handler(CommandHandler("search", search.search))
-    application.add_handler(CommandHandler("admin", admin.admin_command))
-
-    # Add message handler
-    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, upload.handle_message))
-
-    # Run the bot
-    application.run_polling()
-
-if __name__ == "__main__":
-    main()
-```
-
-You would then run `python run_bot.py` locally.
+
+## Project Status
+
+Current Phase: **Phase 3/4 - Intelligence & Processing**
+
+- [x] **Phase 1: Infrastructure**: FastAPI, Docker, and basic input handling.
+- [x] **Phase 2: Data Models**: Explicit expense states and Pydantic schemas.
+- [/] **Phase 3: Logic**: Configuration loaders and provider matching (In Progress).
+- [/] **Phase 4: AI Analyst**: Multimodal extraction and confidence scoring (In Progress).
+
+## Setup & Development
+
+### 1. Environment Variables
+Copy `.env.example` to `.env` and fill in your credentials:
+```bash
+TELEGRAM_TOKEN=your_bot_token
+OPENAI_API_KEY=your_openai_key
+DATABASE_URL=mysql+pymysql://user:password@db:3306/expenses
+
+# MySQL specific (for Docker)
+MYSQL_ROOT_PASSWORD=root_password
+MYSQL_DATABASE=expenses
+MYSQL_USER=user
+MYSQL_PASSWORD=password
+```
+
+### 2. Run with Docker
+```bash
+docker-compose up --build
+```
+
+### 3. Local Development (FastAPI)
+```bash
+pip install -r requirements.txt
+uvicorn app.main:app --reload
+```
+
+### 4. Running the Bot (Polling)
+For local testing without webhooks, you can run a polling script that uses the handlers in `app/modules`.
+
+---
+*Maintained by Marco Gallegos*
diff --git a/app/preprocessing/config_loader.py b/app/preprocessing/config_loader.py
new file mode 100644
index 0000000..e3a937c
--- /dev/null
+++ b/app/preprocessing/config_loader.py
@@ -0,0 +1,61 @@
+"""
+Configuration loader for providers and keywords.
+"""
+import csv
+import os
+import logging
+from typing import List, Dict, Any
+
+logger = logging.getLogger(__name__)
+
+# Paths to configuration files
+BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+PROVIDERS_PATH = os.path.join(BASE_DIR, 'config', 'providers.csv')
+KEYWORDS_PATH = os.path.join(BASE_DIR, 'config', 'keywords.csv')
+
+def load_providers() -> List[Dict[str, Any]]:
+    """
+    Loads the providers configuration from CSV.
+    """
+    providers = []
+    if not os.path.exists(PROVIDERS_PATH):
+        logger.warning(f"Providers file not found at {PROVIDERS_PATH}")
+        return providers
+
+    try:
+        with open(PROVIDERS_PATH, mode='r', encoding='utf-8') as f:
+            reader = csv.DictReader(f)
+            for row in reader:
+                # Process aliases into a list
+                if 'aliases' in row and row['aliases']:
+                    row['aliases'] = [a.strip().lower() for a in row['aliases'].split(',')]
+                else:
+                    row['aliases'] = []
+                providers.append(row)
+        logger.info(f"Loaded {len(providers)} providers from {PROVIDERS_PATH}")
+    except Exception as e:
+        logger.error(f"Error loading providers: {e}")
+
+    return providers
+
+def load_keywords() -> List[Dict[str, Any]]:
+    """
+    Loads the keywords configuration from CSV.
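+
+    Expected columns (assumed from how matcher.py consumes these rows):
+    keyword, categoria_principal, subcategoria, tipo_gasto_default.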
+    """
+    keywords = []
+    if not os.path.exists(KEYWORDS_PATH):
+        logger.warning(f"Keywords file not found at {KEYWORDS_PATH}")
+        return keywords
+
+    try:
+        with open(KEYWORDS_PATH, mode='r', encoding='utf-8') as f:
+            reader = csv.DictReader(f)
+            for row in reader:
+                if 'keyword' in row:
+                    row['keyword'] = row['keyword'].strip().lower()
+                keywords.append(row)
+        logger.info(f"Loaded {len(keywords)} keywords from {KEYWORDS_PATH}")
+    except Exception as e:
+        logger.error(f"Error loading keywords: {e}")
+
+    return keywords
diff --git a/app/preprocessing/matcher.py b/app/preprocessing/matcher.py
new file mode 100644
index 0000000..dec677f
--- /dev/null
+++ b/app/preprocessing/matcher.py
@@ -0,0 +1,90 @@
+"""
+Matching logic for providers and keywords.
+"""
+import logging
+from typing import Optional, Dict, Any
+from app.preprocessing.config_loader import load_providers, load_keywords
+
+logger = logging.getLogger(__name__)
+
+# Global cache for configuration
+_PROVIDERS = None
+_KEYWORDS = None
+
+def get_config():
+    """
+    Returns the loaded configuration, using the cache if available.
+    """
+    global _PROVIDERS, _KEYWORDS
+    if _PROVIDERS is None:
+        _PROVIDERS = load_providers()
+    if _KEYWORDS is None:
+        _KEYWORDS = load_keywords()
+    return _PROVIDERS, _KEYWORDS
+
+def match_provider(description: str) -> Optional[Dict[str, Any]]:
+    """
+    Searches for a provider name or alias in the description.
+    """
+    providers, _ = get_config()
+    desc_lower = description.lower()
+
+    for p in providers:
+        name = p.get('provider_name', '').lower()
+        aliases = p.get('aliases', [])
+
+        # Check name
+        if name and name in desc_lower:
+            return p
+
+        # Check aliases
+        for alias in aliases:
+            if alias and alias in desc_lower:
+                return p
+
+    return None
+
+def match_keywords(description: str) -> Optional[Dict[str, Any]]:
+    """
+    Searches for keywords in the description.
+    """
+    _, keywords = get_config()
+    desc_lower = description.lower()
+
+    for k in keywords:
+        keyword = k.get('keyword', '').lower()
+        if keyword and keyword in desc_lower:
+            return k
+
+    return None
+
+def get_metadata_from_match(description: str) -> Dict[str, Any]:
+    """
+    Attempts to find metadata (category, subcategory, etc.) for a description.
+    Priority: Provider Match > Keyword Match.
+    """
+    # 1. Try Provider Match
+    provider = match_provider(description)
+    if provider:
+        logger.info(f"Matched provider: {provider.get('provider_name')}")
+        return {
+            "category": provider.get('categoria_principal'),
+            "subcategory": provider.get('subcategoria'),
+            "expense_type": provider.get('tipo_gasto_default'),
+            "match_type": "provider",
+            "matched_name": provider.get('provider_name')
+        }
+
+    # 2. Try Keyword Match
+    keyword = match_keywords(description)
+    if keyword:
+        logger.info(f"Matched keyword: {keyword['keyword']}")
+        return {
+            "category": keyword.get('categoria_principal'),
+            "subcategory": keyword.get('subcategoria'),
+            "expense_type": keyword.get('tipo_gasto_default'),
+            "match_type": "keyword",
+            "matched_name": keyword['keyword']
+        }
+
+    return {}
diff --git a/app/router.py b/app/router.py
index 53c0b50..4efdf0e 100644
--- a/app/router.py
+++ b/app/router.py
@@ -8,6 +8,7 @@ import logging
 from app.schema.base import RawInput, ProvisionalExpense, FinalExpense, ExpenseStatus
 from app.ingestion import text, image, audio, document
 from app.ai import extractor, classifier
+from app.preprocessing import matcher
 from app.persistence import repositories
 from sqlalchemy.orm import Session
 
@@ -59,18 +60,22 @@ def process_expense_input(db: Session, raw_input: RawInput) -> FinalExpense:
 
     audited_expense = classifier.classify_and_audit(provisional_expense)
 
+    # 3.5 Deterministic Matching (Phase 3)
+    # Enrich data with categories from providers/keywords if available
+    match_metadata = matcher.get_metadata_from_match(extracted_data.description)
+
     # For now, we auto-confirm if confidence is high.
     if audited_expense.confidence_score > 0.7:
         final_expense = FinalExpense(
             user_id=audited_expense.user_id,
-            provider_name=audited_expense.extracted_data.description, # Simplified mapping
+            provider_name=match_metadata.get("matched_name") or audited_expense.extracted_data.description,
             amount=audited_expense.extracted_data.amount,
             currency=audited_expense.extracted_data.currency,
             expense_date=audited_expense.extracted_data.expense_date,
             description=audited_expense.extracted_data.description,
-            category=audited_expense.category,
-            expense_type="personal", # Default
-            initial_processing_method=audited_expense.processing_method,
+            category=match_metadata.get("category") or audited_expense.category,
+            expense_type=match_metadata.get("expense_type") or "personal",
+            initial_processing_method=match_metadata.get("match_type") or audited_expense.processing_method,
             confirmed_by="auto-confirm"
         )
diff --git a/docker-compose.yml b/docker-compose.yml
index cae4b87..e421322 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,4 +1,4 @@
-version: '3.8'
+version: "3.8"
 
 services:
   app:
@@ -7,23 +7,23 @@ services:
       - "8000:80"
     volumes:
       - ./app:/app/app
-      - ./database.db:/app/database.db # Mount the SQLite DB file
     env_file:
       - .env
     depends_on:
-      - db # Optional: if you switch to a managed DB like Postgres
+      - db
 
-  # Optional PostgreSQL service
-  # db:
-  #   image: postgres:13
-  #   volumes:
-  #     - postgres_data:/var/lib/postgresql/data/
-  #   environment:
-  #     - POSTGRES_USER=${DB_USER}
-  #     - POSTGRES_PASSWORD=${DB_PASSWORD}
-  #     - POSTGRES_DB=${DB_NAME}
-  #   ports:
-  #     - "5432:5432"
+  db:
+    image: mysql:8.0
+    restart: always
+    environment:
+      MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD:-root_password}
+      MYSQL_DATABASE: ${MYSQL_DATABASE:-expenses}
+      MYSQL_USER: ${MYSQL_USER:-user}
+      MYSQL_PASSWORD: ${MYSQL_PASSWORD:-password}
+    ports:
+      - "3306:3306"
+    volumes:
+      - mysql_data:/var/lib/mysql
 
-# volumes:
-#   postgres_data:
+volumes:
+  mysql_data:
diff --git a/requirements.txt b/requirements.txt
index 2e91e0d..627290e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -13,6 +13,8 @@ openai
 
 # Database
 sqlalchemy
+pymysql # For MySQL support
+cryptography # Required for some MySQL auth methods
 psycopg2-binary # For PostgreSQL, optional
 alembic # For database migrations, optional
diff --git a/tasks.md b/tasks.md
index ca61514..13f9346 100644
--- a/tasks.md
+++ b/tasks.md
@@ -13,20 +13,19 @@ Guiding principle:
 **Objective:** Receive expense data and leave it ready for processing.
 
 - [x] **1.1 Project Bootstrap**
-  - [x] Create the folder structure per the README.
-  - [x] Set up the virtual environment.
-  - [x] Install dependencies.
-  - [x] FastAPI running correctly.
-- [x] **1.2 Environment Variables**
-  - [x] Define `.env.example` with the required variables.
-- [x] **1.3 Webhook and Data Input**
-  - **NOTE:** The approach changed. Instead of a direct Telegram webhook, **n8n** handles data reception; the application exposes a generic `/process-expense` endpoint for this purpose.
-  - [x] `/process-expense` endpoint implemented in FastAPI.
-  - [x] The endpoint receives and logs the payload.
-- [x] **1.4 Input Handler**
-  - [x] Implement `input_handler.py`.
-  - [x] Normalize text.
-  - [x] Implement stubs for voice, image, and PDF.
+  - [x] Create a modular folder structure under `/app`.
+  - [x] Set up the virtual environment and `requirements.txt`.
+  - [x] Dockerize with `docker-compose.yml`.
+- [x] **1.2 Configuration and Database**
+  - [x] Define `.env.example` with variables for OpenAI, Telegram, and MySQL.
+  - [x] Set up the **MySQL 8.0** service in Docker.
+  - [x] Implement `app/config.py` to load the variables.
+- [x] **1.3 Data Input (Multimodal)**
+  - [x] `/process-expense` endpoint for external integration.
+  - [x] `/webhook/telegram` endpoint for direct reception.
+  - [x] Implement the initial ingestion modules (`text.py`, `image.py`, `audio.py`).
+- [x] **1.4 Initial Orchestration**
+  - [x] Implement `router.py` to coordinate the pipeline.
 
 ---
 
@@ -35,71 +34,72 @@ Guiding principle:
 **Objective:** Have absolute clarity about what an expense is and which state it lives in.
 
 - [x] **2.1 Pydantic Models**
-  - [x] Create the models: `RawInput`, `ExtractedExpense`, `ProvisionalExpense`, `FinalExpense`.
+  - [x] Create the models in `app/schema/base.py`: `RawInput`, `ExtractedExpense`, `ProvisionalExpense`, `FinalExpense`.
 - [x] **2.2 Expense States**
-  - [x] Define explicit states: `RECEIVED`, `ANALYZED`, `AWAITING_CONFIRMATION`, `CONFIRMED`, `CORRECTED`, `STORED`.
+  - [x] Define `ExpenseStatus` (RECEIVED, ANALYZED, CONFIRMED, etc.).
+- [x] **2.3 SQL Persistence**
+  - [x] Implement the SQLAlchemy models and repositories in `app/persistence`.
 
 ---
 
-## Phase 3 – Configuration as Logic
+## Phase 3 – Configuration and Business Logic
 
 **Objective:** Move the deterministic intelligence out of the code.
 
-- [ ] **3.1 Configuration Loader**
-  - [ ] Implement `config_loader.py`.
+- [/] **3.1 Configuration Loader**
+  - [ ] Implement dynamic loading of `config/providers.csv` and `keywords.csv`.
 - [ ] **3.2 Provider Matching**
   - [ ] Implement matching by name and aliases.
-- [ ] **3.3 Keyword Matching**
-  - [ ] Implement keyword lookup in descriptions.
+- [ ] **3.3 Keyword Classification**
+  - [ ] Implement keyword lookup in descriptions for automatic categorization.
 
 ---
 
 ## Phase 4 – The Analyst (Intelligent Processing)
 
-**Objective:** Turn raw text into a structured provisional expense.
+**Objective:** Turn raw text into a structured provisional expense using AI.
 
-- [ ] **4.1 Multimodal Extraction (Complete)**
-  - [ ] Voice → AI transcription.
-  - [ ] Image → AI OCR.
-  - [ ] PDF → semi-structured extraction.
-- [ ] **4.2 Cascading Classification**
-  - [ ] Implement the pipeline: Providers → Keywords → AI.
-- [ ] **4.3 Basic Tax Validation**
-  - [ ] Implement CFDI detection and RFC validation.
-- [ ] **4.4 Confidence Score**
-  - [ ] Compute and persist the analysis confidence score.
+- [/] **4.1 Multimodal Extraction (Complete)**
+  - [x] Text -> extraction with GPT.
+  - [ ] Voice -> transcription (Whisper/OpenAI).
+  - [ ] Image -> OCR + extraction.
+- [ ] **4.2 Validation and Confidence Score**
+  - [ ] Implement `app/ai/confidence.py` to assess extraction quality.
+- [ ] **4.3 Duplicate Detection**
+  - [ ] Avoid recording the same expense twice.
 
 ---
 
-## Phase 5 – Interaction and Auditing
+## Phase 5 – User Interaction
 
-**Objective:** Ensure human control and traceability.
+**Objective:** Ensure human control and corrections.
 
-- [ ] **5.1 Confirmation Message**
-  - [ ] Send the user a summary of the processed expense.
+- [ ] **5.1 Telegram Confirmation Flow**
+  - [ ] Send "Confirm" / "Edit" buttons after an expense is processed.
 - [ ] **5.2 Correction Parsing**
-  - [ ] Implement the ability to accept corrections in natural language.
-- [ ] **5.3 The Auditor**
-  - [ ] Implement the "Auditor" agent to record every change.
+  - [ ] Ability to correct specific fields via text messages.
+- [ ] **5.3 Query Commands**
+  - [ ] Make `/status` and `/search` functional.
 
 ---
 
-## Phase 6 – Persistence and Closure
+## Phase 6 – Export and Closure
 
-**Objective:** Store final data safely and cleanly.
+**Objective:** Make the data easy to use outside the system.
 
-- [ ] **6.1 Google Sheets**
-  - [ ] Implement writing data to Google Sheets.
-- [ ] **6.2 Cleanup of Temporary States**
-  - [ ] Ensure temporary data is cleaned up after processing.
+- [ ] **6.1 Export to CSV/Excel**
+  - [x] Implement a basic CSV exporter.
+- [ ] **6.2 Google Sheets Integration (Optional)**
+  - [ ] Automatically sync confirmed expenses.
 
 ---
 
-## Phase 7 – Hardening and Future-Proofing
+## Phase 7 – Hardening
 
-**Objective:** Strengthen the system and prepare it to scale.
+**Objective:** Stability and production readiness.
 
-- [ ] **7.1 Logs and Errors**
-  - [ ] Implement structured logging and robust error handling.
-- [ ] **7.2 Preparing to Scale**
-  - [ ] Design the system to support multiple users in the future.
+- [ ] **7.1 Robust Error Handling**
+  - [ ] Retries on AI API calls.
+  - [ ] System alerts.
+- [ ] **7.2 Audit Logs**
+  - [ ] Detailed record of who changed what and when.
diff --git a/verify_matcher.py b/verify_matcher.py
new file mode 100644
index 0000000..9e0c043
--- /dev/null
+++ b/verify_matcher.py
@@ -0,0 +1,36 @@
+"""
+Verification script for matching logic.
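+
+Assumes config/providers.csv and config/keywords.csv exist and contain entries
+matching the sample descriptions below (e.g., "Amazon", "Uber Eats", "monitor",
+"croquetas"); adjust them to your local config.
+Run directly: python verify_matcher.py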
+""" +import sys +import os + +# Add project root to path +sys.path.append(os.path.dirname(os.path.abspath(__file__))) + +from app.preprocessing.matcher import get_metadata_from_match + +def test_match(description: str): + print(f"\nTesting: '{description}'") + metadata = get_metadata_from_match(description) + if metadata: + print(f" Match Found!") + print(f" Type: {metadata.get('match_type')}") + print(f" Name: {metadata.get('matched_name')}") + print(f" Category: {metadata.get('category')}") + print(f" Subcategory: {metadata.get('subcategory')}") + print(f" Expense Type: {metadata.get('expense_type')}") + else: + print(" No match found.") + +if __name__ == "__main__": + # Test providers + test_match("Lunch at Amazon") + test_match("Uber Eats dinner") + test_match("Office Depot supplies") + + # Test keywords + test_match("New monitor for work") + test_match("Croquetas for the dog") + + # Test no match + test_match("Random expense")