Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/configs/constant.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ const bridge_ids = {
improve_prompt_optimizer: "68e4ac02739a8b89ba27b22a",
generate_test_cases: "68e8d1fbf8c9ba2043cf7afd",
prompt_checker: "692ee19da04fbf2a132b252c",
rich_ui_template: "6967b36c17a69473fa7fdb90"
rich_ui_template: "6967b36c17a69473fa7fdb90",
canonicalizer: "6973200cf60dd5bf64eeb325"
};

const redis_keys = {
Expand Down
15 changes: 15 additions & 0 deletions src/configs/prompts.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/**
* Centralized configuration for AI prompts used across services.
* Keeping prompts in a dedicated config makes them easier to maintain, update, and test.
*/

/**
 * Prompt given to the memory-storage function-calling model.
 * Contract preserved from the original: the model must output either the exact
 * memory text, or the literal string "False" when there is nothing to store.
 * (Original wording was garbled — "is purpose is not there than think its the
 * begining", "jusy memory" — which risks confusing the model; cleaned up here.)
 */
export const GPT_MEMORY_PROMPT =
  "Use the function to store the memory if the user message and history is related to the context or is important to store, else don't call the function and ignore it. If a purpose is not there, then treat it as the beginning of the conversation. Only return the exact memory as output with no extra text — just the memory if present, or just return False";

/** System prompt for the question-canonicalizer bridge. */
export const CANONICALIZER_SYSTEM_PROMPT =
  "You are a question canonicalizer. Given a conversation context, extract and return a canonical question that represents the user's intent.";

// Default export kept for consumers that import the whole config object.
export default {
  GPT_MEMORY_PROMPT,
  CANONICALIZER_SYSTEM_PROMPT
};
9 changes: 9 additions & 0 deletions src/consumers/index.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
import dotenv from "dotenv";
import logger from "../logger.js";
import rabbitmqService from "../services/rabbitmq.service.js";
import { logQueueConsumerConfig } from "./logQueueConsumer.js";

dotenv.config(); // Load .env before any feature flags below are read.
// Feature flags: only the literal string "true" (case-insensitive) enables a consumer.
// NOTE(review): CONSUMER_ENABLED is not referenced in this visible span — confirm it gates startup further down the file.
const CONSUMER_ENABLED = process.env.CONSUMER_ENABLED?.toLowerCase() === "true";
const LOG_QUEUE_CONSUMER_ENABLED = process.env.LOG_QUEUE_CONSUMER_ENABLED?.toLowerCase() === "true";

// Configure consumers array
const CONSUMERS = [];

// Add log queue consumer if enabled
if (LOG_QUEUE_CONSUMER_ENABLED) {
CONSUMERS.push(logQueueConsumerConfig);
}

class Consumer {
constructor(obj, connectionString) {
console.log("in contructor ");
Expand Down
126 changes: 126 additions & 0 deletions src/consumers/logQueueConsumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import logger from "../logger.js";
import { saveSubThreadIdAndName } from "../services/logQueue/saveSubThreadIdAndName.service.js";
import { validateResponse } from "../services/logQueue/validateResponse.service.js";
import { totalTokenCalculation } from "../services/logQueue/totalTokenCalculation.service.js";
import { chatbotSuggestions } from "../services/logQueue/chatbotSuggestions.service.js";
import { handleGptMemory } from "../services/logQueue/handleGptMemory.service.js";
import { saveToAgentMemory } from "../services/logQueue/saveToAgentMemory.service.js";
import { saveFilesToRedis } from "../services/logQueue/saveFilesToRedis.service.js";
import { sendApiHitEvent } from "../services/logQueue/sendApiHitEvent.service.js";
import { broadcastResponseWebhook } from "../services/logQueue/broadcastResponseWebhook.service.js";
import { updateBatchHistory } from "../services/logQueue/updateBatchHistory.service.js";
import { saveBatchMetrics } from "../services/logQueue/saveBatchMetrics.service.js";

/**
 * Dispatch one parsed log-queue payload to its matching handler services.
 * Handlers run sequentially in a fixed order, and each fires only when its
 * corresponding key (plus any gating flag) is present on the payload.
 * Covers regular logs, agent memory, analytics events, and batch operations.
 * @param {Object} messages - Parsed message object pulled off the queue
 */
async function processLogQueueMessage(messages) {
  // Reject anything that is not a plain payload object.
  if (!messages || typeof messages !== "object") {
    logger.warn("processLogQueueMessage: Invalid message format");
    return;
  }

  // Persist sub-thread id/name first so later handlers observe it stored.
  const subThreadPayload = messages.save_sub_thread_id_and_name;
  if (subThreadPayload) {
    await saveSubThreadIdAndName(subThreadPayload);
  }

  // Image-type messages carry nothing else to process.
  if (messages.type === "image") {
    return;
  }

  // Agent memory may arrive under either key; save_agent_memory takes priority.
  const memoryPayload = messages.save_agent_memory || messages.save_to_hippocampus || {};
  if (memoryPayload.chatbot_auto_answers) {
    await saveToAgentMemory({
      user_question: memoryPayload.user_message || "",
      assistant_answer: memoryPayload.assistant_message || "",
      agent_id: memoryPayload.bridge_id || "",
      bridge_name: memoryPayload.bridge_name || "",
      system_prompt: memoryPayload.system_prompt || ""
    });
  }

  const validationPayload = messages.validateResponse;

  // The API-hit analytics event is suppressed for alert-flagged responses.
  if (validationPayload && !validationPayload.alert_flag) {
    await sendApiHitEvent({
      message_id: validationPayload.message_id,
      org_id: validationPayload.org_id
    });
  }

  if (validationPayload) {
    await validateResponse(validationPayload);
  }

  if (messages.total_token_calculation) {
    await totalTokenCalculation(messages.total_token_calculation);
  }

  // GPT memory runs only when its gating flag says memory is enabled.
  if (messages.check_handle_gpt_memory?.gpt_memory && messages.handle_gpt_memory) {
    await handleGptMemory(messages.handle_gpt_memory);
  }

  // Suggestions run only when the bridge-type gate is present.
  if (messages.check_chatbot_suggestions?.bridgeType && messages.chatbot_suggestions) {
    await chatbotSuggestions(messages.chatbot_suggestions);
  }

  if (messages.save_files_to_redis) {
    await saveFilesToRedis(messages.save_files_to_redis);
  }

  if (messages.broadcast_response_webhook) {
    await broadcastResponseWebhook(messages.broadcast_response_webhook);
  }

  // Batch bookkeeping emitted by the Python batch-processing pipeline.
  if (messages.update_batch_history) {
    await updateBatchHistory(messages.update_batch_history);
  }

  if (messages.save_batch_metrics) {
    await saveBatchMetrics(messages.save_batch_metrics);
  }
}

/**
 * Consumer entry point: parse the raw RabbitMQ delivery, hand it to the
 * dispatcher, then ack. Any failure — malformed JSON or a handler error —
 * nacks WITHOUT requeueing so a poison message cannot loop forever.
 * @param {Object} message - Raw RabbitMQ message
 * @param {Object} channel - RabbitMQ channel used for ack/nack
 */
async function logQueueProcessor(message, channel) {
  try {
    const payload = JSON.parse(message.content.toString());
    await processLogQueueMessage(payload);
    channel.ack(message);
  } catch (err) {
    logger.error(`Error processing log queue message: ${err.message}`);
    // Dead-letter instead of requeue to avoid infinite redelivery loops.
    channel.nack(message, false, false);
  }
}

/**
 * Log queue consumer configuration consumed by the Consumer class
 * (queueName + process callback + batchSize prefetch).
 * batchSize increased from 1 to 10 for better throughput.
 */
const logQueueConsumerConfig = {
// Explicit LOG_QUEUE_NAME wins; otherwise an environment-suffixed default.
// NOTE(review): "ENVIROMENT" is misspelled — confirm deployments actually set
// that exact variable name before renaming it, or the fallback will change.
queueName: process.env.LOG_QUEUE_NAME || `AI-MIDDLEWARE-DATA-QUEUE-${process.env.ENVIROMENT || "development"}`,
process: logQueueProcessor,
batchSize: 10 // Increased from 1 for better throughput
};

export { logQueueProcessor, logQueueConsumerConfig, processLogQueueMessage };
18 changes: 18 additions & 0 deletions src/mongoModel/AgentMemory.model.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import mongoose from "mongoose";

/**
 * Mongoose model for per-agent canonical-question memories, stored in the
 * "agent_memories" collection. Tracks how often a canonical question recurs
 * (frequency / last_seen) per (agent_id, resource_id) pair.
 */
const agentMemorySchema = new mongoose.Schema({
  // Standalone index kept: resource_id is not a prefix of the compound index below.
  resource_id: { type: String, required: true, index: true },
  // Standalone index removed: the compound { agent_id, resource_id } index below
  // already serves agent_id-only queries via MongoDB's index-prefix rule, so a
  // separate agent_id index is redundant write overhead.
  agent_id: { type: String, required: true },
  canonical_question: { type: String, required: true },
  original_answer: { type: String, default: null },
  frequency: { type: Number, default: 1 },
  created_at: { type: Date, default: Date.now },
  last_seen: { type: Date, default: Date.now }
});

// Compound index for efficient (agent_id, resource_id) lookups; its agent_id
// prefix also covers queries filtering on agent_id alone.
agentMemorySchema.index({ agent_id: 1, resource_id: 1 });

// Reuse an already-compiled model if present (hot-reload / repeat-import safety);
// the third argument pins the collection name explicitly.
const AgentMemory = mongoose.models.AgentMemory || mongoose.model("AgentMemory", agentMemorySchema, "agent_memories");

export default AgentMemory;
93 changes: 93 additions & 0 deletions src/services/logQueue/broadcastResponseWebhook.service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import axios from "axios";
import logger from "../../logger.js";

/**
 * Validates the webhook-broadcast input parameters.
 * @param {Object} params - The parameters to validate
 * @returns {Object} - { isValid: boolean, error?: string }
 */
function validateParams(params) {
  if (!params || typeof params !== "object") {
    return { isValid: false, error: "Invalid params: expected an object" };
  }

  // Required non-empty string fields, checked in declaration order so the
  // first failure reported matches the original implementation.
  for (const field of ["bridge_id", "org_id"]) {
    const value = params[field];
    if (!value || typeof value !== "string") {
      return { isValid: false, error: `Missing or invalid ${field}` };
    }
  }

  return { isValid: true };
}

/**
 * Build (and eventually deliver) the response-broadcast webhook payload.
 * Returns the assembled payload on success, or undefined when validation
 * fails or an unexpected error occurs (errors are logged, never thrown).
 * @param {Object} data - Webhook data
 * @param {string} data.bridge_id - Bridge ID (required)
 * @param {string} data.org_id - Organization ID (required)
 * @param {Object} data.response - AI response
 * @param {string} data.user_question - User's question
 * @param {Object} data.variables - Variables used (defaults to {})
 * @param {string} data.error_type - Error type if applicable
 * @param {string} data.bridge_name - Bridge name
 * @param {boolean} data.is_embed - Whether from embed
 * @param {string} data.user_id - User ID
 * @param {string} data.thread_id - Thread ID
 * @param {string} data.service - Service name
 */
async function broadcastResponseWebhook(data) {
  const check = validateParams(data);
  if (!check.isValid) {
    logger.warn(`broadcastResponseWebhook: ${check.error}`);
    return;
  }

  try {
    // TODO: webhook URL should come from bridge configuration; payload
    // assembly is in place, delivery is still a placeholder.
    const webhookPayload = {
      bridge_id: data.bridge_id,
      org_id: data.org_id,
      response: data.response,
      user_question: data.user_question,
      variables: data.variables || {},
      error_type: data.error_type,
      bridge_name: data.bridge_name,
      is_embed: data.is_embed,
      user_id: data.user_id,
      thread_id: data.thread_id,
      service: data.service,
      timestamp: new Date().toISOString()
    };

    // TODO: Fetch webhook URL from bridge configuration and send, e.g.:
    // const webhookUrl = await getWebhookUrl(data.bridge_id, data.org_id);
    // if (webhookUrl) {
    //   await axios.post(webhookUrl, webhookPayload);
    // }

    logger.debug(`Broadcast webhook prepared for bridge_id=${data.bridge_id}`);
    return webhookPayload;
  } catch (err) {
    logger.error(`Error in broadcastResponseWebhook: ${err.message}`);
  }
}

export { broadcastResponseWebhook };
90 changes: 90 additions & 0 deletions src/services/logQueue/chatbotSuggestions.service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { callAiMiddleware } from "../utils/aiCall.utils.js";
import { bridge_ids } from "../../configs/constant.js";
import logger from "../../logger.js";

/**
 * Validates the suggestion-generation input parameters.
 * @param {Object} params - The parameters to validate
 * @returns {Object} - { isValid: boolean, error?: string }
 */
function validateParams(params) {
  if (!params || typeof params !== "object") {
    return { isValid: false, error: "Invalid params: expected an object" };
  }

  // Required non-empty string fields, checked in declaration order so the
  // first failure reported matches the original implementation.
  for (const field of ["thread_id", "sub_thread_id", "org_id"]) {
    const value = params[field];
    if (!value || typeof value !== "string") {
      return { isValid: false, error: `Missing or invalid ${field}` };
    }
  }

  return { isValid: true };
}

/**
 * Generate follow-up chatbot suggestions for a completed exchange.
 * Silently no-ops (returns undefined) unless the response format is
 * "chatbot" and both sides of the exchange are non-empty; AI-call errors
 * are logged rather than thrown.
 * @param {Object} data - Suggestion generation data
 * @param {Object} data.response_format - Response format configuration
 * @param {Object} data.assistant - Assistant response data
 * @param {string} data.user - User message
 * @param {string} data.bridge_summary - Bridge summary
 * @param {string} data.thread_id - Thread ID (required)
 * @param {string} data.sub_thread_id - Sub-thread ID (required)
 * @param {Object} data.configuration - Configuration object
 * @param {string} data.org_id - Organization ID (required)
 * @returns {Promise<*>} Suggestions from the AI middleware, or undefined
 */
async function chatbotSuggestions(data) {
  const check = validateParams(data);
  if (!check.isValid) {
    logger.warn(`chatbotSuggestions: ${check.error}`);
    return;
  }

  try {
    // Only chatbot-formatted responses get follow-up suggestions.
    if (data.response_format?.type !== "chatbot") {
      return;
    }

    // Both sides of the exchange must be present to give the model context.
    const assistantContent = data.assistant?.data?.content || "";
    if (!assistantContent || !data.user) {
      return;
    }

    const variables = {
      user_message: data.user,
      assistant_response: assistantContent,
      bridge_summary: data.bridge_summary || "",
      thread_id: data.thread_id,
      sub_thread_id: data.sub_thread_id
    };

    const suggestions = await callAiMiddleware(
      "Generate follow-up suggestions based on the conversation",
      bridge_ids.chatbot_suggestions,
      variables,
      data.configuration,
      "text"
    );

    if (suggestions) {
      logger.debug(`Generated suggestions for thread_id=${data.thread_id}`);
    }

    return suggestions;
  } catch (err) {
    logger.error(`Error in chatbotSuggestions: ${err.message}`);
  }
}

export { chatbotSuggestions };
Loading