diff --git a/rag_service/core/assignment_context_manager.py b/rag_service/core/assignment_context_manager.py index ddb8bf9..b42def2 100644 --- a/rag_service/core/assignment_context_manager.py +++ b/rag_service/core/assignment_context_manager.py @@ -6,6 +6,8 @@ from datetime import datetime, timedelta from typing import Any, Dict from frappe.utils import now_datetime +import time +from ..monitoring import record_tap_lms_api_call class AssignmentContextManager: def __init__(self): @@ -117,13 +119,34 @@ async def _fetch_assignment_from_api(self, assignment_id: str) -> Dict: payload = { "assignment_id": assignment_id } - response = requests.post( - api_url, - headers=self.headers, - json=payload, - timeout=30 + _api_t0 = time.monotonic() + _api_status = None + try: + response = requests.post( + api_url, + headers=self.headers, + json=payload, + timeout=30 + ) + _api_status = response.status_code + except Exception as _conn_err: + record_tap_lms_api_call( + submission_id=None, + endpoint="get_assignment_context", + duration_ms=(time.monotonic() - _api_t0) * 1000, + cache_hit=False, + status_code=None, + ) + raise + + record_tap_lms_api_call( + submission_id=None, + endpoint="get_assignment_context", + duration_ms=(time.monotonic() - _api_t0) * 1000, + cache_hit=False, + status_code=_api_status, ) - + if response.status_code != 200: error_msg = f"API request failed with status {response.status_code}: {response.text}" print(f"Error: {error_msg}") @@ -152,13 +175,34 @@ async def _fetch_student_from_api(self, student_id: str) -> Dict: payload = { "student_id": student_id } - response = requests.post( - api_url, - headers=self.headers, - json=payload, - timeout=30 + _api_t0 = time.monotonic() + _api_status = None + try: + response = requests.post( + api_url, + headers=self.headers, + json=payload, + timeout=30 + ) + _api_status = response.status_code + except Exception as _conn_err: + record_tap_lms_api_call( + submission_id=None, + endpoint="get_student_context", + duration_ms=(time.monotonic() - _api_t0) * 1000, + cache_hit=False, + status_code=None, + ) + raise + + record_tap_lms_api_call( + submission_id=None, + endpoint="get_student_context", + duration_ms=(time.monotonic() - _api_t0) * 1000, + cache_hit=False, + status_code=_api_status, ) - + if response.status_code != 200: error_msg = f"API request failed with status {response.status_code}: {response.text}" print(f"Error: {error_msg}") diff --git a/rag_service/core/feedback_handler.py b/rag_service/core/feedback_handler.py index 6833eb1..6cbc27a 100644 --- a/rag_service/core/feedback_handler.py +++ b/rag_service/core/feedback_handler.py @@ -1,6 +1,12 @@ # rag_service/rag_service/core/feedback_handler.py +import time import frappe +from ..monitoring import ( + record_rag_submission_received, + record_rag_feedback_complete, + record_rag_feedback_failed, +) import json from datetime import datetime from typing import Dict, Optional @@ -18,6 +24,18 @@ def __init__(self): async def handle_submission(self, message_data: Dict) -> None: """Handle a new submission from plagiarism queue""" request_id = None + _t0 = time.monotonic() + submission_id = message_data.get("submission_id") + student_id = message_data.get("student_id") + assignment_id = message_data.get("assignment_id") + + # SRE: pipeline trace — step 1 in rag_service + record_rag_submission_received( + submission_id=submission_id, + student_id=student_id, + assignment_id=assignment_id, + ) + try: submission_data = normalize_submission_payload(message_data) @@ -47,6 +65,15 @@ async def handle_submission(self, message_data: Dict) -> None: error_msg = f"Error handling submission: {str(e)}" print(f"\nError: {error_msg}") frappe.log_error(error_msg, "Submission Handler Error") + # SRE: pipeline trace — failure + try: + record_rag_feedback_failed( + submission_id=submission_id, + error=error_msg, + duration_ms=(time.monotonic() - _t0) * 1000, + ) + except Exception: + pass # Mark request as failed if it exists if request_id and frappe.db.exists("Feedback Request", request_id): @@ -62,6 +89,16 @@ async def handle_submission(self, message_data: Dict) -> None: # Process and deliver feedback await self.feedback_service.process_feedback(request_id, feedback, model_used, template_used) print("\nFeedback processing completed") + # SRE: pipeline trace — complete (always emitted, success or error) + try: + record_rag_feedback_complete( + submission_id=submission_id, + model_used=model_used, + template_used=template_used, + duration_ms=(time.monotonic() - _t0) * 1000, + ) + except Exception: + pass def _attach_plagiarism_defaults(self, feedback: Dict) -> Dict: feedback["plagiarism_output"] = { diff --git a/rag_service/core/feedback_service.py b/rag_service/core/feedback_service.py index 2ee6aae..5a4485c 100644 --- a/rag_service/core/feedback_service.py +++ b/rag_service/core/feedback_service.py @@ -6,6 +6,8 @@ from typing import Dict from ..feedback_utils.evaluation_generation import EvaluationGenerator from ..utils.queue_manager import QueueManager +import time +from ..monitoring import record_llm_call class FeedbackService: def __init__(self): @@ -47,9 +49,28 @@ async def generate_feedback( self, assignment_context: Dict, submission_data: Di # Continue with normal feedback generation for original work else: result_status = "Success - Original" - feedback, model_used, tempalate_used = await self.evaluation_generator.generate_ai_feedback( - assignment_context, submission_data, submission_id - ) + _llm_t0 = time.monotonic() + try: + feedback, model_used, tempalate_used = await self.evaluation_generator.generate_ai_feedback( + assignment_context, submission_data, submission_id + ) + record_llm_call( + submission_id=submission_id, + provider=model_used, + model=model_used, + status="success", + duration_ms=(time.monotonic() - _llm_t0) * 1000, + ) + except Exception as _llm_err: + record_llm_call( + submission_id=submission_id, + provider="unknown", + model="unknown", + status="error", + duration_ms=(time.monotonic() - _llm_t0) * 1000, + error=str(_llm_err), + ) + raise feedback["translation_language"] = assignment_context["student"].get("language", "English") await self._update_result_status(feedback_request_id, result_status) diff --git a/rag_service/core/llm_providers.py b/rag_service/core/llm_providers.py index a9573da..7f3d61c 100644 --- a/rag_service/core/llm_providers.py +++ b/rag_service/core/llm_providers.py @@ -1,27 +1,123 @@ # rag_service/rag_service/core/llm_providers.py -from together import Together +import asyncio import json -from typing import Any, List, Dict, Optional -from langchain_openai import ChatOpenAI -from langchain.schema import HumanMessage, SystemMessage -from .llm_interface import BaseLLMInterface +from typing import Any, Dict, List, Optional + +import aiohttp import vertexai from google.oauth2 import service_account +from langchain_core.messages import HumanMessage, SystemMessage +from langchain_openai import ChatOpenAI +from together import Together from vertexai.generative_models import GenerativeModel, Part +from .llm_interface import BaseLLMInterface + + +class StubResponse: + """Mock Gemini response object for the StubProvider""" + + def __init__(self, text, usage_metadata=None): + self.text = text + self._usage_metadata = usage_metadata or { + "candidates_token_count": 100, + "total_token_count": 500, + } + + def to_dict(self): + return { + "candidates": [{"avg_logprobs": -0.5}], + "usage_metadata": self._usage_metadata, + "model_version": "stub-model", + } + + +class StubProvider(BaseLLMInterface): + """Stub provider for local testing without real LLM costs""" + + def __init__( + self, + api_key: str, + model_name: str, + temperature: float = 0.7, + max_tokens: int = 2000, + settings: Any = None, + ): + super().__init__(api_key, model_name, temperature, max_tokens) + self.base_url = getattr(settings, "base_url", "http://llm-stub:8001") + + async def _simulate_latency(self): + await asyncio.sleep(1.5) + + async def _call_stub(self, prompt: str) -> StubResponse: + await self._simulate_latency() + + url = f"{self.base_url.rstrip('/')}/v1/chat/completions" + payload = { + "model": self.model_name, + "messages": [{"role": "user", "content": prompt}], + } + + try: + async with aiohttp.ClientSession() as session: + async with session.post(url, json=payload, timeout=10) as resp: + if resp.status == 200: + data = await resp.json() + content = data["choices"][0]["message"]["content"] + return StubResponse(content) + else: + error_text = await resp.text() + print(f"Stub Error ({resp.status}): {error_text}") + except Exception as e: + print(f"Connection to LLM stub failed: {e}") + + return StubResponse( + '{"overall_feedback": "Stub error fallback", "final_grade": 0, "rubric_evaluations": []}' + ) + + async def generate(self, messages: List[Dict]) -> Any: + prompt = str(messages) + return await self._call_stub(prompt) + + async def generate_with_vision( + self, image_source, prompt: str, mime_type: Optional[str] = None + ) -> Any: + return await self._call_stub(prompt) + + async def generate_with_video( + self, video_source, prompt: str, mime_type: Optional[str] = None + ) -> Any: + return await self._call_stub(prompt) + + async def generate_with_audio( + self, audio_source, prompt: str, mime_type: Optional[str] = None + ) -> Any: + return await self._call_stub(prompt) + + def calculate_cost(self, response_dict: Dict) -> float: + return 0.00123 + + class OpenAIProvider(BaseLLMInterface): """OpenAI provider using LangChain""" - - def __init__(self, api_key: str, model_name: str, temperature: float = 0.7, max_tokens: int = 2000, settings: Any = None): + + def __init__( + self, + api_key: str, + model_name: str, + temperature: float = 0.7, + max_tokens: int = 2000, + settings: Any = None, + ): super().__init__(api_key, model_name, temperature, max_tokens) self.llm = ChatOpenAI( model_name=model_name, openai_api_key=api_key, temperature=temperature, - max_tokens=max_tokens + max_tokens=max_tokens, ) - + async def generate(self, messages: List[Dict]) -> str: langchain_messages = [] for msg in messages: @@ -29,10 +125,10 @@ async def generate(self, messages: List[Dict]) -> str: langchain_messages.append(SystemMessage(content=msg["content"])) else: langchain_messages.append(HumanMessage(content=msg["content"])) - + response = await self.llm.agenerate([langchain_messages]) return response.generations[0][0].text.strip() - + async def generate_with_vision(self, messages: List[Dict]) -> str: # OpenAI vision models handle image URLs in the content return await self.generate(messages) @@ -40,8 +136,15 @@ async def generate_with_vision(self, messages: List[Dict]) -> str: class TogetherAIProvider(BaseLLMInterface): """Together AI provider optimized for Llama 3.2 90B Vision""" - - def __init__(self, api_key: str, model_name: str, temperature: float = 0.7, max_tokens: int = 15000, settings: Any = None): + + def __init__( + self, + api_key: str, + model_name: str, + temperature: float = 0.7, + max_tokens: int = 15000, + settings: Any = None, + ): super().__init__(api_key, model_name, temperature, max_tokens) self.client = Together(api_key=api_key) @@ -49,70 +152,66 @@ async def generate(self, system_prompt: str, user_prompt: str) -> str: try: messages = self.format_messages(system_prompt, user_prompt) response = await self.client.chat.completions.create( - model=self.model_name, - messages=messages, - temperature=self.temperature + model=self.model_name, messages=messages, temperature=self.temperature ) return response.text except Exception as e: print(f"Error during Together AI generation: {e}") raise Exception(f"Error during Together AI generation: {e}") - async def generate_with_vision(self, image_url: str, system_prompt: str, user_prompt: str = "") -> str: + async def generate_with_vision( + self, image_url: str, system_prompt: str, user_prompt: str = "" + ) -> str: # Llama 3.2 90B Vision handles image URLs in the message content try: messages = self.format_messages(system_prompt, user_prompt, image_url) # print(f"\nFormatted messages for Together AI:\n{json.dumps(messages, indent=2)}") response = self.client.chat.completions.create( - # reasoning={"enabled": False}, - reasoning_effort="low", - model=self.model_name, - messages=messages, - temperature=self.temperature, - max_tokens=self.max_tokens, - ) + # reasoning={"enabled": False}, + reasoning_effort="low", + model=self.model_name, + messages=messages, + temperature=self.temperature, + max_tokens=self.max_tokens, + ) return response except Exception as e: print(f"Error during Together AI vision generation: {e}") raise Exception(f"Error during Together AI vision generation: {e}") - - def format_messages(self, system_prompt: str, user_prompt: str, image_url: Optional[str] = None) -> List[Dict]: + + def format_messages( + self, system_prompt: str, user_prompt: str, image_url: Optional[str] = None + ) -> List[Dict]: """Format messages specifically for Llama 3.2 90B Vision""" messages = [] - + # Llama 3.2 90B sometimes performs better with system prompts in user messages combined_prompt = f"{system_prompt}\n\n{user_prompt}" - + if image_url: # Llama 3.2 90B Vision expects this specific format - messages.append({ - "role": "user", - "content": [ - { - "type": "text", - "text": combined_prompt - }, - { - "type": "image_url", - "image_url": { - "url": image_url - } - } - ] - }) + messages.append( + { + "role": "user", + "content": [ + {"type": "text", "text": combined_prompt}, + {"type": "image_url", "image_url": {"url": image_url}}, + ], + } + ) else: messages = [ {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt} + {"role": "user", "content": user_prompt}, ] - + return messages - + def calculate_cost(self, response): """ Calculates the cost of a Together AI API call based on usage_metadata and input/output token costs. Supports Together AI models. - + :param response: The ChatCompletion response object. :param input_cost: Price per 1M input tokens. :param output_cost: Price per 1M output tokens. @@ -151,7 +250,7 @@ def __init__( settings: Any = None, ): super().__init__(api_key, model_name, temperature, max_tokens) - + self.key_data = self._resolve_service_account_credentials(settings) self.location = settings.location self.project_id = settings.project_id @@ -195,11 +294,17 @@ def _ensure_vertex_init(self) -> None: ): return - credentials = service_account.Credentials.from_service_account_info(self.key_data) + credentials = service_account.Credentials.from_service_account_info( + self.key_data + ) if not resolved_project_id: - raise ValueError("Project ID is required for Gemini provider initialization") + raise ValueError( + "Project ID is required for Gemini provider initialization" + ) - vertexai.init(project=resolved_project_id, location=self.location, credentials=credentials) + vertexai.init( + project=resolved_project_id, location=self.location, credentials=credentials + ) GeminiProvider._vertex_init = { "key_data_id": key_data_id, "project_id": resolved_project_id, @@ -230,18 +335,31 @@ async def generate(self, messages: List[Dict]) -> str: cost = self.calculate_cost(response.to_dict()) return response.text, cost or "", 0.0 - def _build_media_part(self, media_source, mime_type: Optional[str] = None, default_kind: str = "application") -> Part: + def _build_media_part( + self, + media_source, + mime_type: Optional[str] = None, + default_kind: str = "application", + ) -> Part: if isinstance(media_source, dict): media_bytes = media_source.get("content") resolved_mime_type = mime_type or media_source.get("mime_type") if media_bytes is not None: - return Part.from_data(data=media_bytes, mime_type=resolved_mime_type or f"{default_kind}/octet-stream") + return Part.from_data( + data=media_bytes, + mime_type=resolved_mime_type or f"{default_kind}/octet-stream", + ) media_url = media_source.get("submission_url") or media_source.get("url") resolved_mime_type = resolved_mime_type or self._infer_mime_type(media_url) - return Part.from_uri(uri=self._normalize_media_uri(media_url), mime_type=resolved_mime_type) + return Part.from_uri( + uri=self._normalize_media_uri(media_url), mime_type=resolved_mime_type + ) if isinstance(media_source, (bytes, bytearray)): - return Part.from_data(data=bytes(media_source), mime_type=mime_type or f"{default_kind}/octet-stream") + return Part.from_data( + data=bytes(media_source), + mime_type=mime_type or f"{default_kind}/octet-stream", + ) media_url = str(media_source) return Part.from_uri( @@ -249,8 +367,16 @@ def _build_media_part(self, media_source, mime_type: Optional[str] = None, defau mime_type=mime_type or self._infer_mime_type(media_url), ) - async def _generate_with_media(self, media_source, prompt: str, mime_type: Optional[str] = None, default_kind: str = "application") -> str: - media_part = self._build_media_part(media_source, mime_type=mime_type, default_kind=default_kind) + async def _generate_with_media( + self, + media_source, + prompt: str, + mime_type: Optional[str] = None, + default_kind: str = "application", + ) -> str: + media_part = self._build_media_part( + media_source, mime_type=mime_type, default_kind=default_kind + ) model = GenerativeModel(self.model_name) response = model.generate_content( [media_part, prompt], @@ -261,7 +387,9 @@ async def _generate_with_media(self, media_source, prompt: str, mime_type: Optio ) return response - async def generate_with_vision(self, image_source, prompt: str, mime_type: Optional[str] = None) -> str: + async def generate_with_vision( + self, image_source, prompt: str, mime_type: Optional[str] = None + ) -> str: try: return await self._generate_with_media( image_source, @@ -272,8 +400,9 @@ async def generate_with_vision(self, image_source, prompt: str, mime_type: Optio except Exception as e: raise Exception(f"Error during vision generation: {e}") - - async def generate_with_video(self, video_source, prompt: str, mime_type: Optional[str] = None) -> str: + async def generate_with_video( + self, video_source, prompt: str, mime_type: Optional[str] = None + ) -> str: try: return await self._generate_with_media( video_source, @@ -285,7 +414,9 @@ async def generate_with_video(self, video_source, prompt: str, mime_type: Option print(f"Error during video generation: {e}") raise Exception(f"Error during video generation: {e}") - async def generate_with_audio(self, audio_source, prompt: str, mime_type: Optional[str] = None) -> str: + async def generate_with_audio( + self, audio_source, prompt: str, mime_type: Optional[str] = None + ) -> str: try: return await self._generate_with_media( audio_source, @@ -296,7 +427,7 @@ async def generate_with_audio(self, audio_source, prompt: str, mime_type: Option except Exception as e: print(f"Error during audio generation: {e}") raise Exception(f"Error during audio generation: {e}") - + def _infer_mime_type(self, url: str) -> str: if url.endswith((".png", ".jpg", ".jpeg", ".bmp", ".gif")): return f"image/{url.split('.')[-1]}" @@ -323,26 +454,29 @@ def calculate_cost(self, response): """ metadata = response.get("usage_metadata") model = response.get("model_version", "gemini-2.5-pro") - - + # Token counts # For images/multimodal, 'total_token_count' includes the media tokens output_tokens = metadata.get("candidates_token_count", 0) input_tokens = metadata.get("total_token_count", 0) - output_tokens - + # Pricing per 1 Million Tokens (as of March 2026) pricing = { "gemini-2.5-pro": { - "input_std": 1.25, "output_std": 10.00, - "input_long": 2.50, "output_long": 15.00 + "input_std": 1.25, + "output_std": 10.00, + "input_long": 2.50, + "output_long": 15.00, }, "gemini-3.1-pro": { - "input_std": 2.00, "output_std": 12.00, - "input_long": 4.00, "output_long": 18.00 + "input_std": 2.00, + "output_std": 12.00, + "input_long": 4.00, + "output_long": 18.00, }, "gemini-2.5-flash": {"input": 0.30, "output": 2.50}, "gemini-2.5-flash-lite": {"input": 0.10, "output": 0.40}, - "gemini-3-flash": {"input": 0.50, "output": 3.00} + "gemini-3-flash": {"input": 0.50, "output": 3.00}, } # Determine rate based on model and context length @@ -357,14 +491,17 @@ def calculate_cost(self, response): out_rate = pricing[base_model]["output_long"] else: # Flash models usually have flat pricing - rates = pricing.get(model, pricing["gemini-2.5-flash"]) # Default to Flash + rates = pricing.get(model, pricing["gemini-2.5-flash"]) # Default to Flash in_rate = rates["input"] out_rate = rates["output"] # Calculate final cost - cost = (input_tokens * (in_rate / 1_000_000)) + (output_tokens * (out_rate / 1_000_000)) + cost = (input_tokens * (in_rate / 1_000_000)) + ( + output_tokens * (out_rate / 1_000_000) + ) return round(cost, 6) + def create_llm_provider( provider: str, api_key: str, @@ -374,14 +511,15 @@ def create_llm_provider( **kwargs, ) -> BaseLLMInterface: """Factory function to create LLM provider instances""" - + providers = { "OpenAI": OpenAIProvider, "Together AI": TogetherAIProvider, "Gemini": GeminiProvider, + "Stub": StubProvider, } - + if provider not in providers: raise ValueError(f"Unsupported provider: {provider}") - + return providers[provider](api_key, model_name, temperature, max_tokens, **kwargs) diff --git a/rag_service/feedback_utils/audio_evaluation.py b/rag_service/feedback_utils/audio_evaluation.py index 33ff9ba..fe70e3f 100644 --- a/rag_service/feedback_utils/audio_evaluation.py +++ b/rag_service/feedback_utils/audio_evaluation.py @@ -1,11 +1,12 @@ # rag_service/rag_service/feedback_utils/audio_evaluation.py +import os from typing import Dict, Tuple import frappe -from .evaluation_generation import EvaluationGenerator from ..utils.gcp_service_client import GCPServiceClient +from .evaluation_generation import EvaluationGenerator class AudioEvaluationGenerator(EvaluationGenerator): @@ -26,7 +27,9 @@ async def generate_feedback( activity_type = assignment_context["assignment"].get("activity_type") course_vertical = assignment_context["assignment"].get("course_vertical") - template = self.get_prompt_template("audio", "both", activity_type, course_vertical) + template = self.get_prompt_template( + "audio", "both", activity_type, course_vertical + ) expected_format = self._get_expected_format(template) system_prompt, formatted_user_prompt = self._format_prompts( template, @@ -37,15 +40,20 @@ async def generate_feedback( combined_prompt = f"{system_prompt}\n\n{formatted_user_prompt}" media_service = GCPServiceClient() - media_asset = media_service.download_media(submission_data["submission_url"]) + media_asset = media_service.download_media( + submission_data["submission_url"] + ) response = await llm_provider.generate_with_audio( media_asset, combined_prompt, mime_type=media_asset["mime_type"], ) + raw_text = response.text self.cost = llm_provider.calculate_cost(response.to_dict()) - self.log_prob_feedback = response.to_dict().get("candidates", [{}])[0].get("avg_logprobs", None) + self.log_prob_feedback = ( + response.to_dict().get("candidates", [{}])[0].get("avg_logprobs", None) + ) feedback = self._parse_feedback(raw_text, expected_format) feedback = self._attach_plagiarism_defaults(feedback) diff --git a/rag_service/feedback_utils/evaluation_generation.py b/rag_service/feedback_utils/evaluation_generation.py index 39bf1b1..cd54f75 100644 --- a/rag_service/feedback_utils/evaluation_generation.py +++ b/rag_service/feedback_utils/evaluation_generation.py @@ -5,6 +5,7 @@ from typing import Any, Dict, List, Optional, Tuple import frappe + from ..core.llm_providers import create_llm_provider from ..utils.submission_data import ( MEDIA_SUBMISSION_TYPES, @@ -59,8 +60,13 @@ def __init__(self, feedback_service: Any): def detect_media_type(self, submission_data: Dict) -> str: """Detect media type using submission_type first, then URL extension as fallback.""" submission_type = (submission_data.get("submission_type") or "").lower() - if submission_type in MEDIA_SUBMISSION_TYPES or submission_type in TEXT_SUBMISSION_TYPES: - return "text" if submission_type in TEXT_SUBMISSION_TYPES else submission_type + if ( + submission_type in MEDIA_SUBMISSION_TYPES + or submission_type in TEXT_SUBMISSION_TYPES + ): + return ( + "text" if submission_type in TEXT_SUBMISSION_TYPES else submission_type + ) submission_url = submission_data.get("submission_url") or "" if not submission_url: @@ -124,7 +130,9 @@ def format_objectives(self, objectives: List[Dict]) -> str: formatted = [] for i, obj in enumerate(objectives, 1): if isinstance(obj, dict): - description = obj.get("description", obj.get("objective_id", "Unknown objective")) + description = obj.get( + "description", obj.get("objective_id", "Unknown objective") + ) else: description = str(obj) formatted.append(f"{i}. {description}") @@ -141,9 +149,7 @@ def format_rubrics(self, rubrics) -> str: for criterion, grades_list in rubrics.items(): prompt += f"\n{criterion}:\n" for grade_item in grades_list: - prompt += ( - f" Grade {grade_item['grade_value']}: {grade_item['grade_description']}\n" - ) + prompt += f" Grade {grade_item['grade_value']}: {grade_item['grade_description']}\n" return prompt @@ -188,7 +194,13 @@ def format_rubrics(self, rubrics) -> str: # ] # } - def get_prompt_template(self, media_type: str, prompt_type: str, activity_type: str, course_vertical: str): + def get_prompt_template( + self, + media_type: str, + prompt_type: str, + activity_type: str, + course_vertical: str, + ): """Get active template for the given media type.""" try: print(f"\n=== Getting Prompt Template for {prompt_type}===") @@ -198,18 +210,23 @@ def get_prompt_template(self, media_type: str, prompt_type: str, activity_type: print(f"prompt_type Type: {prompt_type}") print(f"activity_type: {activity_type}") - - templates = frappe.get_list( "Prompt Template", - filters={"is_active": 1, "media_type": media_type, "prompt_type": prompt_type, - "activity_type": activity_type, "course_vertical": course_vertical }, + filters={ + "is_active": 1, + "media_type": media_type, + "prompt_type": prompt_type, + "activity_type": activity_type, + "course_vertical": course_vertical, + }, order_by="version desc", limit=1, ) template = frappe.get_doc("Prompt Template", templates[0].name) - print(f"Using {media_type} {prompt_type} template: {template.template_name}") + print( + f"Using {media_type} {prompt_type} template: {template.template_name}" + ) template.db_set("last_used", datetime.now()) self._touch_prompt_segment(template.system_segment) @@ -225,8 +242,9 @@ def get_prompt_template(self, media_type: str, prompt_type: str, activity_type: frappe.log_error("Template Error", error_msg) raise Exception("No active template found") - - def _get_expected_format(self, template: Any, prompt_type: str = "feedback") -> Dict: + def _get_expected_format( + self, template: Any, prompt_type: str = "feedback" + ) -> Dict: try: if hasattr(template, "response_format") and template.response_format: return json.loads(template.response_format) @@ -234,7 +252,6 @@ def _get_expected_format(self, template: Any, prompt_type: str = "feedback") -> print(f"Invalid JSON in template response format: {e}") raise Exception("Active template has invalid response format") - def _render_prompt_content(self, content: str, prompt_vars: Dict[str, Any]) -> str: rendered = content or "" for key, value in prompt_vars.items(): @@ -287,14 +304,24 @@ def _build_prompt_vars( ) return { - "assignment_name": assignment_context.get("assignment", {}).get("assignment_name", ""), - "assignment_description": assignment_context.get("assignment", {}).get("description", ""), - "course_vertical": assignment_context.get("assignment", {}).get("course_vertical", ""), - "assignment_type": assignment_context.get("assignment", {}).get("assignment_type", "Practical"), + "assignment_name": assignment_context.get("assignment", {}).get( + "assignment_name", "" + ), + "assignment_description": assignment_context.get("assignment", {}).get( + "description", "" + ), + "course_vertical": assignment_context.get("assignment", {}).get( + "course_vertical", "" + ), + "assignment_type": assignment_context.get("assignment", {}).get( + "assignment_type", "Practical" + ), "learning_objectives": learning_objectives, "rubric_evaluations": rubric_evaluations, "rubric_criteria": rubric_criteria, - "Language": assignment_context.get("student", {}).get("language", "English"), + "Language": assignment_context.get("student", {}).get( + "language", "English" + ), "Grade_Level": assignment_context.get("student", {}).get("grade", "1"), "submission_type": submission_data.get("submission_type", ""), "submission_text": submission_data.get("submission_text", "") or "", @@ -303,7 +330,9 @@ def _build_prompt_vars( "expected_submission_type": submission_data.get("expected_submission_type", ""), "archetype": submission_data.get("archetype"), "current_week": submission_data.get("current_week"), - "escalation_step_at_submit": submission_data.get("escalation_step_at_submit"), + "escalation_step_at_submit": submission_data.get( + "escalation_step_at_submit" + ), } def _format_prompts( @@ -328,7 +357,9 @@ def _format_prompts( subject_segment = frappe.get_doc("Prompt Segment", template.subject_segment) output_segment = frappe.get_doc("Prompt Segment", template.output_segment) - system_prompt = self._render_prompt_content(system_segment.content, prompt_vars) + system_prompt = self._render_prompt_content( + system_segment.content, prompt_vars + ) user_prompt_sections = [ self._render_prompt_content(grading_segment.content, prompt_vars), self._render_prompt_content(subject_segment.content, prompt_vars), @@ -338,11 +369,13 @@ def _format_prompts( if submission_data.get("submission_type") in TEXT_SUBMISSION_TYPES: user_prompt_sections.append(prompt_vars["submission_text_context"]) - formatted_user_prompt = "\n\n".join(section for section in user_prompt_sections if section) - print("#"*80) + formatted_user_prompt = "\n\n".join( + section for section in user_prompt_sections if section + ) + print("#" * 80) print(system_prompt) print(formatted_user_prompt) - print("#"*80) + print("#" * 80) return system_prompt, formatted_user_prompt except Exception as e: print(f"Error formatting prompt: {e}") @@ -356,7 +389,7 @@ def _parse_rubric_evaluations(self, raw_text: str) -> Dict: except Exception as e: print(f"Error parsing rubric evaluations: {e}") raise Exception("Failed to parse rubric evaluations from LLM response") - + def _parse_grade_value_feedback(self, grade_value) -> int: feedback = { "overall_feedback": "Good job", @@ -366,9 +399,9 @@ def _parse_grade_value_feedback(self, grade_value) -> int: { "Skill": "Content Knowledge", "grade_value": grade_value, - "observation": "Neutral" + "observation": "Neutral", } - ] + ], } return feedback @@ -376,7 +409,9 @@ def _parse_feedback(self, raw_text: str, expected_format: Dict) -> Dict: cleaned_text = self.clean_json_response(raw_text or "") try: feedback = json.loads(cleaned_text) - feedback = self.feedback_service.validate_feedback_structure(feedback, expected_format) + feedback = self.feedback_service.validate_feedback_structure( + feedback, expected_format + ) except json.JSONDecodeError: feedback = self.feedback_service.create_error_feedback(raw_text) @@ -394,11 +429,13 @@ def _attach_plagiarism_defaults(self, feedback: Dict) -> Dict: "similar_sources": [], } return feedback - - def _attach_evaluation_to_feedback(self, feedback: Dict, evaluation_result: Dict) -> Dict: + + def _attach_evaluation_to_feedback( + self, feedback: Dict, evaluation_result: Dict + ) -> Dict: feedback["rubric_evaluations"] = evaluation_result.get("rubric_evaluations", []) return feedback - + def _attach_default_fileds(self, feedback: Dict) -> Dict: feedback.setdefault("strengths", []) feedback.setdefault("areas_for_improvement", []) @@ -416,7 +453,13 @@ def _template_used_name(self, template: Any) -> str: def _touch_prompt_segment(self, segment_name: str) -> None: if not segment_name: return - frappe.db.set_value("Prompt Segment", segment_name, "last_used", datetime.now(), update_modified=False) + frappe.db.set_value( + "Prompt Segment", + segment_name, + "last_used", + datetime.now(), + update_modified=False, + ) def _create_llm_provider( self, @@ -439,12 +482,13 @@ def _create_llm_provider( settings = frappe.get_doc("LLM Settings", llm_settings[0].name) model_used = llm_settings[0].name + provider_name = settings.provider llm_provider = create_llm_provider( - provider=llm_provider_name, + provider=provider_name, api_key="", model_name=settings.model_name, - temperature=settings.temperature or 0, + temperature=settings.temperature or 0.1, max_tokens=settings.max_tokens or 2000, settings=settings, ) @@ -457,23 +501,23 @@ async def generate_ai_feedback( if media_type == "video": from .video_evaluation import VideoEvaluationGenerator - return await VideoEvaluationGenerator(self.feedback_service).generate_feedback( - assignment_context, submission_data, submission_id - ) + return await VideoEvaluationGenerator( + self.feedback_service + ).generate_feedback(assignment_context, submission_data, submission_id) if media_type == "audio": from .audio_evaluation import AudioEvaluationGenerator - return await AudioEvaluationGenerator(self.feedback_service).generate_feedback( - assignment_context, submission_data, submission_id - ) + return await AudioEvaluationGenerator( + self.feedback_service + ).generate_feedback(assignment_context, submission_data, submission_id) if media_type == "text": from .text_evaluation import TextEvaluationGenerator - return await TextEvaluationGenerator(self.feedback_service).generate_feedback( - assignment_context, submission_data, submission_id - ) + return await TextEvaluationGenerator( + self.feedback_service + ).generate_feedback(assignment_context, submission_data, submission_id) from .image_evaluation_both import ImageEvaluationGenerator @@ -481,7 +525,6 @@ async def generate_ai_feedback( assignment_context, submission_data, submission_id ) - # async def generate_ai_feedback( # self, assignment_context: Dict, submission_url: str, submission_id: str # ) -> Tuple[Dict, str, str]: diff --git a/rag_service/feedback_utils/image_evaluation.py b/rag_service/feedback_utils/image_evaluation.py index 7d3d675..4ccd9d8 100644 --- a/rag_service/feedback_utils/image_evaluation.py +++ b/rag_service/feedback_utils/image_evaluation.py @@ -1,36 +1,20 @@ # rag_service/rag_service/feedback_utils/image_evaluation.py import json +import os from typing import Any, Dict, Optional, Tuple import frappe -from .evaluation_generation import EvaluationGenerator from ..core.llm_providers import create_llm_provider +from .evaluation_generation import EvaluationGenerator class ImageEvaluationGenerator(EvaluationGenerator): """Generate AI feedback for image submissions.""" - def _create_llm_provider(self,llm_provider_name) -> Tuple[Any, str]: - llm_settings = frappe.get_list("LLM Settings", - filters={"is_active": 1, "provider": llm_provider_name}, limit=1) - if not llm_settings: - raise Exception(f"No active {llm_provider_name} configuration found") - - settings = frappe.get_doc("LLM Settings", llm_settings[0].name) - model_used = llm_settings[0].name - - llm_provider = create_llm_provider( - provider=llm_provider_name, - api_key="", - model_name=settings.model_name, - temperature=settings.temperature or 0, - max_tokens=settings.max_tokens or 2000, - settings=settings, - ) - - return llm_provider, model_used + def _create_llm_provider(self) -> Tuple[Any, str]: + return super()._create_llm_provider() async def generate_feedback( self, assignment_context: Dict, submission_url: str, submission_id: str @@ -45,12 +29,15 @@ async def generate_feedback( activity_type = assignment_context["assignment"].get("activity_type") course_vertical = assignment_context["assignment"].get("course_vertical") - llm_provider_name = "Gemini" - llm_provider, model_used = self._create_llm_provider(llm_provider_name) + llm_provider, model_used = self._create_llm_provider() - evaluation_result = await self.generate_evaluation(llm_provider, submission_url, assignment_context) + evaluation_result = await self.generate_evaluation( + llm_provider, submission_url, assignment_context + ) - template = self.get_prompt_template("image", "feedback", activity_type, course_vertical) + template = self.get_prompt_template( + "image", "feedback", activity_type, course_vertical + ) expected_format = self._get_expected_format(template) system_prompt, formatted_user_prompt = self._format_prompts( @@ -61,11 +48,16 @@ async def generate_feedback( ) combined_prompt = f"{system_prompt}\n\n{formatted_user_prompt}" - response = await llm_provider.generate_with_vision(submission_url, combined_prompt) + response = await llm_provider.generate_with_vision( + submission_url, combined_prompt + ) + raw_text = response.text cost = llm_provider.calculate_cost(response.to_dict()) - self.log_prob_feedback = response.to_dict().get("candidates", [{}])[0].get("avg_logprobs", None) + self.log_prob_feedback = ( + response.to_dict().get("candidates", [{}])[0].get("avg_logprobs", None) + ) self.cost = self.cost + cost print(f"\nRaw LLM Output:\n{raw_text}") @@ -73,7 +65,11 @@ async def generate_feedback( feedback = self._attach_plagiarism_defaults(feedback) feedback = self._attach_evaluation_to_feedback(feedback, evaluation_result) feedback = self._attach_default_fileds(feedback) - feedback['strengths'] = [f"cost:{self.cost}", f"Feedback_LP:{self.log_prob_feedback}", f"Eval_LP:{self.log_prob_eval}"] + feedback["strengths"] = [ + f"cost:{self.cost}", + f"Feedback_LP:{self.log_prob_feedback}", + f"Eval_LP:{self.log_prob_eval}", + ] template_used = self._template_used_name(template) @@ -88,10 +84,16 @@ async def generate_feedback( template_used = "Built-in Universal Template for Error" error_feedback = self.feedback_service.create_error_feedback(str(e)) error_feedback = self._attach_plagiarism_defaults(error_feedback) - error_feedback['strengths'] = ["cost:1", f"Feedback_LP:0.89", f"Eval_LP:0.78"] + error_feedback["strengths"] = [ + "cost:1", + f"Feedback_LP:0.89", + f"Eval_LP:0.78", + ] return error_feedback, "N/A", template_used - async def generate_evaluation(self, llm_provider: Any, submission_url: str, assignment_context: Dict) -> Dict: + async def generate_evaluation( + self, llm_provider: Any, submission_url: str, assignment_context: Dict + ) -> Dict: try: print("\n=== Starting Rubric Evaluation Generation (Image) ===") submission_data = { @@ -102,7 +104,9 @@ async def generate_evaluation(self, llm_provider: Any, submission_url: str, assi activity_type = assignment_context["assignment"].get("activity_type") course_vertical = assignment_context["assignment"].get("course_vertical") - template = self.get_prompt_template("image", "evaluation", activity_type, course_vertical) + template = self.get_prompt_template( + "image", "evaluation", activity_type, course_vertical + ) system_prompt, formatted_user_prompt = self._format_prompts( template, assignment_context, @@ -111,10 +115,14 @@ async def generate_evaluation(self, llm_provider: Any, submission_url: str, assi ) combined_prompt = f"{system_prompt}\n\n{formatted_user_prompt}" - response = await llm_provider.generate_with_vision(submission_url, combined_prompt) + response = await llm_provider.generate_with_vision( + submission_url, combined_prompt + ) raw_text = response.text cost = llm_provider.calculate_cost(response.to_dict()) - self.log_prob_eval = response.to_dict().get("candidates", [{}])[0].get("avg_logprobs", None) + self.log_prob_eval = ( + response.to_dict().get("candidates", [{}])[0].get("avg_logprobs", None) + ) self.cost = cost evaluation_result = self._parse_rubric_evaluations(raw_text) diff --git a/rag_service/feedback_utils/image_evaluation_both.py b/rag_service/feedback_utils/image_evaluation_both.py index 7d4bb56..758a4e4 100644 --- a/rag_service/feedback_utils/image_evaluation_both.py +++ b/rag_service/feedback_utils/image_evaluation_both.py @@ -1,16 +1,16 @@ # rag_service/rag_service/feedback_utils/image_evaluation_both.py +import os from typing import Dict, Tuple import frappe -from .evaluation_generation import EvaluationGenerator from ..utils.gcp_service_client import GCPServiceClient +from .evaluation_generation import EvaluationGenerator class ImageEvaluationGenerator(EvaluationGenerator): """Generate AI feedback for image submissions.""" - async def generate_feedback( self, assignment_context: Dict, submission_data: Dict, submission_id: str @@ -28,8 +28,9 @@ async def generate_feedback( activity_type = assignment_context["assignment"].get("activity_type") course_vertical = assignment_context["assignment"].get("course_vertical") - - template = self.get_prompt_template("image", "both", activity_type, course_vertical) + template = self.get_prompt_template( + "image", "both", activity_type, course_vertical + ) expected_format = self._get_expected_format(template) system_prompt, formatted_user_prompt = self._format_prompts( @@ -41,23 +42,32 @@ async def generate_feedback( combined_prompt = f"{system_prompt}\n\n{formatted_user_prompt}" media_service = GCPServiceClient() - media_asset = media_service.download_media(submission_data["submission_url"]) + media_asset = media_service.download_media( + submission_data["submission_url"] + ) response = await llm_provider.generate_with_vision( media_asset, combined_prompt, mime_type=media_asset["mime_type"], ) + raw_text = response.text self.cost = llm_provider.calculate_cost(response.to_dict()) - self.log_prob_feedback = response.to_dict().get("candidates", [{}])[0].get("avg_logprobs", None) + self.log_prob_feedback = ( + response.to_dict().get("candidates", [{}])[0].get("avg_logprobs", None) + ) print(f"\nRaw LLM Output:\n{raw_text}") feedback = self._parse_feedback(raw_text, expected_format) feedback = self._attach_plagiarism_defaults(feedback) feedback = self._attach_default_fileds(feedback) - feedback['strengths'] = [f"cost:{self.cost}", f"Feedback_LP:{self.log_prob_feedback}", f"Eval_LP:{0.0}"] + feedback["strengths"] = [ + f"cost:{self.cost}", + f"Feedback_LP:{self.log_prob_feedback}", + f"Eval_LP:{0.0}", + ] template_used = self._template_used_name(template) @@ -72,7 +82,11 @@ async def generate_feedback( template_used = "Built-in Universal Template for Error" error_feedback = self.feedback_service.create_error_feedback(str(e)) error_feedback = self._attach_plagiarism_defaults(error_feedback) - error_feedback['strengths'] = ["cost:1", f"Feedback_LP:0.89", f"Eval_LP:0.78"] + error_feedback["strengths"] = [ + "cost:1", + f"Feedback_LP:0.89", + f"Eval_LP:0.78", + ] return error_feedback, "N/A", template_used finally: if media_service: diff --git a/rag_service/feedback_utils/image_evaluation_grade_response.py b/rag_service/feedback_utils/image_evaluation_grade_response.py index 18a6882..e105892 100644 --- a/rag_service/feedback_utils/image_evaluation_grade_response.py +++ b/rag_service/feedback_utils/image_evaluation_grade_response.py @@ -1,21 +1,25 @@ # rag_service/rag_service/feedback_utils/image_evaluation.py import json +import os from typing import Any, Dict, Optional, Tuple import frappe -from .evaluation_generation import EvaluationGenerator from ..core.llm_providers import create_llm_provider +from .evaluation_generation import EvaluationGenerator class ImageEvaluationGenerator(EvaluationGenerator): """Generate AI feedback for image submissions.""" - def _create_llm_provider(self,llm_provider_name) -> Tuple[Any, str]: - llm_settings = frappe.get_list("LLM Settings", - filters={"is_active": 1, "provider": llm_provider_name}, limit=1) - + def _create_llm_provider(self, llm_provider_name) -> Tuple[Any, str]: + llm_settings = frappe.get_list( + "LLM Settings", + filters={"is_active": 1, "provider": llm_provider_name}, + limit=1, + ) + if not llm_settings: raise Exception(f"No active {llm_provider_name} configuration found") @@ -32,7 +36,6 @@ def _create_llm_provider(self,llm_provider_name) -> Tuple[Any, str]: ) return llm_provider, model_used - async def generate_feedback( self, assignment_context: Dict, submission_url: str, submission_id: str @@ -50,8 +53,10 @@ async def generate_feedback( llm_provider_name = "Gemini" llm_provider, model_used = self._create_llm_provider(llm_provider_name) - template = self.get_prompt_template("image", "evaluation", activity_type, course_vertical) - + template = self.get_prompt_template( + "image", "evaluation", activity_type, course_vertical + ) + print(assignment_context) system_prompt, formatted_user_prompt = self._format_prompts( template, @@ -62,17 +67,26 @@ async def generate_feedback( combined_prompt = f"{system_prompt}\n\n{formatted_user_prompt}" print(f"\nCombined Prompt Sent to LLM:\n{combined_prompt}") - response = await llm_provider.generate_with_vision(submission_url, combined_prompt) + response = await llm_provider.generate_with_vision( + submission_url, combined_prompt + ) + raw_text = response.text self.cost = llm_provider.calculate_cost(response.to_dict()) - self.log_prob_feedback = response.to_dict().get("candidates", [{}])[0].get("avg_logprobs", None) + self.log_prob_feedback = ( + response.to_dict().get("candidates", [{}])[0].get("avg_logprobs", None) + ) print(f"\nRaw LLM Output:\n{raw_text}") feedback = self._parse_grade_value_feedback(raw_text) feedback = self._attach_plagiarism_defaults(feedback) feedback = self._attach_default_fileds(feedback) - feedback['strengths'] = [f"cost:{self.cost}", f"Feedback_LP:{self.log_prob_feedback}", f"Eval_LP:{0.0}"] + feedback["strengths"] = [ + f"cost:{self.cost}", + f"Feedback_LP:{self.log_prob_feedback}", + f"Eval_LP:{0.0}", + ] template_used = self._template_used_name(template) @@ -87,5 +101,9 @@ async def generate_feedback( template_used = "Built-in Universal Template for Error" error_feedback = self.feedback_service.create_error_feedback(str(e)) error_feedback = self._attach_plagiarism_defaults(error_feedback) - error_feedback['strengths'] = ["cost:0.0", f"Feedback_LP:0.89", f"Eval_LP:0.78"] + error_feedback["strengths"] = [ + "cost:0.0", + f"Feedback_LP:0.89", + f"Eval_LP:0.78", + ] return error_feedback, "N/A", template_used diff --git a/rag_service/feedback_utils/image_evaluation_no_rubric.py b/rag_service/feedback_utils/image_evaluation_no_rubric.py index fbd74f9..fd2d00f 100644 --- a/rag_service/feedback_utils/image_evaluation_no_rubric.py +++ b/rag_service/feedback_utils/image_evaluation_no_rubric.py @@ -1,21 +1,25 @@ # rag_service/rag_service/feedback_utils/image_evaluation.py import json +import os from typing import Any, Dict, Optional, Tuple import frappe -from .evaluation_generation import EvaluationGenerator from ..core.llm_providers import create_llm_provider +from .evaluation_generation import EvaluationGenerator class ImageEvaluationGenerator(EvaluationGenerator): """Generate AI feedback for image submissions.""" - def _create_llm_provider(self,llm_provider_name) -> Tuple[Any, str]: - llm_settings = frappe.get_list("LLM Settings", - filters={"is_active": 1, "provider": llm_provider_name}, limit=1) - + def _create_llm_provider(self, llm_provider_name) -> Tuple[Any, str]: + llm_settings = frappe.get_list( + "LLM Settings", + filters={"is_active": 1, "provider": llm_provider_name}, + limit=1, + ) + if not llm_settings: raise Exception(f"No active {llm_provider_name} configuration found") @@ -32,7 +36,6 @@ def _create_llm_provider(self,llm_provider_name) -> Tuple[Any, str]: ) return llm_provider, model_used - async def generate_feedback( self, assignment_context: Dict, submission_url: str, submission_id: str @@ -50,54 +53,56 @@ async def generate_feedback( llm_provider_name = "Gemini" llm_provider, model_used = self._create_llm_provider(llm_provider_name) - template = self.get_prompt_template("image", "both", activity_type, course_vertical) + template = self.get_prompt_template( + "image", "both", activity_type, course_vertical + ) expected_format = self._get_expected_format(template) assignment_context["assignment"]["rubrics"] = { - "Content Knowledge": [ - { - "grade_value": 1, - "grade_description": "Invalid or random submission — task is blank, off-topic, or unrelated. No link to concept seen." - }, - { - "grade_value": 2, - "grade_description": "Some link to topic visible but full of mistakes or confusion. The student likely didn't understand all steps." - }, - { - "grade_value": 3, - "grade_description": "Main idea is correct and partly applied. Student is trying to use the concept but not yet fully correct." - }, - { - "grade_value": 4, - "grade_description": "Work is accurate, clear, and independently done. Student can apply the concept correctly as shown." - }, - { - "grade_value": 5, - "grade_description": "Work extends the concept meaningfully — connects it to real-life or shows creative application." - } - ], - "Creativity": [ - { - "grade_value": 1, - "grade_description": "Invalid or no submission. Task blank or repeated exactly as taught, showing no original input." - }, - { - "grade_value": 2, - "grade_description": "Minor variation without reason. Adds one small change (color, word, step, example) but without any creative link." - }, - { - "grade_value": 3, - "grade_description": "Begins to combine ideas. Connects two or more taught concepts or introduces a small improvement that shows personal thought." - }, - { - "grade_value": 4, - "grade_description": "Applies imagination with purpose. Adjusts or redesigns task elements to make it clearer, more effective, or more interesting." - }, - { - "grade_value": 5, - "grade_description": "Generates and improves ideas. Produces original and relevant solutions by combining, evaluating, or refining ideas; connects learning to real-world or cross-topic contexts." - } - ] -} + "Content Knowledge": [ + { + "grade_value": 1, + "grade_description": "Invalid or random submission — task is blank, off-topic, or unrelated. No link to concept seen.", + }, + { + "grade_value": 2, + "grade_description": "Some link to topic visible but full of mistakes or confusion. The student likely didn't understand all steps.", + }, + { + "grade_value": 3, + "grade_description": "Main idea is correct and partly applied. Student is trying to use the concept but not yet fully correct.", + }, + { + "grade_value": 4, + "grade_description": "Work is accurate, clear, and independently done. Student can apply the concept correctly as shown.", + }, + { + "grade_value": 5, + "grade_description": "Work extends the concept meaningfully — connects it to real-life or shows creative application.", + }, + ], + "Creativity": [ + { + "grade_value": 1, + "grade_description": "Invalid or no submission. Task blank or repeated exactly as taught, showing no original input.", + }, + { + "grade_value": 2, + "grade_description": "Minor variation without reason. Adds one small change (color, word, step, example) but without any creative link.", + }, + { + "grade_value": 3, + "grade_description": "Begins to combine ideas. Connects two or more taught concepts or introduces a small improvement that shows personal thought.", + }, + { + "grade_value": 4, + "grade_description": "Applies imagination with purpose. Adjusts or redesigns task elements to make it clearer, more effective, or more interesting.", + }, + { + "grade_value": 5, + "grade_description": "Generates and improves ideas. Produces original and relevant solutions by combining, evaluating, or refining ideas; connects learning to real-world or cross-topic contexts.", + }, + ], + } print(assignment_context) system_prompt, formatted_user_prompt = self._format_prompts( template, @@ -108,17 +113,26 @@ async def generate_feedback( combined_prompt = f"{system_prompt}\n\n{formatted_user_prompt}" print(f"\nCombined Prompt Sent to LLM:\n{combined_prompt}") - response = await llm_provider.generate_with_vision(submission_url, combined_prompt) + response = await llm_provider.generate_with_vision( + submission_url, combined_prompt + ) + raw_text = response.text self.cost = llm_provider.calculate_cost(response.to_dict()) - self.log_prob_feedback = response.to_dict().get("candidates", [{}])[0].get("avg_logprobs", None) + self.log_prob_feedback = ( + response.to_dict().get("candidates", [{}])[0].get("avg_logprobs", None) + ) print(f"\nRaw LLM Output:\n{raw_text}") feedback = self._parse_feedback(raw_text, expected_format) feedback = self._attach_plagiarism_defaults(feedback) feedback = self._attach_default_fileds(feedback) - feedback['strengths'] = [f"cost:{self.cost}", f"Feedback_LP:{self.log_prob_feedback}", f"Eval_LP:{0.0}"] + feedback["strengths"] = [ + f"cost:{self.cost}", + f"Feedback_LP:{self.log_prob_feedback}", + f"Eval_LP:{0.0}", + ] template_used = self._template_used_name(template) @@ -133,5 +147,9 @@ async def generate_feedback( template_used = "Built-in Universal Template for Error" error_feedback = self.feedback_service.create_error_feedback(str(e)) error_feedback = self._attach_plagiarism_defaults(error_feedback) - error_feedback['strengths'] = ["cost:1", f"Feedback_LP:0.89", f"Eval_LP:0.78"] + error_feedback["strengths"] = [ + "cost:1", + f"Feedback_LP:0.89", + f"Eval_LP:0.78", + ] return error_feedback, "N/A", template_used diff --git a/rag_service/feedback_utils/image_evaluation_only.py b/rag_service/feedback_utils/image_evaluation_only.py index 469470b..69fb963 100644 --- a/rag_service/feedback_utils/image_evaluation_only.py +++ b/rag_service/feedback_utils/image_evaluation_only.py @@ -5,8 +5,8 @@ import frappe -from .evaluation_generation import EvaluationGenerator from ..core.llm_providers import create_llm_provider +from .evaluation_generation import EvaluationGenerator class ImageEvaluationGenerator(EvaluationGenerator): @@ -28,29 +28,7 @@ def _resolve_service_account_credentials(self, settings: Any) -> Optional[Dict]: return None def _create_llm_provider(self) -> Tuple[Any, str]: - gemini_settings = frappe.get_list("Gemini Settings", filters={"is_active": 1}, limit=1) - if not gemini_settings: - raise Exception("No active Gemini configuration found") - - settings = frappe.get_doc("Gemini Settings", gemini_settings[0].name) - model_used = gemini_settings[0].name - - key_data = self._resolve_service_account_credentials(settings) - if not key_data: - raise Exception("Gemini service account key JSON is required") - - llm_provider = create_llm_provider( - provider="Gemini", - api_key="", - model_name=settings.model_name, - temperature=settings.temperature or 0, - max_tokens=settings.max_tokens or 2000, - key_data=key_data, - location=settings.location or "us-central1", - project_id=settings.project_id or None, - ) - - return llm_provider, model_used + return super()._create_llm_provider() async def generate_feedback( self, assignment_context: Dict, submission_url: str, submission_id: str @@ -67,9 +45,13 @@ async def generate_feedback( llm_provider, model_used = self._create_llm_provider() - evaluation_result = await self.generate_evaluation(llm_provider, submission_url, assignment_context) + evaluation_result = await self.generate_evaluation( + llm_provider, submission_url, assignment_context + ) - template = self.get_prompt_template("image", "feedback", activity_type, course_vertical) + template = self.get_prompt_template( + "image", "feedback", activity_type, course_vertical + ) expected_format = self._get_expected_format(template) system_prompt, formatted_user_prompt = self._format_prompts( @@ -94,17 +76,23 @@ async def generate_feedback( # feedback = self._parse_feedback(raw_text, expected_format) feedback = { - "overall_feedback": "Great job! Keep up the creative work!", - "overall_feedback_translated": "बहुत अच्छा काम किया, इसे जारी रखें!", - "final_grade": 40 - } - self.log_prob_feedback = -1.23 # Example log probability for feedback generation + "overall_feedback": "Great job! Keep up the creative work!", + "overall_feedback_translated": "बहुत अच्छा काम किया, इसे जारी रखें!", + "final_grade": 40, + } + self.log_prob_feedback = ( + -1.23 + ) # Example log probability for feedback generation print(f"\nParsed Feedback:\n{feedback}") feedback = self._attach_plagiarism_defaults(feedback) print(f"\nModel Used: {model_used}") feedback = self._attach_evaluation_to_feedback(feedback, evaluation_result) feedback = self._attach_default_fileds(feedback) - feedback['strengths'] = [f"cost:{self.cost}", f"Feedback_LP:{self.log_prob_feedback}", f"Eval_LP:{self.log_prob_eval}"] + feedback["strengths"] = [ + f"cost:{self.cost}", + f"Feedback_LP:{self.log_prob_feedback}", + f"Eval_LP:{self.log_prob_eval}", + ] print(f"\nParsed Feedback:\n{feedback}") template_used = self._template_used_name(template) @@ -120,10 +108,16 @@ async def generate_feedback( template_used = "Built-in Universal Template for Error" error_feedback = self.feedback_service.create_error_feedback(str(e)) error_feedback = self._attach_plagiarism_defaults(error_feedback) - error_feedback['strengths'] = ["cost:1", f"Feedback_LP:0.89", f"Eval_LP:0.78"] + error_feedback["strengths"] = [ + "cost:1", + f"Feedback_LP:0.89", + f"Eval_LP:0.78", + ] return error_feedback, "N/A", template_used - async def generate_evaluation(self, llm_provider: Any, submission_url: str, assignment_context: Dict) -> Dict: + async def generate_evaluation( + self, llm_provider: Any, submission_url: str, assignment_context: Dict + ) -> Dict: try: print("\n=== Starting Rubric Evaluation Generation (Image) ===") submission_data = { @@ -134,7 +128,9 @@ async def generate_evaluation(self, llm_provider: Any, submission_url: str, assi activity_type = assignment_context["assignment"].get("activity_type") course_vertical = assignment_context["assignment"].get("course_vertical") - template = self.get_prompt_template("image", "evaluation", activity_type, course_vertical) + template = self.get_prompt_template( + "image", "evaluation", activity_type, course_vertical + ) system_prompt, formatted_user_prompt = self._format_prompts( template, assignment_context, @@ -143,10 +139,14 @@ async def generate_evaluation(self, llm_provider: Any, submission_url: str, assi ) combined_prompt = f"{system_prompt}\n\n{formatted_user_prompt}" - response = await llm_provider.generate_with_vision(submission_url, combined_prompt) + response = await llm_provider.generate_with_vision( + submission_url, combined_prompt + ) raw_text = response.text cost = llm_provider.calculate_cost(response.to_dict()) - self.log_prob_eval = response.to_dict().get("candidates", [{}])[0].get("avg_logprobs", None) + self.log_prob_eval = ( + response.to_dict().get("candidates", [{}])[0].get("avg_logprobs", None) + ) self.cost = cost evaluation_result = self._parse_rubric_evaluations(raw_text) diff --git a/rag_service/feedback_utils/image_evaluation_together_ai.py b/rag_service/feedback_utils/image_evaluation_together_ai.py index 937a7e3..63a170d 100644 --- a/rag_service/feedback_utils/image_evaluation_together_ai.py +++ b/rag_service/feedback_utils/image_evaluation_together_ai.py @@ -5,17 +5,20 @@ import frappe -from .evaluation_generation import EvaluationGenerator from ..core.llm_providers import create_llm_provider +from .evaluation_generation import EvaluationGenerator class ImageEvaluationGenerator(EvaluationGenerator): """Generate AI feedback for image submissions.""" - def _create_llm_provider(self,llm_provider_name, model_name) -> Tuple[Any, str]: - llm_settings = frappe.get_list("LLM Settings", - filters={"is_active": 1, "provider": llm_provider_name}, limit=1) - + def _create_llm_provider(self, llm_provider_name, model_name) -> Tuple[Any, str]: + llm_settings = frappe.get_list( + "LLM Settings", + filters={"is_active": 1, "provider": llm_provider_name}, + limit=1, + ) + if not llm_settings: raise Exception(f"No active {llm_provider_name} configuration found") @@ -27,11 +30,10 @@ def _create_llm_provider(self,llm_provider_name, model_name) -> Tuple[Any, str]: api_key=settings.api_key, model_name=model_name, temperature=settings.temperature or 0, - max_tokens=settings.max_tokens or 2000 + max_tokens=settings.max_tokens or 2000, ) return llm_provider, model_used - async def generate_feedback( self, assignment_context: Dict, submission_url: str, submission_id: str @@ -51,10 +53,14 @@ async def generate_feedback( # model_name = "google/gemma-3n-E4B-it" model_name = "Qwen/Qwen3.5-9B" # model_name = "ServiceNow-AI/Apriel-1.6-15b-Thinker" - - llm_provider, model_used = self._create_llm_provider(llm_provider_name, model_name) - template = self.get_prompt_template("image", "both", activity_type, course_vertical) + llm_provider, model_used = self._create_llm_provider( + llm_provider_name, model_name + ) + + template = self.get_prompt_template( + "image", "both", activity_type, course_vertical + ) expected_format = self._get_expected_format(template) system_prompt, formatted_user_prompt = self._format_prompts( @@ -63,18 +69,23 @@ async def generate_feedback( submission_data, [], ) - response = await llm_provider.generate_with_vision(submission_url, system_prompt, formatted_user_prompt) + response = await llm_provider.generate_with_vision( + submission_url, system_prompt, formatted_user_prompt + ) print(f"\nRaw LLM Output:\n{response}") raw_text = response.choices[0].message.content print(f"\nLLM Output:\n{raw_text}") self.cost = llm_provider.calculate_cost(response.to_dict()) self.log_prob_feedback = model_name - feedback = self._parse_feedback(raw_text, expected_format) feedback = self._attach_plagiarism_defaults(feedback) feedback = self._attach_default_fileds(feedback) - feedback['strengths'] = [f"cost:{self.cost}", f"Feedback_LP:{self.log_prob_feedback}", f"Eval_LP:{0.0}"] + feedback["strengths"] = [ + f"cost:{self.cost}", + f"Feedback_LP:{self.log_prob_feedback}", + f"Eval_LP:{0.0}", + ] template_used = self._template_used_name(template) @@ -89,5 +100,5 @@ async def generate_feedback( template_used = "Built-in Universal Template for Error" error_feedback = self.feedback_service.create_error_feedback(str(e)) error_feedback = self._attach_plagiarism_defaults(error_feedback) - error_feedback['strengths'] = ["cost:0", f"Feedback_LP:0.0", f"Eval_LP:0.0"] + error_feedback["strengths"] = ["cost:0", f"Feedback_LP:0.0", f"Eval_LP:0.0"] return error_feedback, "N/A", template_used diff --git a/rag_service/feedback_utils/text_evaluation.py b/rag_service/feedback_utils/text_evaluation.py index c34bfc9..4a77f31 100644 --- a/rag_service/feedback_utils/text_evaluation.py +++ b/rag_service/feedback_utils/text_evaluation.py @@ -1,9 +1,9 @@ # rag_service/rag_service/feedback_utils/text_evaluation.py +import traceback from typing import Dict, Tuple import frappe -import traceback from .evaluation_generation import EvaluationGenerator @@ -25,7 +25,9 @@ async def generate_feedback( course_vertical = assignment_context["assignment"].get("course_vertical") course_vertical = "Arts" - template = self.get_prompt_template("text", "both", activity_type, course_vertical) + template = self.get_prompt_template( + "text", "both", activity_type, course_vertical + ) expected_format = self._get_expected_format(template) system_prompt, formatted_user_prompt = self._format_prompts( template, @@ -33,7 +35,6 @@ async def generate_feedback( submission_data, [], ) - response, cost, _ = await llm_provider.generate( [ diff --git a/rag_service/feedback_utils/video_evaluation.py b/rag_service/feedback_utils/video_evaluation.py index 32cda93..c8b2cbb 100644 --- a/rag_service/feedback_utils/video_evaluation.py +++ b/rag_service/feedback_utils/video_evaluation.py @@ -1,11 +1,12 @@ # rag_service/rag_service/feedback_utils/video_evaluation.py +import os from typing import Dict, Tuple import frappe -from .evaluation_generation import EvaluationGenerator from ..utils.gcp_service_client import GCPServiceClient +from .evaluation_generation import EvaluationGenerator class VideoEvaluationGenerator(EvaluationGenerator): @@ -17,7 +18,7 @@ async def generate_feedback( media_service = None media_asset = None try: - print("\n=== Starting AI Feedback Generation (Image) ===") + print("\n=== Starting AI Feedback Generation (Video) ===") llm_provider, model_used = self._create_llm_provider( "Gemini", @@ -28,8 +29,9 @@ async def generate_feedback( course_vertical = assignment_context["assignment"].get("course_vertical") print(f"Activity Type: {activity_type}, Course Vertical: {course_vertical}") - - template = self.get_prompt_template("video", "both", activity_type, course_vertical) + template = self.get_prompt_template( + "video", "both", activity_type, course_vertical + ) expected_format = self._get_expected_format(template) system_prompt, formatted_user_prompt = self._format_prompts( @@ -41,23 +43,32 @@ async def generate_feedback( combined_prompt = f"{system_prompt}\n\n{formatted_user_prompt}" media_service = GCPServiceClient() - media_asset = media_service.download_media(submission_data["submission_url"]) + media_asset = media_service.download_media( + submission_data["submission_url"] + ) response = await llm_provider.generate_with_video( media_asset, combined_prompt, mime_type=media_asset["mime_type"], ) + raw_text = response.text self.cost = llm_provider.calculate_cost(response.to_dict()) - self.log_prob_feedback = response.to_dict().get("candidates", [{}])[0].get("avg_logprobs", None) + self.log_prob_feedback = ( + response.to_dict().get("candidates", [{}])[0].get("avg_logprobs", None) + ) print(f"\nRaw LLM Output:\n{raw_text}") feedback = self._parse_feedback(raw_text, expected_format) feedback = self._attach_plagiarism_defaults(feedback) feedback = self._attach_default_fileds(feedback) - feedback['strengths'] = [f"cost:{self.cost}", f"Feedback_LP:{self.log_prob_feedback}", f"Eval_LP:{0.0}"] + feedback["strengths"] = [ + f"cost:{self.cost}", + f"Feedback_LP:{self.log_prob_feedback}", + f"Eval_LP:{0.0}", + ] template_used = self._template_used_name(template) @@ -72,7 +83,11 @@ async def generate_feedback( template_used = "Built-in Universal Template for Error" error_feedback = self.feedback_service.create_error_feedback(str(e)) error_feedback = self._attach_plagiarism_defaults(error_feedback) - error_feedback['strengths'] = ["cost:1", f"Feedback_LP:0.89", f"Eval_LP:0.78"] + error_feedback["strengths"] = [ + "cost:1", + f"Feedback_LP:0.89", + f"Eval_LP:0.78", + ] return error_feedback, "N/A", template_used finally: if media_service: diff --git a/rag_service/hooks.py b/rag_service/hooks.py index 13bd183..ddfd2a9 100644 --- a/rag_service/hooks.py +++ b/rag_service/hooks.py @@ -5,6 +5,11 @@ app_email = "mail@evalix.xyz" app_license = "mit" +# ── SRE Monitoring ──────────────────────────────────────────────────────────── +before_request = ["rag_service.middleware.before_request"] +after_request = ["rag_service.middleware.after_request"] +on_exception = ["rag_service.middleware.on_exception"] + commands = [ "rag_service.rag_service.commands.consumer.commands" @@ -201,8 +206,7 @@ # Request Events # ---------------- -# before_request = ["rag_service.utils.before_request"] -# after_request = ["rag_service.utils.after_request"] +# before_request and after_request are registered above via SRE Monitoring block # Job Events # ---------- diff --git a/rag_service/middleware.py b/rag_service/middleware.py new file mode 100644 index 0000000..0117813 --- /dev/null +++ b/rag_service/middleware.py @@ -0,0 +1,43 @@ +# rag_service/middleware.py — identical pattern to tap_lms + +import time +import traceback +import frappe +from .monitoring import record_request, emit_structured_log + + +def before_request(): + try: + frappe.local._sre_t0 = time.monotonic() + except Exception: + pass + + +def after_request(): + try: + t0 = getattr(frappe.local, "_sre_t0", None) + duration_ms = (time.monotonic() - t0) * 1000 if t0 is not None else None + req = getattr(frappe.local, "request", None) + response = getattr(frappe.local, "response", None) + status_code = response.get("http_status_code", 200) if isinstance(response, dict) else 200 + record_request( + path=req.path if req else "unknown", + method=req.method if req else "unknown", + status_code=int(status_code), + duration_ms=duration_ms, + ) + except Exception: + pass + + +def on_exception(): + try: + req = getattr(frappe.local, "request", None) + emit_structured_log( + severity="ERROR", + message="unhandled_exception", + path=req.path if req else "unknown", + traceback=traceback.format_exc(), + ) + except Exception: + pass diff --git a/rag_service/monitoring.py b/rag_service/monitoring.py new file mode 100644 index 0000000..9c8ae1f --- /dev/null +++ b/rag_service/monitoring.py @@ -0,0 +1,126 @@ +# rag_service/monitoring.py +# +# Identical in structure to tap_lms/monitoring.py. +# Kept as a separate module so rag_service has no import dependency on tap_lms. + +import json +import frappe +from frappe.utils import now_datetime + + +def emit_structured_log(severity: str, message: str, **kwargs) -> None: + try: + import os + payload = { + "severity": severity, + "message": message, + "timestamp": str(now_datetime()), + "app": "rag_service", + "app_env": os.environ.get("APP_ENV", "unknown"), + } + payload.update({k: v for k, v in kwargs.items() if v is not None}) + print(json.dumps(payload, ensure_ascii=False), flush=True) + except Exception: + pass + + +def record_request(path, method, status_code, duration_ms, user=None): + try: + emit_structured_log( + severity="INFO" if status_code < 400 else "ERROR", + message="http_request", + http_path=path, + http_method=method, + http_status=status_code, + duration_ms=round(duration_ms, 2), + user=user, + ) + except Exception: + pass + + +def record_job(job_name, status, duration_ms=None, error=None, **extra): + try: + emit_structured_log( + severity="INFO" if status in ("success", "skip") else "ERROR", + message="background_job", + job_name=job_name, + job_status=status, + duration_ms=round(duration_ms, 2) if duration_ms is not None else None, + error=str(error) if error else None, + **extra, + ) + except Exception: + pass + + +# ── RAG-specific pipeline events ────────────────────────────────────────────── + +def record_rag_submission_received(submission_id, student_id=None, assignment_id=None): + try: + emit_structured_log( + severity="INFO", + message="rag_submission_received", + submission_id=submission_id, + student_id=student_id, + assignment_id=assignment_id, + ) + except Exception: + pass + + +def record_rag_feedback_complete(submission_id, model_used=None, template_used=None, duration_ms=None): + try: + emit_structured_log( + severity="INFO", + message="rag_feedback_complete", + submission_id=submission_id, + model_used=model_used, + template_used=template_used, + duration_ms=round(duration_ms, 2) if duration_ms is not None else None, + ) + except Exception: + pass + + +def record_rag_feedback_failed(submission_id, error, duration_ms=None): + try: + emit_structured_log( + severity="ERROR", + message="rag_feedback_failed", + submission_id=submission_id, + error=str(error), + duration_ms=round(duration_ms, 2) if duration_ms is not None else None, + ) + except Exception: + pass + + +def record_llm_call(submission_id, provider, model, status, duration_ms=None, error=None): + try: + emit_structured_log( + severity="INFO" if status == "success" else "ERROR", + message="llm_call_complete" if status == "success" else "llm_call_failed", + submission_id=submission_id, + llm_provider=provider, + llm_model=model, + duration_ms=round(duration_ms, 2) if duration_ms is not None else None, + error=str(error) if error else None, + ) + except Exception: + pass + + +def record_tap_lms_api_call(submission_id, endpoint, duration_ms, cache_hit=False, status_code=None): + try: + emit_structured_log( + severity="INFO" if not status_code or status_code < 400 else "ERROR", + message="tap_lms_api_call", + submission_id=submission_id, + endpoint=endpoint, + duration_ms=round(duration_ms, 2), + cache_hit=cache_hit, + http_status=status_code, + ) + except Exception: + pass diff --git a/rag_service/rag_service/doctype/llm_settings/llm_settings.json b/rag_service/rag_service/doctype/llm_settings/llm_settings.json index fa59620..4851652 100644 --- a/rag_service/rag_service/doctype/llm_settings/llm_settings.json +++ b/rag_service/rag_service/doctype/llm_settings/llm_settings.json @@ -1,117 +1,123 @@ { - "actions": [], - "allow_rename": 1, - "creation": "2024-11-01 13:41:11.924246", - "doctype": "DocType", - "engine": "InnoDB", - "field_order": [ - "provider", - "model_name", - "temperature", - "max_tokens", - "api_key", - "api_secret", - "location", - "project_id", - "credentials_json", - "is_active", - "is_default", - "description" - ], - "fields": [ - { - "fieldname": "provider", - "fieldtype": "Select", - "label": "LLM Provider", - "options": "OpenAI\nAnthropic\nTogether AI\nGemini\nCustom" - }, - { - "fieldname": "model_name", - "fieldtype": "Data", - "label": "Model Name" - }, - { - "fieldname": "api_key", - "fieldtype": "Data", - "label": "API Key", - "length": 200 - }, - { - "default": "0.7", - "fieldname": "temperature", - "fieldtype": "Float", - "label": "Temperature" - }, - { - "default": "1500", - "fieldname": "max_tokens", - "fieldtype": "Int", - "label": "Max Tokens" - }, + "actions": [], + "allow_rename": 1, + "creation": "2024-11-01 13:41:11.924246", + "doctype": "DocType", + "engine": "InnoDB", + "field_order": [ + "provider", + "model_name", + "base_url", + "temperature", + "max_tokens", + "api_key", + "api_secret", + "location", + "project_id", + "credentials_json", + "is_active", + "is_default", + "description" + ], + "fields": [ { - "default": "us-central1", - "fieldname": "location", - "fieldtype": "Data", - "label": "Location" - }, - { - "fieldname": "project_id", - "fieldtype": "Data", - "label": "Project ID" - }, - { - "fieldname": "credentials_json", - "fieldtype": "Code", - "in_list_view": 1, - "label": "Service Account Credentials (JSON)", - "options": "JSON" - }, - { - "default": "0", - "fieldname": "is_active", - "fieldtype": "Check", - "label": "Is Active" - }, - { - "default": "0", - "fieldname": "is_default", - "fieldtype": "Check", - "label": "Is Default" - }, - { - "fieldname": "description", - "fieldtype": "Small Text", - "label": "Description" - }, - { - "fieldname": "api_secret", - "fieldtype": "Password", - "label": "API Secret", - "length": 250 - } - ], - "index_web_pages_for_search": 1, - "links": [], - "modified": "2025-05-13 18:29:10.580186", - "modified_by": "Administrator", - "module": "Rag Service", - "name": "LLM Settings", - "owner": "Administrator", - "permissions": [ - { - "create": 1, - "delete": 1, - "email": 1, - "export": 1, - "print": 1, - "read": 1, - "report": 1, - "role": "System Manager", - "share": 1, - "write": 1 - } - ], - "sort_field": "modified", - "sort_order": "DESC", - "states": [] -} \ No newline at end of file + "fieldname": "provider", + "fieldtype": "Select", + "label": "LLM Provider", + "options": "OpenAI\nAnthropic\nTogether AI\nGemini\nStub\nCustom" + }, + { + "fieldname": "model_name", + "fieldtype": "Data", + "label": "Model Name" + }, + { + "fieldname": "base_url", + "fieldtype": "Data", + "label": "Base URL" + }, + { + "fieldname": "api_key", + "fieldtype": "Data", + "label": "API Key", + "length": 200 + }, + { + "default": "0.7", + "fieldname": "temperature", + "fieldtype": "Float", + "label": "Temperature" + }, + { + "default": "1500", + "fieldname": "max_tokens", + "fieldtype": "Int", + "label": "Max Tokens" + }, + { + "default": "us-central1", + "fieldname": "location", + "fieldtype": "Data", + "label": "Location" + }, + { + "fieldname": "project_id", + "fieldtype": "Data", + "label": "Project ID" + }, + { + "fieldname": "credentials_json", + "fieldtype": "Code", + "in_list_view": 1, + "label": "Service Account Credentials (JSON)", + "options": "JSON" + }, + { + "default": "0", + "fieldname": "is_active", + "fieldtype": "Check", + "label": "Is Active" + }, + { + "default": "0", + "fieldname": "is_default", + "fieldtype": "Check", + "label": "Is Default" + }, + { + "fieldname": "description", + "fieldtype": "Small Text", + "label": "Description" + }, + { + "fieldname": "api_secret", + "fieldtype": "Password", + "label": "API Secret", + "length": 250 + } + ], + "index_web_pages_for_search": 1, + "links": [], + "modified": "2025-05-13 18:29:10.580186", + "modified_by": "Administrator", + "module": "Rag Service", + "name": "LLM Settings", + "owner": "Administrator", + "permissions": [ + { + "create": 1, + "delete": 1, + "email": 1, + "export": 1, + "print": 1, + "read": 1, + "report": 1, + "role": "System Manager", + "share": 1, + "write": 1 + } + ], + "sort_field": "modified", + "sort_order": "DESC", + "states": [] +} diff --git a/rag_service/scripts/console_consumer.py b/rag_service/scripts/console_consumer.py index 22509d2..5098231 100644 --- a/rag_service/scripts/console_consumer.py +++ b/rag_service/scripts/console_consumer.py @@ -1,5 +1,65 @@ +#!/usr/bin/env python +# ── STEP 1: INJECT RUNTIME PATHS BEFORE ANY OTHER CODES ────────────────────── +import sys +import os + +py_ver = f"python{sys.version_info.major}.{sys.version_info.minor}" + +isolated_paths = [ + "/workspace/rag_service", + f"/home/frappe/rag_venv/lib/{py_ver}/site-packages" +] + +for path in isolated_paths: + if path not in sys.path: + sys.path.insert(0, path) + +# ── STEP 2: STABLE STANDALONE FRAPPE BOOTSTRAP ────────────────────────────── +import frappe +import importlib + +# 1. Ensure global log folder is ready +os.makedirs("/home/frappe/logs", exist_ok=True) + +# 2. Map system site configurations +bench_sites_path = "/home/frappe/frappe-bench/sites" +site_name = os.getenv("SITE_NAME", "tap_lms.localhost") + +frappe.init(site=site_name, sites_path=bench_sites_path) + +# 3. Create the site-specific log folder if it doesn't exist +try: + sitelog_dir = os.path.dirname(frappe.utils.logger.get_log_filename("database", site_name)) + os.makedirs(sitelog_dir, exist_ok=True) +except Exception: + os.makedirs(f"{bench_sites_path}/{site_name}/logs", exist_ok=True) + +# 4. Populate site context flags manually before connecting +frappe.local.site = site_name +frappe.local.conf = frappe.get_site_config(site_name) +frappe.local.lang = "en" + +# 5. Connect to the underlying database pool +frappe.connect() + +# 6. FORCE-REGISTER APP MODULES (Fixes the RAG Settings Core Fallback Error) +installed_apps = frappe.get_installed_apps() +frappe.local.app_modules = {} +for app in installed_apps: + try: + frappe.local.app_modules[app] = importlib.import_module(app) + except ImportError: + continue + +# ── STEP 3: RUN THE QUEUE LISTENER ────────────────────────────────────────── from rag_service.utils.rabbitmq_consumer import RabbitMQConsumer -consumer = RabbitMQConsumer(debug=True) -if consumer.test_connection(): - print("Starting RabbitMQ consumer...") - consumer.start_consuming() \ No newline at end of file + +def run(): + """Main worker entry point""" + consumer = RabbitMQConsumer(debug=True) + if consumer.test_connection(): + print("Starting RabbitMQ consumer...") + consumer.start_consuming() + +if __name__ == "__main__": + run() diff --git a/rag_service/utils/gcp_service_client.py b/rag_service/utils/gcp_service_client.py index b67ecf1..73b76ef 100644 --- a/rag_service/utils/gcp_service_client.py +++ b/rag_service/utils/gcp_service_client.py @@ -4,32 +4,108 @@ import tempfile from pathlib import Path from typing import Dict, Optional, Tuple -from urllib.parse import urlparse, unquote +from urllib.parse import unquote, urlparse import frappe from google.cloud import storage from google.oauth2 import service_account +def is_gcs_url(url: str) -> bool: + """Check if a URL is a GCS URL.""" + if not url: + return False + return ( + url.startswith("gs://") + or url.startswith("https://storage.googleapis.com/") + or url.startswith("http://storage.googleapis.com/") + ) + + class GCPServiceClient: """Shared GCP client factory/helpers using credentials stored in Frappe.""" def __init__(self): + self.app_env = os.environ.get("APP_ENV", "production") settings = frappe.get_single("GCS Settings") raw_key = (settings.get("credentials_json") or "").strip() - if not raw_key: - raise ValueError("GCS Settings.credentials_json is required") - try: - self.key_data = json.loads(raw_key) - except json.JSONDecodeError as exc: - raise ValueError("GCS Settings.credentials_json must contain valid JSON") from exc - - self.project_id = settings.get("project_id") or self.key_data.get("project_id") - credentials = service_account.Credentials.from_service_account_info(self.key_data) - self.client = storage.Client(project=self.project_id, credentials=credentials) + # If credentials are missing/invalid, we only allow initialization + # if the later download_media call receives a non-GCS URL (in non-prod). + # We store the raw key and only raise ValueError if actually needed. + self.raw_key = raw_key + self.settings = settings + self.client = None + self.key_data = {} + + if raw_key: + try: + self.key_data = json.loads(raw_key) + self.project_id = settings.get("project_id") or self.key_data.get( + "project_id" + ) + credentials = service_account.Credentials.from_service_account_info( + self.key_data + ) + self.client = storage.Client( + project=self.project_id, credentials=credentials + ) + except Exception: + # We don't raise here yet; we'll raise in download_media if a GCS URL is used. + pass def download_media(self, media_url: str) -> Dict: + if not is_gcs_url(media_url): + # Fallback for regular HTTP/S URLs - ONLY ALLOWED IN NON-PRODUCTION + if self.app_env != "production" and media_url.startswith( + ("http://", "https://") + ): + print(f"NON-PROD FALLBACK: Downloading media from HTTP/S: {media_url}") + try: + from io import BytesIO + + import requests + + response = requests.get(media_url, timeout=30) + response.raise_for_status() + content = response.content + + # Create a temporary file to match the expected return structure + suffix = Path(urlparse(media_url).path).suffix or ".png" + with tempfile.NamedTemporaryFile( + delete=False, suffix=suffix + ) as temp_file: + temp_file.write(content) + temp_path = temp_file.name + + mime_type = ( + response.headers.get("Content-Type") + or mimetypes.guess_type(media_url)[0] + or "image/png" + ) + + return { + "bucket": "external-http", + "object_name": media_url, + "local_path": temp_path, + "mime_type": mime_type, + "content": content, + "filename": os.path.basename(urlparse(media_url).path) + or "downloaded_image", + } + except Exception as e: + raise RuntimeError(f"HTTP/S download failed: {str(e)}") + + # If it IS a GCS URL (or if fallback failed/is disallowed), we must enforce valid credentials + if not self.client: + if not self.raw_key: + raise ValueError( + f"GCS Settings.credentials_json is required for URL: {media_url}" + ) + raise ValueError( + f"GCS Settings.credentials_json must contain valid JSON for URL: {media_url}" + ) + bucket_name, object_name = self._parse_gcs_url(media_url) blob = self.client.bucket(bucket_name).blob(object_name) @@ -38,7 +114,11 @@ def download_media(self, media_url: str) -> Dict: temp_path = temp_file.name blob.download_to_filename(temp_path) - mime_type = blob.content_type or mimetypes.guess_type(object_name)[0] or "application/octet-stream" + mime_type = ( + blob.content_type + or mimetypes.guess_type(object_name)[0] + or "application/octet-stream" + ) with open(temp_path, "rb") as media_file: content = media_file.read() diff --git a/rag_service/utils/queue_manager.py b/rag_service/utils/queue_manager.py index 1ca4a9e..5fc2073 100644 --- a/rag_service/utils/queue_manager.py +++ b/rag_service/utils/queue_manager.py @@ -64,7 +64,13 @@ def send_feedback_to_tap(self, feedback_data: Dict) -> None: print(f"Queue: {self.settings.feedback_results_queue}") self.connect() - + + # Declare the queue to ensure it exists (idempotent — safe to call even if already declared) + self.channel.queue_declare( + queue=self.settings.feedback_results_queue, + durable=True + ) + # Add metadata to feedback message = { **feedback_data, diff --git a/requirements.txt b/requirements.txt index 6f8b9ec..6b21f4b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,8 @@ pika -langchain_openai +langchain-openai aiohttp requests numpy opencv-python +together +google-cloud-aiplatform