Added a queue to handle multiple messages

This commit is contained in:
KevinDog24
2024-08-25 19:01:45 +00:00
parent 37848d37dd
commit c5ceaa341b

View File

@@ -9,6 +9,50 @@ import { typing } from "./utils/presence"
/** HTTP port for the bot server (defaults to 3008; env value, if set, is a string). */
const PORT = process.env.PORT ?? 3008
/** OpenAI assistant ID. */
const ASSISTANT_ID = process.env.ASSISTANT_ID ?? ''
/** Per-user FIFO queues of pending messages, keyed by the sender id (ctx.from). */
const userQueues = new Map();
/** Per-user flags marking a queue as currently being drained. */
const userLocks = new Map(); // New lock mechanism
/**
 * Sends the user's message to the OpenAI assistant and relays the answer
 * back to the user, one paragraph at a time.
 *
 * @param ctx - incoming message context (ctx.body is the user's text)
 * @param deps - flow helpers: flowDynamic (send messages), state, provider
 */
const processUserMessage = async (ctx, { flowDynamic, state, provider }) => {
    // Show the "typing…" presence indicator while the assistant works.
    await typing(ctx, provider);
    const response = await toAsk(ASSISTANT_ID, ctx.body, state);
    // Split on blank lines so each paragraph is delivered as its own message.
    const chunks = response.split(/\n\n+/);
    for (const chunk of chunks) {
        // Strip assistant citation markers like 【4:0†source】 plus any trailing
        // whitespace. The previous pattern /【.*?】[ ] /g only matched markers
        // followed by exactly two spaces, so most citations leaked through.
        const cleanedChunk = chunk.trim().replace(/【.*?】\s*/g, "");
        await flowDynamic([{ body: cleanedChunk }]);
    }
};
/**
* Function to handle the queue for each user.
*/
const handleQueue = async (userId) => {
const queue = userQueues.get(userId);
if (userLocks.get(userId)) {
return; // If locked, skip processing
}
while (queue.length > 0) {
userLocks.set(userId, true); // Lock the queue
const { ctx, flowDynamic, state, provider } = queue.shift();
try {
await processUserMessage(ctx, { flowDynamic, state, provider });
} catch (error) {
console.error(Error processing message for user ${userId}:, error);
} finally {
userLocks.set(userId, false); // Release the lock
}
}
userLocks.delete(userId); // Remove the lock once all messages are processed
userQueues.delete(userId); // Remove the queue once all messages are processed
};
/**
* Flujo de bienvenida que maneja las respuestas del asistente de IA
@@ -16,16 +60,20 @@ const ASSISTANT_ID = process.env.ASSISTANT_ID ?? ''
*/
const welcomeFlow = addKeyword<BaileysProvider, MemoryDB>(EVENTS.WELCOME)
.addAction(async (ctx, { flowDynamic, state, provider }) => {
await typing(ctx, provider)
const response = await toAsk(ASSISTANT_ID, ctx.body, state)
const userId = ctx.from; // Use the user's ID to create a unique queue for each user
// Dividir la respuesta en chunks y enviarlos secuencialmente
const chunks = response.split(/\n\n+/)
for (const chunk of chunks) {
const cleanedChunk = chunk.trim().replace(/【.*?】/g, "");
await flowDynamic([{ body: cleanedChunk }])
if (!userQueues.has(userId)) {
userQueues.set(userId, []);
}
})
const queue = userQueues.get(userId);
queue.push({ ctx, flowDynamic, state, provider });
// If this is the only message in the queue, process it immediately
if (!userLocks.get(userId) && queue.length === 1) {
await handleQueue(userId);
}
});
/**
* Función principal que configura y inicia el bot
@@ -37,7 +85,7 @@ const main = async () => {
* Flujo del bot
* @type {import('@builderbot/bot').Flow<BaileysProvider, MemoryDB>}
*/
// Bot flow built from the welcome flow (deduped: the diff showed the old and
// new form of this line back to back, which is a redeclaration as literal code).
const adapterFlow = createFlow([welcomeFlow]);
/**
* Proveedor de servicios de mensajería
@@ -45,14 +93,14 @@ const main = async () => {
*/
const adapterProvider = createProvider(BaileysProvider, {
groupsIgnore: true,
readStatus: false
})
readStatus: false,
});
/**
* Base de datos en memoria para el bot
* @type {MemoryDB}
*/
// In-memory database for the bot (deduped: old and new form of the same line).
const adapterDB = new MemoryDB();
/**
* Configuración y creación del bot
@@ -62,10 +110,10 @@ const main = async () => {
flow: adapterFlow,
provider: adapterProvider,
database: adapterDB,
});

// Wire the provider's HTTP endpoints into the server and start listening.
// Deduped: the diff showed the old (semicolon-less) and new versions of the
// closing of createBot, these two calls, and main's closing brace.
httpInject(adapterProvider.server);
httpServer(+PORT);
};
// Entry point (deduped: the diff showed `main()` and `main();`, which as
// literal code would start the bot twice).
main();