diff --git a/src/app.ts b/src/app.ts index 299e2c8..a1ec400 100644 --- a/src/app.ts +++ b/src/app.ts @@ -9,6 +9,50 @@ import { typing } from "./utils/presence" const PORT = process.env.PORT ?? 3008 /** ID del asistente de OpenAI */ const ASSISTANT_ID = process.env.ASSISTANT_ID ?? '' +const userQueues = new Map(); +const userLocks = new Map(); // New lock mechanism + +/** + * Function to process the user's message by sending it to the OpenAI API + * and sending the response back to the user. + */ +const processUserMessage = async (ctx, { flowDynamic, state, provider }) => { + await typing(ctx, provider); + const response = await toAsk(ASSISTANT_ID, ctx.body, state); + + // Split the response into chunks and send them sequentially + const chunks = response.split(/\n\n+/); + for (const chunk of chunks) { + const cleanedChunk = chunk.trim().replace(/【.*?】[ ] /g, ""); + await flowDynamic([{ body: cleanedChunk }]); + } +}; + +/** + * Function to handle the queue for each user. + */ +const handleQueue = async (userId) => { + const queue = userQueues.get(userId); + + if (userLocks.get(userId)) { + return; // If locked, skip processing + } + + while (queue.length > 0) { + userLocks.set(userId, true); // Lock the queue + const { ctx, flowDynamic, state, provider } = queue.shift(); + try { + await processUserMessage(ctx, { flowDynamic, state, provider }); + } catch (error) { + console.error(Error processing message for user ${userId}:, error); + } finally { + userLocks.set(userId, false); // Release the lock + } + } + + userLocks.delete(userId); // Remove the lock once all messages are processed + userQueues.delete(userId); // Remove the queue once all messages are processed +}; /** * Flujo de bienvenida que maneja las respuestas del asistente de IA @@ -16,16 +60,20 @@ const ASSISTANT_ID = process.env.ASSISTANT_ID ?? '' */ const welcomeFlow = addKeyword(EVENTS.WELCOME) .addAction(async (ctx, { flowDynamic, state, provider }) => { - await typing(ctx, provider) - const response = await toAsk(ASSISTANT_ID, ctx.body, state) + const userId = ctx.from; // Use the user's ID to create a unique queue for each user - // Dividir la respuesta en chunks y enviarlos secuencialmente - const chunks = response.split(/\n\n+/) - for (const chunk of chunks) { - const cleanedChunk = chunk.trim().replace(/【.*?】/g, ""); - await flowDynamic([{ body: cleanedChunk }]) + if (!userQueues.has(userId)) { + userQueues.set(userId, []); } - }) + + const queue = userQueues.get(userId); + queue.push({ ctx, flowDynamic, state, provider }); + + // If this is the only message in the queue, process it immediately + if (!userLocks.get(userId) && queue.length === 1) { + await handleQueue(userId); + } + }); /** * Función principal que configura y inicia el bot @@ -37,7 +85,7 @@ const main = async () => { * Flujo del bot * @type {import('@builderbot/bot').Flow} */ - const adapterFlow = createFlow([welcomeFlow]) + const adapterFlow = createFlow([welcomeFlow]); /** * Proveedor de servicios de mensajería @@ -45,14 +93,14 @@ const main = async () => { */ const adapterProvider = createProvider(BaileysProvider, { groupsIgnore: true, - readStatus: false - }) + readStatus: false, + }); /** * Base de datos en memoria para el bot * @type {MemoryDB} */ - const adapterDB = new MemoryDB() + const adapterDB = new MemoryDB(); /** * Configuración y creación del bot @@ -62,10 +110,10 @@ const main = async () => { flow: adapterFlow, provider: adapterProvider, database: adapterDB, - }) + }); - httpInject(adapterProvider.server) - httpServer(+PORT) -} + httpInject(adapterProvider.server); + httpServer(+PORT); +}; -main() \ No newline at end of file +main(); \ No newline at end of file