Mirror of https://github.com/marcogll/builderbot-openai-assistants.git
Synced 2026-01-13 13:25:18 +00:00
Merge pull request #6 from KevinDog24/master
Added a queue to handle multiple messages without crashing the run
pnpm-lock.yaml (generated): 5562 changes. File diff suppressed because it is too large.
src/app.ts: 82 changes
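
What the diff below does: each incoming message is pushed onto a per-user queue, and a per-user lock guarantees that only one OpenAI run is in flight for a given user at a time, so a burst of messages no longer crashes the active run. Below is a minimal standalone sketch of the same pattern; `Job`, `processJob`, `enqueue`, and `drain` are illustrative names, not identifiers from this repo.

// Sketch of the per-user queue + lock pattern (illustrative, not the repo's API).
type Job = { userId: string; body: string };

const queues = new Map<string, Job[]>();
const locks = new Map<string, boolean>();

// Stand-in for the real work (toAsk + flowDynamic in the bot).
const processJob = async (job: Job) => {
    await new Promise((resolve) => setTimeout(resolve, 100));
    console.log(`replied to ${job.userId}: ${job.body}`);
};

// Drain one user's queue, one job at a time; the lock keeps drains from overlapping.
const drain = async (userId: string) => {
    if (locks.get(userId)) return; // a drain is already running for this user
    locks.set(userId, true);
    const queue = queues.get(userId) ?? [];
    while (queue.length > 0) {
        const job = queue.shift()!;
        try {
            await processJob(job);
        } catch (error) {
            console.error(`job failed for ${userId}:`, error); // one bad job doesn't stop the drain
        }
    }
    locks.delete(userId);  // all done: release the lock
    queues.delete(userId); // and drop the empty queue
};

// Entry point: enqueue the job and start a drain if none is running.
const enqueue = (job: Job) => {
    if (!queues.has(job.userId)) queues.set(job.userId, []);
    queues.get(job.userId)!.push(job);
    void drain(job.userId); // no-op when the lock is already held
};

Holding the lock for the whole drain, rather than per job as the PR does, is an equally valid variant; both guarantee one run in flight per user.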
@@ -9,6 +9,50 @@ import { typing } from "./utils/presence"
 const PORT = process.env.PORT ?? 3008
 /** OpenAI assistant ID */
 const ASSISTANT_ID = process.env.ASSISTANT_ID ?? ''
+
+const userQueues = new Map();
+const userLocks = new Map(); // New lock mechanism
+
+/**
+ * Function to process the user's message by sending it to the OpenAI API
+ * and sending the response back to the user.
+ */
+const processUserMessage = async (ctx, { flowDynamic, state, provider }) => {
+    await typing(ctx, provider);
+    const response = await toAsk(ASSISTANT_ID, ctx.body, state);
+
+    // Split the response into chunks and send them sequentially
+    const chunks = response.split(/\n\n+/);
+    for (const chunk of chunks) {
+        const cleanedChunk = chunk.trim().replace(/【.*?】[ ]?/g, "");
+        await flowDynamic([{ body: cleanedChunk }]);
+    }
+};
+
+/**
+ * Function to handle the queue for each user.
+ */
+const handleQueue = async (userId) => {
+    const queue = userQueues.get(userId);
+
+    if (userLocks.get(userId)) {
+        return; // If locked, skip processing
+    }
+
+    while (queue.length > 0) {
+        userLocks.set(userId, true); // Lock the queue
+        const { ctx, flowDynamic, state, provider } = queue.shift();
+        try {
+            await processUserMessage(ctx, { flowDynamic, state, provider });
+        } catch (error) {
+            console.error(`Error processing message for user ${userId}:`, error);
+        } finally {
+            userLocks.set(userId, false); // Release the lock
+        }
+    }
+
+    userLocks.delete(userId); // Remove the lock once all messages are processed
+    userQueues.delete(userId); // Remove the queue once all messages are processed
+};
 
 /**
  * Welcome flow that handles the AI assistant's responses
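
In the hunk above, errors are caught per message, so one failed OpenAI run doesn't wedge the rest of that user's queue, and both maps are cleaned up once the queue empties so idle users don't leak entries. Using the illustrative `enqueue` from the earlier sketch, the expected behavior is:

// Two rapid messages from one user are answered strictly in order with a
// single run in flight; a different user's queue drains independently.
// The phone numbers are placeholders, not real contacts.
enqueue({ userId: "5211111111", body: "hola" });
enqueue({ userId: "5211111111", body: "are you there?" });
enqueue({ userId: "5222222222", body: "hi" });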
@@ -16,16 +60,20 @@ const ASSISTANT_ID = process.env.ASSISTANT_ID ?? ''
  */
 const welcomeFlow = addKeyword<BaileysProvider, MemoryDB>(EVENTS.WELCOME)
     .addAction(async (ctx, { flowDynamic, state, provider }) => {
-        await typing(ctx, provider)
-        const response = await toAsk(ASSISTANT_ID, ctx.body, state)
-
-        // Split the response into chunks and send them sequentially
-        const chunks = response.split(/\n\n+/)
-        for (const chunk of chunks) {
-            const cleanedChunk = chunk.trim().replace(/【.*?】/g, "");
-            await flowDynamic([{ body: cleanedChunk }])
-        }
-    })
+        const userId = ctx.from; // Use the user's ID to create a unique queue for each user
+
+        if (!userQueues.has(userId)) {
+            userQueues.set(userId, []);
+        }
+
+        const queue = userQueues.get(userId);
+        queue.push({ ctx, flowDynamic, state, provider });
+
+        // If this is the only message in the queue, process it immediately
+        if (!userLocks.get(userId) && queue.length === 1) {
+            await handleQueue(userId);
+        }
+    });
 
 /**
  * Main function that configures and starts the bot
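
Note on the guard in the welcome flow above: `!userLocks.get(userId) && queue.length === 1` only kicks off `handleQueue` for the first queued message. This appears safe in Node's single-threaded model: the lock and the queue only change hands across `await` boundaries, so if a run is already in flight the lock is held (or the queue already has more than one entry) and the guard is false, while the `while` loop already draining the queue picks up the newly pushed message on its next iteration.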
@@ -37,7 +85,7 @@ const main = async () => {
      * Bot flow
      * @type {import('@builderbot/bot').Flow<BaileysProvider, MemoryDB>}
      */
-    const adapterFlow = createFlow([welcomeFlow])
+    const adapterFlow = createFlow([welcomeFlow]);
 
     /**
      * Messaging service provider
@@ -45,14 +93,14 @@ const main = async () => {
      */
     const adapterProvider = createProvider(BaileysProvider, {
         groupsIgnore: true,
-        readStatus: false
-    })
+        readStatus: false,
+    });
 
     /**
      * In-memory database for the bot
      * @type {MemoryDB}
      */
-    const adapterDB = new MemoryDB()
+    const adapterDB = new MemoryDB();
 
     /**
      * Bot configuration and creation
@@ -62,10 +110,10 @@ const main = async () => {
         flow: adapterFlow,
         provider: adapterProvider,
         database: adapterDB,
-    })
+    });
 
-    httpInject(adapterProvider.server)
-    httpServer(+PORT)
-}
+    httpInject(adapterProvider.server);
+    httpServer(+PORT);
+};
 
-main()
+main();