diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluators/_workflow_planning/__init__.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluators/_workflow_planning/__init__.py new file mode 100644 index 000000000000..8b59d62cee0e --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluators/_workflow_planning/__init__.py @@ -0,0 +1,7 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from ._workflow_planning import _WorkflowPlanningEvaluator + +__all__ = ["_WorkflowPlanningEvaluator"] diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluators/_workflow_planning/_workflow_planning.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluators/_workflow_planning/_workflow_planning.py new file mode 100644 index 000000000000..e6e0ef6cc26c --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluators/_workflow_planning/_workflow_planning.py @@ -0,0 +1,171 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import os
import json
import logging
from typing import Dict, Union, List

from typing_extensions import overload, override

from azure.ai.evaluation._exceptions import EvaluationException, ErrorBlame, ErrorCategory, ErrorTarget
from azure.ai.evaluation._evaluators._common import PromptyEvaluatorBase
from azure.ai.evaluation._common._experimental import experimental
from azure.ai.evaluation._workflows._utils import format_workflow_trace_for_eval

logger = logging.getLogger(__name__)


@experimental
class _WorkflowPlanningEvaluator(PromptyEvaluatorBase[Union[str, float]]):
    """Evaluates whether a multi-agent workflow was well-planned and orchestrated to achieve the user's goal.

    Unlike single-agent evaluators that examine individual query-response pairs, this evaluator
    is workflow-traces-native. It consumes the parsed trace of a workflow execution and evaluates
    planning quality across four dimensions:

    - Task Decomposition (dynamic patterns only)
    - Agent Selection & Routing (dynamic patterns only)
    - Progress Tracking & Adaptation
    - Error & Failure Handling

    Scoring is binary:
    - 1 (pass): Workflow well-planned across all applicable dimensions
    - 0 (fail): Material failure in one or more applicable dimensions

    :param model_config: Configuration for the Azure OpenAI model.
    :type model_config: Union[~azure.ai.evaluation.AzureOpenAIModelConfiguration,
        ~azure.ai.evaluation.OpenAIModelConfiguration]
    """

    # Prompty asset (co-located with this module) that drives the LLM judgment.
    _PROMPTY_FILE = "workflow_planning.prompty"
    # Prefix for every key in the result dict returned by _do_eval.
    _RESULT_KEY = "workflow_planning"
    # No optional input parameters beyond workflow_trace.
    _OPTIONAL_PARAMS: List[str] = []

    id = "azureai://built-in/evaluators/workflow_planning"
    """Evaluator identifier, experimental and to be used only with evaluation in cloud."""

    @override
    def __init__(self, model_config, *, credential=None, **kwargs):
        """Initialize the evaluator with the prompty file located next to this module.

        :param model_config: Azure OpenAI / OpenAI model configuration forwarded to the base class.
        :keyword credential: Optional credential forwarded to the base class.
        """
        current_dir = os.path.dirname(__file__)
        prompty_path = os.path.join(current_dir, self._PROMPTY_FILE)

        super().__init__(
            model_config=model_config,
            prompty_file=prompty_path,
            result_key=self._RESULT_KEY,
            credential=credential,
            **kwargs,
        )

    @overload
    def __call__(
        self,
        *,
        workflow_trace: Union[str, Dict],
    ) -> Dict[str, Union[str, float]]:
        """Evaluate workflow planning quality from a parsed workflow trace.

        :keyword workflow_trace: The parsed workflow trace (dict from converter output, or
            JSON string). Must contain at minimum: agents, invocations, topology.
        :paramtype workflow_trace: Union[str, Dict]
        :return: Evaluation results with score, result, reason, and per-dimension details.
        :rtype: Dict[str, Union[str, float]]
        """

    @override
    def __call__(self, *args, **kwargs):  # pylint: disable=docstring-missing-param
        """Invokes the instance using the overloaded __call__ signature."""
        return super().__call__(*args, **kwargs)

    @override
    def _convert_kwargs_to_eval_input(self, **kwargs):
        """Pass workflow_trace through to _do_eval, even if None (so _do_eval can raise the correct error)."""
        return [{"workflow_trace": kwargs.get("workflow_trace")}]

    @override
    async def _do_eval(self, eval_input: Dict) -> Dict[str, Union[float, str]]:  # type: ignore[override]
        """Perform workflow planning evaluation.

        :param eval_input: Single eval input produced by ``_convert_kwargs_to_eval_input``;
            expected to carry a ``workflow_trace`` entry (dict or JSON string).
        :return: Flat result dict keyed by ``workflow_planning`` plus suffixed metadata keys
            (result, reason, details, token counts, finish reason, model id, samples).
        :raises EvaluationException: If the trace is missing, malformed, or the workflow
            itself ended with errors (in which case planning evaluation is not applicable).
        """
        workflow_trace = eval_input.get("workflow_trace")
        if workflow_trace is None:
            raise EvaluationException(
                message="workflow_trace must be provided as input to the Workflow Planning evaluator.",
                internal_message="workflow_trace must be provided.",
                blame=ErrorBlame.USER_ERROR,
                category=ErrorCategory.MISSING_FIELD,
                target=ErrorTarget.EVALUATE,
            )

        # Normalize workflow_trace to dict
        if isinstance(workflow_trace, str):
            try:
                workflow_trace = json.loads(workflow_trace)
            except (json.JSONDecodeError, ValueError) as exc:
                raise EvaluationException(
                    message="workflow_trace must be a valid JSON string or dict.",
                    internal_message=f"JSON parse failed: {exc}",
                    blame=ErrorBlame.USER_ERROR,
                    category=ErrorCategory.INVALID_VALUE,
                    target=ErrorTarget.EVALUATE,
                ) from exc

        if not isinstance(workflow_trace, dict):
            raise EvaluationException(
                message="workflow_trace must be a dict or JSON string.",
                internal_message=f"Got type {type(workflow_trace).__name__}",
                blame=ErrorBlame.USER_ERROR,
                category=ErrorCategory.INVALID_VALUE,
                target=ErrorTarget.EVALUATE,
            )

        # A workflow that crashed cannot be judged on planning quality.
        workflow_errors = workflow_trace.get("errors", [])
        if workflow_errors:
            raise EvaluationException(
                message=f"Workflow ended with errors: {workflow_errors}. Workflow Planning evaluation is not applicable.",
                # Consistency fix: every other raise in this method supplies internal_message.
                internal_message="Workflow trace contains errors; evaluation not applicable.",
                blame=ErrorBlame.USER_ERROR,
                category=ErrorCategory.NOT_APPLICABLE,
                target=ErrorTarget.EVALUATE,
            )

        # Check parse_failed flag
        if workflow_trace.get("parse_failed"):
            logger.warning("Workflow trace has parse_failed=True; evaluation may be limited.")

        # Format trace into LLM-readable text (warnings logged internally)
        trace_str = format_workflow_trace_for_eval(workflow_trace)

        prompty_input = {
            "workflow_trace": trace_str,
        }

        prompty_output_dict = await self._flow(timeout=self._LLM_CALL_TIMEOUT, **prompty_input)
        llm_output = prompty_output_dict.get("llm_output", {})

        if isinstance(llm_output, dict):
            # The LLM may return "success" as a bool or as the string "true"/"false".
            success_value = llm_output.get("success", False)
            if isinstance(success_value, str):
                success = 1 if success_value.lower() == "true" else 0
            else:
                success = 1 if success_value else 0
            success_result = "pass" if success == 1 else "fail"
            reason = llm_output.get("explanation", "")
            return {
                f"{self._result_key}": success,
                f"{self._result_key}_result": success_result,
                f"{self._result_key}_reason": reason,
                f"{self._result_key}_details": llm_output.get("details", ""),
                f"{self._result_key}_prompt_tokens": prompty_output_dict.get("input_token_count", 0),
                f"{self._result_key}_completion_tokens": prompty_output_dict.get("output_token_count", 0),
                f"{self._result_key}_total_tokens": prompty_output_dict.get("total_token_count", 0),
                f"{self._result_key}_finish_reason": prompty_output_dict.get("finish_reason", ""),
                f"{self._result_key}_model": prompty_output_dict.get("model_id", ""),
                f"{self._result_key}_sample_input": prompty_output_dict.get("sample_input", ""),
                f"{self._result_key}_sample_output": prompty_output_dict.get("sample_output", ""),
            }

        else:
            raise EvaluationException(
                message="Evaluator returned invalid output.",
                # Consistency fix: include internal_message like the other raises above.
                internal_message=f"Expected dict llm_output, got {type(llm_output).__name__}",
                blame=ErrorBlame.SYSTEM_ERROR,
                category=ErrorCategory.FAILED_EXECUTION,
                target=ErrorTarget.EVALUATE,
            )
a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluators/_workflow_planning/bugbash_workflow_planning.ipynb b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluators/_workflow_planning/bugbash_workflow_planning.ipynb new file mode 100644 index 000000000000..fea93d37a1d2 --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluators/_workflow_planning/bugbash_workflow_planning.ipynb @@ -0,0 +1,612 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Workflow Planning Evaluator — Bug Bash Notebook\n", + "\n", + "This notebook walks you through:\n", + "1. Installing required packages\n", + "2. Setting up tracing with Application Insights\n", + "3. Running 3 multi-agent HR recruitment workflows:\n", + " - **Group Chat** with agent manager (orchestrator selects speakers)\n", + " - **Magentic** multi-source sourcing with compliance\n", + " - **Sequential + Human-in-the-Loop** hiring pipeline\n", + "4. Fetching traces from App Insights and converting them\n", + "5. Running the Workflow Planning Evaluator on each trace\n", + "\n", + "**Prerequisites:**\n", + "- Azure AI project with Application Insights attached\n", + "- Run `az login` before starting\n", + "- Set your environment variables in the cell below" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. Install Packages" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "`pip install agent-framework==1.0.0b251211 agent-framework-azure-ai==1.0.0rc3 azure-ai-projects==2.0.0b4`\n", + "\n", + "`pip install -e ../../../../../../..[redteam]`\n", + "\n", + "`pip install azure-monitor-opentelemetry azure-monitor-query azure-identity opentelemetry-api python-dotenv`" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2. 
Environment Setup" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "from dotenv import load_dotenv\n", + "\n", + "load_dotenv()\n", + "\n", + "# ── Set these environment variables (or use a .env file) ──\n", + "# os.environ[\"APPLICATIONINSIGHTS_CONNECTION_STRING\"] = \"\"\n", + "# os.environ[\"APP_INSIGHTS_WORKSPACE_ID\"] = \"\"\n", + "# os.environ[\"AZURE_AI_PROJECT_ENDPOINT\"] = \"\"\n", + "# os.environ[\"AZURE_AI_MODEL_DEPLOYMENT_NAME\"] = \"\"\n", + "\n", + "# For evaluation\n", + "# os.environ[\"EVAL_GPT4_1_ENDPOINT\"] = \"\"\n", + "# os.environ[\"EVAL_GPT4_1_API_KEY\"] = \"\"\n", + "# os.environ[\"EVAL_GPT4_1_API_VERSION\"] = \"\"\n", + "# os.environ[\"EVAL_GPT4_1_DEPLOYMENT_NAME\"] = \"\"\n", + "\n", + "print(\"Environment variables loaded successfully.\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. Tracing Setup\n", + "\n", + "Configure Azure Monitor telemetry and enable instrumentation with sensitive data capture." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from agent_framework.observability import enable_instrumentation, get_tracer\n", + "from azure.monitor.opentelemetry import configure_azure_monitor\n", + "from opentelemetry.trace import SpanKind\n", + "from opentelemetry.trace.span import format_trace_id\n", + "\n", + "# Configure Azure Monitor exporter (uses APPLICATIONINSIGHTS_CONNECTION_STRING from .env)\n", + "configure_azure_monitor(\n", + " connection_string=os.environ.get(\"APPLICATIONINSIGHTS_CONNECTION_STRING\"),\n", + " enable_live_metrics=True,\n", + ")\n", + "enable_instrumentation(enable_sensitive_data=True)\n", + "\n", + "tracer = get_tracer()\n", + "print(\"Tracing configured with App Insights + sensitive data enabled!\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# We'll collect trace IDs from each workflow run\n", + "collected_trace_ids = {}" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 4. Import HR Agents & Tools\n", + "\n", + "The HR mock data, function tools, and agent factories are defined in `scripts/hr_tools.py` and `scripts/hr_agents.py`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from scripts.hr_agents import (\n", + " cleanup_clients,\n", + " create_compliance_guard,\n", + " create_evaluator,\n", + " create_magentic_manager,\n", + " create_mobility_scout,\n", + " create_orchestrator,\n", + " create_req_master,\n", + " create_scheduler,\n", + " create_talent_scout,\n", + ")\n", + "\n", + "print(\"HR agents and tools loaded!\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 5. 
Workflow 1: Group Chat with Agent Manager\n", + "\n", + "An HR group chat where an **Orchestrator agent** decides which specialist speaks next.\n", + "The team works together to find candidates for a Senior Software Engineer position.\n", + "\n", + "**Feel free to modify the agents, task, and termination condition!**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from typing import cast\n", + "from agent_framework import AgentResponseUpdate, Message, WorkflowEvent\n", + "from agent_framework.orchestrations import GroupChatBuilder\n", + "\n", + "# ── Create HR agents (each gets its own AzureAIClient) ──\n", + "orchestrator = await create_orchestrator()\n", + "req_master = await create_req_master()\n", + "talent_scout = await create_talent_scout()\n", + "evaluator = await create_evaluator()\n", + "\n", + "# ── Run with tracing (build + run inside the same span so workflow.build is captured) ──\n", + "task = (\n", + " \"Help me fill the Senior Software Engineer position JOB-SWE-2025-001. 
\"\n", + " \"Find candidates, evaluate them, and recommend a shortlist.\"\n", + ")\n", + "\n", + "print(f\"\\nTask: {task}\\n\" + \"=\" * 80)\n", + "\n", + "with tracer.start_as_current_span(\"group_chat_workflow\", kind=SpanKind.CLIENT) as span:\n", + " trace_id = format_trace_id(span.get_span_context().trace_id)\n", + " collected_trace_ids[\"group_chat\"] = trace_id\n", + " print(f\"Trace ID: {trace_id}\\n\")\n", + "\n", + " # Build the group chat workflow\n", + " group_chat_workflow = (\n", + " GroupChatBuilder(\n", + " participants=[req_master, talent_scout, evaluator],\n", + " max_rounds=20,\n", + " intermediate_outputs=True,\n", + " orchestrator_agent=orchestrator,\n", + " )\n", + " .build()\n", + " )\n", + "\n", + " last_response_id = None\n", + " async for event in group_chat_workflow.run(task, stream=True):\n", + " if event.type == \"output\" and not isinstance(event.data, AgentResponseUpdate):\n", + " outputs = cast(list[Message], event.data)\n", + " print(\"\\n\" + \"=\" * 80)\n", + " print(\"\\nFinal Conversation:\")\n", + " for message in outputs:\n", + " print(f\" [AGENT {message.author_name or message.role}]: {message.text[:200]}...\\n\")\n", + "\n", + "print(f\"\\n✅ Group Chat complete! Trace ID: {collected_trace_ids['group_chat']}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 6. 
Workflow 2: Magentic Multi-Source Sourcing\n", + "\n", + "A Magentic orchestration where a manager coordinates multiple specialists to source\n", + "candidates from all channels, evaluate them, and run compliance checks.\n", + "\n", + "**Feel free to modify the agents, task, and configuration!**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from agent_framework.orchestrations import MagenticBuilder\n", + "\n", + "# ── Create HR agents (each gets its own AzureAIClient) ──\n", + "magentic_manager = await create_magentic_manager()\n", + "mag_talent_scout = await create_talent_scout()\n", + "mag_mobility_scout = await create_mobility_scout()\n", + "mag_evaluator = await create_evaluator()\n", + "mag_compliance = await create_compliance_guard()\n", + "\n", + "# ── Run with tracing (build + run inside the same span so workflow.build is captured) ──\n", + "task = (\n", + " \"We need to hire for JOB-SWE-2025-001 (Senior Software Engineer). 
\"\n", + " \"Source candidates from all channels (external job boards and internal transfers), \"\n", + " \"evaluate them, and run a compliance check on the final shortlist.\"\n", + ")\n", + "\n", + "print(f\"\\nTask: {task}\\n\" + \"=\" * 80)\n", + "\n", + "with tracer.start_as_current_span(\"magentic_workflow\", kind=SpanKind.CLIENT) as span:\n", + " trace_id = format_trace_id(span.get_span_context().trace_id)\n", + " collected_trace_ids[\"magentic\"] = trace_id\n", + " print(f\"Trace ID: {trace_id}\\n\")\n", + "\n", + " # Build the Magentic workflow\n", + " magentic_workflow = MagenticBuilder(\n", + " participants=[mag_talent_scout, mag_mobility_scout, mag_evaluator, mag_compliance],\n", + " intermediate_outputs=True,\n", + " manager_agent=magentic_manager,\n", + " max_round_count=15,\n", + " ).build()\n", + "\n", + " last_response_id = None\n", + " output_event = None\n", + " async for event in magentic_workflow.run(task, stream=True):\n", + " if event.type == \"output\" and not isinstance(event.data, AgentResponseUpdate) and not event.type == \"magentic_orchestrator\":\n", + " output_event = event\n", + "\n", + " if output_event:\n", + " outputs = cast(list[Message], output_event.data)\n", + " print(\"\\n\" + \"=\" * 80)\n", + " print(\"\\nFinal Conversation:\")\n", + " for message in outputs:\n", + " print(f\" {message.author_name or message.role}: {message.text[:200]}...\\n\")\n", + "\n", + "print(f\"\\n✅ Magentic complete! Trace ID: {collected_trace_ids['magentic']}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 7. 
Workflow 3: Sequential HR Pipeline with Human-in-the-Loop\n", + "\n", + "A sequential hiring pipeline: ReqMaster \\u2192 TalentScout \\u2192 Evaluator \\u2192 Scheduler.\n", + "The workflow **pauses after the Evaluator** so you can review the shortlist before\n", + "scheduling interviews.\n", + "\n", + "**When prompted, provide feedback or type 'skip' to approve the shortlist.**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from collections.abc import AsyncIterable\n", + "from agent_framework import AgentExecutorResponse\n", + "from agent_framework.orchestrations import AgentRequestInfoResponse, SequentialBuilder\n", + "\n", + "\n", + "async def process_event_stream(stream: AsyncIterable[WorkflowEvent]):\n", + " \"\"\"Process events from the workflow stream, handling HITL requests.\"\"\"\n", + " requests = {}\n", + " async for event in stream:\n", + " if event.type == \"request_info\" and isinstance(event.data, AgentExecutorResponse):\n", + " requests[event.request_id] = event.data\n", + " elif event.type == \"output\":\n", + " print(\"\\n\" + \"=\" * 60)\n", + " print(\"WORKFLOW COMPLETE\")\n", + " print(\"=\" * 60)\n", + " outputs = cast(list[Message], event.data)\n", + " for message in outputs:\n", + " print(f\" [{message.author_name or message.role}]: {message.text[:300]}...\")\n", + "\n", + " responses = {}\n", + " if requests:\n", + " for request_id, request in requests.items():\n", + " print(\"\\n\" + \"-\" * 40)\n", + " print(f\"HUMAN REVIEW REQUESTED\")\n", + " print(f\"Agent '{request.executor_id}' produced a shortlist:\")\n", + " print(f\" {request.agent_response.text[:500]}...\")\n", + " print(\"-\" * 40)\n", + "\n", + " user_input = input(\"Your feedback (or 'skip' to approve): \")\n", + " if user_input.lower() == \"skip\":\n", + " responses[request_id] = AgentRequestInfoResponse.approve()\n", + " else:\n", + " responses[request_id] = 
AgentRequestInfoResponse.from_strings([user_input])\n", + "\n", + " return responses if responses else None\n", + "\n", + "\n", + "# ── Create HR agents (each gets its own AzureAIClient) ──\n", + "seq_req_master = await create_req_master()\n", + "seq_talent_scout = await create_talent_scout()\n", + "seq_evaluator = await create_evaluator()\n", + "seq_scheduler = await create_scheduler()\n", + "\n", + "# ── Run with tracing (build + run inside the same span so workflow.build is captured) ──\n", + "task = (\n", + " \"Process the hiring pipeline for JOB-SWE-2025-001: gather requirements, \"\n", + " \"source candidates, evaluate them (I'll review the shortlist), \"\n", + " \"then schedule interviews for approved candidates.\"\n", + ")\n", + "\n", + "print(f\"\\nTask: {task}\\n\" + \"=\" * 80)\n", + "\n", + "with tracer.start_as_current_span(\"sequential_hitl_workflow\", kind=SpanKind.CLIENT) as span:\n", + " trace_id = format_trace_id(span.get_span_context().trace_id)\n", + " collected_trace_ids[\"sequential_hitl\"] = trace_id\n", + " print(f\"Trace ID: {trace_id}\\n\")\n", + "\n", + " # Build the sequential workflow with HITL\n", + " sequential_workflow = (\n", + " SequentialBuilder(participants=[seq_req_master, seq_talent_scout, seq_evaluator, seq_scheduler])\n", + " .with_request_info(agents=[\"Evaluator\"]) # Pause after evaluator for human review\n", + " .build()\n", + " )\n", + "\n", + " stream = sequential_workflow.run(task, stream=True)\n", + " pending_responses = await process_event_stream(stream)\n", + " while pending_responses is not None:\n", + " stream = sequential_workflow.run(stream=True, responses=pending_responses)\n", + " pending_responses = await process_event_stream(stream)\n", + "\n", + "print(f\"\\n✅ Sequential HITL complete! Trace ID: {collected_trace_ids['sequential_hitl']}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 8. 
Summary of Collected Trace IDs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"Collected Trace IDs:\")\n", + "for name, tid in collected_trace_ids.items():\n", + " print(f\" {name}: {tid}\")\n", + "\n", + "all_trace_ids = list(collected_trace_ids.values())\n", + "print(f\"\\nTotal: {len(all_trace_ids)} traces\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 9. Wait for App Insights Ingestion\n", + "\n", + "Application Insights has ~90 second ingestion latency. We wait before querying." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import time\n", + "\n", + "WAIT_SECONDS = 90\n", + "print(f\"Waiting {WAIT_SECONDS} seconds for App Insights ingestion...\")\n", + "for i in range(WAIT_SECONDS, 0, -10):\n", + " print(f\" {i}s remaining...\", flush=True)\n", + " time.sleep(min(10, i))\n", + "print(\"Done waiting!\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 10. Fetch & Convert Traces\n", + "\n", + "Query App Insights for each trace ID, then convert to the structured format expected by the evaluator." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azure.ai.evaluation._workflows.trace_query import query_traces, process_workflow_trace_rows\n", + "from azure.ai.evaluation._workflows.workflow_trace_converter import convert_workflow_traces\n", + "\n", + "workspace_id = os.environ[\"APP_INSIGHTS_WORKSPACE_ID\"]\n", + "converted_traces = {}\n", + "\n", + "for workflow_name, trace_id in collected_trace_ids.items():\n", + " print(f\"\\n{'=' * 60}\")\n", + " print(f\"Fetching traces for: {workflow_name} (trace_id: {trace_id})\")\n", + " print(f\"{'=' * 60}\")\n", + "\n", + " try:\n", + " raw_rows = query_traces(\n", + " workspace_id=workspace_id,\n", + " trace_ids=[trace_id],\n", + " lookback_hours=1,\n", + " )\n", + " print(f\" Retrieved {len(raw_rows)} raw rows\")\n", + "\n", + " spans = process_workflow_trace_rows(raw_rows)\n", + " print(f\" Processed into {len(spans)} spans\")\n", + "\n", + " converted = convert_workflow_traces(spans)\n", + " converted_traces[workflow_name] = converted\n", + "\n", + " print(f\" Agents: {list(converted.get('agents', {}).keys())}\")\n", + " print(f\" Invocations: {len(converted.get('invocations', []))}\")\n", + " print(f\" Parse failed: {converted.get('parse_failed', False)}\")\n", + " print(f\" \\u2705 Conversion successful!\")\n", + " except Exception as e:\n", + " print(f\" \\u274c Error: {e}\")\n", + " converted_traces[workflow_name] = None\n", + "\n", + "print(f\"\\n\\nSuccessfully converted: {sum(1 for v in converted_traces.values() if v is not None)}/{len(converted_traces)}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 11. (Optional) Inspect Converted and Reformatted Traces\n", + "\n", + "View the converted trace structure for debugging.\n", + "\n", + "First cells shows the converted traces.\n", + "\n", + "Second cell shows the reformatted traces (direct input to the evaluator)." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "\n", + "for workflow_name, trace_data in converted_traces.items():\n", + " if trace_data is None:\n", + " continue\n", + " print(f\"\\n{'=' * 60}\")\n", + " print(f\"{workflow_name} \\u2014 Converted Trace\")\n", + " print(f\"{'=' * 60}\")\n", + " print(json.dumps(trace_data, indent=2, default=str)[:3000])\n", + " print(\"...\" * 60)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azure.ai.evaluation._workflows._utils import format_workflow_trace_for_eval\n", + "\n", + "# Show evaluator-side reformatted trace text\n", + "workflow_name = \"group_chat\"\n", + "# workflow_name = \"magentic\"\n", + "# workflow_name = \"sequential_hitl\"\n", + "\n", + "trace_data = converted_traces.get(workflow_name)\n", + "\n", + "\n", + "\n", + "if trace_data is None:\n", + " print(f\"No converted trace found for '{workflow_name}'.\")\n", + "else:\n", + " reformatted = format_workflow_trace_for_eval(trace_data)\n", + " print(f\"Reformatted trace preview for '{workflow_name}':\")\n", + " print(\"=\" * 80)\n", + " print(reformatted)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 12. Run Workflow Planning Evaluator\n", + "\n", + "Run the evaluator on each converted trace and display results." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azure.ai.evaluation import AzureOpenAIModelConfiguration\n", + "from azure.ai.evaluation._evaluators._workflow_planning import _WorkflowPlanningEvaluator\n", + "\n", + "# Configure the model for the evaluator\n", + "model_config = AzureOpenAIModelConfiguration(\n", + " azure_endpoint=os.environ[\"EVAL_GPT4_1_ENDPOINT\"],\n", + " api_key=os.environ[\"EVAL_GPT4_1_API_KEY\"],\n", + " api_version=os.environ[\"EVAL_GPT4_1_API_VERSION\"],\n", + " azure_deployment=os.environ[\"EVAL_GPT4_1_DEPLOYMENT_NAME\"],\n", + ")\n", + "\n", + "evaluator = _WorkflowPlanningEvaluator(model_config=model_config)\n", + "\n", + "results = {}\n", + "for workflow_name, trace_data in converted_traces.items():\n", + " if trace_data is None:\n", + " print(f\"\\n\\u23ed\\ufe0f Skipping {workflow_name} (no trace data)\")\n", + " continue\n", + "\n", + " print(f\"\\n{'=' * 60}\")\n", + " print(f\"Evaluating: {workflow_name}\")\n", + " print(f\"{'=' * 60}\")\n", + "\n", + " try:\n", + " result = evaluator(workflow_trace=trace_data)\n", + " results[workflow_name] = result\n", + "\n", + " print(f\" Score: {result.get('workflow_planning', 'N/A')}\")\n", + " print(f\" Result: {result.get('workflow_planning_result', 'N/A')}\")\n", + " print(f\" Reason: {result.get('workflow_planning_reason', 'N/A')}\")\n", + " print(f\" \\u2705 Evaluation complete!\")\n", + " except Exception as e:\n", + " print(f\" \\u274c Error: {e}\")\n", + " results[workflow_name] = {\"error\": str(e)}" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Cleanup" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "await cleanup_clients()\n", + "print(\"Cleaned up all agent clients!\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".bugbash-venv (3.12.10)", + "language": "python", + "name": "python3" + }, + 
"language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluators/_workflow_planning/scripts/hr_agents.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluators/_workflow_planning/scripts/hr_agents.py new file mode 100644 index 000000000000..b4b9b81addfd --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluators/_workflow_planning/scripts/hr_agents.py @@ -0,0 +1,288 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +"""HR recruitment agent factory functions and system instructions for the bug-bash notebook. + +Each factory function creates an agent with appropriate instructions and tools. +Agents are designed for an HR recruitment workflow scenario. +""" + +import os + +from agent_framework import Agent +from agent_framework.azure import AzureAIClient +from azure.identity import DefaultAzureCredential + +from .hr_tools import ( + book_interview, + check_compliance, + detect_bias, + extract_requirements, + get_calendar_availability, + get_job_details, + query_external_candidates, + query_internal_employees, + rank_candidates, + score_candidate, +) + +# ============================================================================ +# System Instructions +# ============================================================================ + +REQ_MASTER_INSTRUCTIONS = """You are a Requisition Master agent handling job intake and requirements analysis. + +Your responsibilities: +1. Retrieve and validate job posting details +2. Extract structured requirements (must-have vs nice-to-have) +3. 
Verify job is approved and open for sourcing +4. Identify hiring manager and team context +5. Set up screening criteria for candidate matching + +When processing job requisitions: +- Validate the job ID exists and is in 'open' status +- Extract all required and preferred skills +- Note salary range and location requirements +- Identify urgency level +- Flag any special requirements + +Output format: Provide structured job requirements ready for candidate sourcing.""" + +TALENT_SCOUT_INSTRUCTIONS = """You are a Talent Scout agent specializing in external candidate sourcing. + +Your responsibilities: +1. Query external job boards for applicants +2. Screen candidates against job requirements +3. Filter based on experience, skills, and location +4. Handle visa sponsorship considerations + +When processing external candidates: +- Verify all required skills are present +- Note any visa sponsorship requirements +- Check salary expectations against job budget +- Flag candidates with relevant industry experience +- Prioritize candidates with shorter notice periods + +Output format: Provide structured candidate profiles with skills match analysis.""" + +MOBILITY_SCOUT_INSTRUCTIONS = """You are an Internal Mobility Scout agent specializing in internal transfers. + +Your responsibilities: +1. Query the HRIS system for employees interested in the role +2. Verify transfer eligibility (tenure, performance, no PIP) +3. Assess skills match with current role experience +4. Coordinate with HR on internal transfer policies + +When processing internal candidates: +- Verify minimum tenure requirement (usually 1+ year) +- Check performance rating (must be meeting expectations or above) +- Confirm employee is not on a Performance Improvement Plan +- Note current team and potential backfill needs + +Output format: Provide structured profiles with eligibility status and current role context.""" + +EVALUATOR_INSTRUCTIONS = """You are an Evaluator agent responsible for candidate assessment. 
+ +Your responsibilities: +1. Score candidates against job requirements +2. Rank candidates by overall fit score +3. Identify skill gaps and development needs +4. Compare salary expectations to budget +5. Produce a ranked shortlist with justification + +When evaluating candidates: +- Weight required skills higher than preferred +- Consider years of experience relative to role level +- Factor in location and remote work compatibility +- Note any red flags or concerns +- Provide clear rationale for rankings + +Output format: Provide ranked shortlist with scores, evaluation notes, and recommendations.""" + +COMPLIANCE_GUARD_INSTRUCTIONS = """You are a Compliance Guard agent ensuring fair hiring practices. + +Your responsibilities: +1. Analyze shortlists for adverse impact using the 4/5ths rule +2. Check demographic distribution of selections +3. Verify selection criteria are job-related +4. Flag potential bias in screening decisions +5. Ensure EEOC compliance documentation + +When reviewing for compliance: +- Always call the check_compliance tool before drafting your final response +- Also run detect_bias to check for patterns +- Calculate selection rates by demographic group +- Flag any disparate impact concerns +- Recommend adjustments if bias is detected + +Output format: Provide compliance report with pass/fail status and recommendations.""" + +SCHEDULER_INSTRUCTIONS = """You are a Scheduler agent coordinating interviews. + +Your responsibilities: +1. Check calendar availability for hiring managers +2. Find suitable interview slots for candidates +3. Book interview rooms or video conferences +4. 
Handle scheduling logistics + +When scheduling interviews: +- Check interviewer availability first +- Allow buffer time between interviews +- Consider time zones for remote candidates +- Include video conference links for virtual interviews + +Output format: Provide booking confirmation with time, participants, and meeting link.""" + +ORCHESTRATOR_INSTRUCTIONS = """You coordinate an HR recruitment team conversation to fulfill hiring requests. + +Guidelines: +- Start with ReqMaster to gather job requirements +- Then have TalentScout source external candidates +- Use Evaluator to score and rank candidates +- Only finish after a shortlist has been produced with clear recommendations +- Keep the conversation focused on the hiring task + +IMPORTANT: Do not choose the same agent twice in a row. +""" + +MAGENTIC_MANAGER_INSTRUCTIONS = """You coordinate an HR recruitment team to complete hiring tasks efficiently. + +Your team members: +- TalentScout: Sources external candidates from job boards +- MobilityScout: Finds internal transfer candidates from HRIS +- Evaluator: Scores and ranks all candidates +- ComplianceGuard: Checks shortlists for fairness and compliance + +Strategy: +1. First, have TalentScout and MobilityScout source candidates in parallel +2. Then have Evaluator score and rank all candidates together +3. Finally, have ComplianceGuard review the shortlist for compliance +4. End when you have a compliant, ranked shortlist ready for interviews +""" + + +# ============================================================================ +# Agent Factory Functions +# ============================================================================ + +# Track all created clients for cleanup +_created_clients: list[AzureAIClient] = [] + + +async def _create_client(agent_name: str) -> AzureAIClient: + """Create a new independent AzureAIClient instance for an agent. 
async def _create_client(agent_name: str) -> AzureAIClient:
    """Create a new independent AzureAIClient instance for an agent.

    Each agent gets its own client to ensure proper message isolation
    in group chat and other multi-agent workflows.

    :param agent_name: Display name registered with the client.
    :return: An entered (async-context-active) client, tracked for cleanup.
    """
    client = AzureAIClient(
        project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
        model_deployment_name=os.environ.get("AZURE_AI_MODEL_DEPLOYMENT_NAME"),
        credential=DefaultAzureCredential(),
        agent_name=agent_name,
    )
    await client.__aenter__()
    # Track so cleanup_clients() can close every client we opened.
    _created_clients.append(client)
    return client


async def cleanup_clients():
    """Close all created clients. Call this when done with the agents.

    Each close is attempted independently: a failure in one client's
    ``__aexit__`` no longer prevents the remaining clients from being
    closed. (Previously the first raising client aborted the loop,
    leaking every client after it and leaving the tracking list full.)
    """
    import logging  # function-scope: avoids touching the module import block

    for client in _created_clients:
        if not hasattr(client, "__aexit__"):
            continue
        try:
            await client.__aexit__(None, None, None)
        except Exception:  # best-effort cleanup must not abort mid-list
            logging.getLogger(__name__).warning(
                "Failed to close client %r", client, exc_info=True
            )
    _created_clients.clear()


async def create_req_master() -> Agent:
    """Create a Requisition Master agent (job lookup + requirement extraction)."""
    client = await _create_client("ReqMaster")
    return Agent(
        name="ReqMaster",
        description="Extracts requisition requirements and hiring constraints from job postings.",
        instructions=REQ_MASTER_INSTRUCTIONS,
        client=client,
        tools=[get_job_details, extract_requirements],
    )


async def create_talent_scout() -> Agent:
    """Create a Talent Scout agent for external candidate sourcing."""
    client = await _create_client("TalentScout")
    return Agent(
        name="TalentScout",
        description="Sources external candidates from job boards.",
        instructions=TALENT_SCOUT_INSTRUCTIONS,
        client=client,
        tools=[query_external_candidates],
    )


async def create_mobility_scout() -> Agent:
    """Create an Internal Mobility Scout agent for HRIS transfer candidates."""
    client = await _create_client("MobilityScout")
    return Agent(
        name="MobilityScout",
        description="Finds eligible internal transfer candidates from HRIS systems.",
        instructions=MOBILITY_SCOUT_INSTRUCTIONS,
        client=client,
        tools=[query_internal_employees],
    )


async def create_evaluator() -> Agent:
    """Create an Evaluator agent for candidate scoring and ranking."""
    client = await _create_client("Evaluator")
    return Agent(
        name="Evaluator",
        description="Scores, ranks, and compares candidate fit against role requirements.",
        instructions=EVALUATOR_INSTRUCTIONS,
        client=client,
        tools=[score_candidate, rank_candidates],
    )


async def create_compliance_guard() -> Agent:
    """Create a Compliance Guard agent for bias and EEOC compliance."""
    client = await _create_client("ComplianceGuard")
    return Agent(
        name="ComplianceGuard",
        description="Performs fairness and policy compliance checks on shortlists.",
        instructions=COMPLIANCE_GUARD_INSTRUCTIONS,
        client=client,
        tools=[check_compliance, detect_bias],
    )


async def create_scheduler() -> Agent:
    """Create a Scheduler agent for interview coordination."""
    client = await _create_client("Scheduler")
    return Agent(
        name="Scheduler",
        description="Coordinates interview slots and confirms interview logistics.",
        instructions=SCHEDULER_INSTRUCTIONS,
        client=client,
        tools=[get_calendar_availability, book_interview],
    )


async def create_orchestrator() -> Agent:
    """Create an Orchestrator agent for group chat management (no tools)."""
    client = await _create_client("Orchestrator")
    return Agent(
        name="Orchestrator",
        description="Coordinates multi-agent HR recruitment by selecting speakers.",
        instructions=ORCHESTRATOR_INSTRUCTIONS,
        client=client,
    )


async def create_magentic_manager() -> Agent:
    """Create a Manager agent for Magentic orchestration (no tools)."""
    client = await _create_client("HiringManager")
    return Agent(
        name="HiringManager",
        description="Coordinates the HR recruitment workflow across multiple specialists.",
        instructions=MAGENTIC_MANAGER_INSTRUCTIONS,
        client=client,
    )
b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluators/_workflow_planning/scripts/hr_tools.py @@ -0,0 +1,500 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +"""Mock HR data and function tools for the bug-bash notebook. + +Provides a self-contained HR recruitment scenario with: +- Mock data: job postings, candidates (external + internal), calendar slots +- Function tools for job lookup, candidate sourcing, evaluation, compliance, + scheduling, and communication +""" + +from typing import Annotated + +from agent_framework import tool + +# ============================================================================ +# Mock Data +# ============================================================================ + +JOBS = { + "JOB-SWE-2025-001": { + "title": "Senior Software Engineer", + "department": "Engineering", + "location": "Seattle, WA", + "remote_allowed": True, + "salary_range": (150000, 200000), + "status": "open", + "hiring_manager": "EMP-MGR-100", + "urgency": "high", + "description": "Build and maintain cloud-native microservices for our AI platform.", + "requirements": { + "must_have": ["Python", "Distributed Systems", "Cloud (Azure/AWS)", "5+ years experience"], + "nice_to_have": ["Kubernetes", "Machine Learning", "Go", "System Design"], + }, + }, + "JOB-PM-2025-042": { + "title": "Product Manager - AI Platform", + "department": "Product", + "location": "San Francisco, CA", + "remote_allowed": True, + "salary_range": (140000, 180000), + "status": "open", + "hiring_manager": "EMP-MGR-200", + "urgency": "medium", + "description": "Define product strategy and roadmap for our AI developer tools.", + "requirements": { + "must_have": ["Product Management", "AI/ML domain knowledge", "3+ years experience"], + "nice_to_have": ["Technical background", "B2B SaaS", "Developer tools"], + }, + }, + "JOB-DATA-2025-015": { + 
"title": "Data Engineer", + "department": "Data", + "location": "Austin, TX", + "remote_allowed": False, + "salary_range": (130000, 170000), + "status": "open", + "hiring_manager": "EMP-MGR-300", + "urgency": "medium", + "description": "Design and build data pipelines for large-scale ML training datasets.", + "requirements": { + "must_have": ["Python", "SQL", "Spark/Databricks", "3+ years experience"], + "nice_to_have": ["Azure Data Factory", "dbt", "Airflow"], + }, + }, +} + +EXTERNAL_CANDIDATES = { + "CAND-EXT-1001": { + "name": "Alice Zhang", + "email": "alice.zhang@email.com", + "skills": ["Python", "Distributed Systems", "Kubernetes", "Azure", "Machine Learning"], + "years_experience": 7, + "education": "MS Computer Science, Stanford", + "current_company": "TechCorp", + "current_title": "Software Engineer II", + "location": "Seattle, WA", + "visa_required": False, + "salary_expectation": 185000, + "applied_jobs": ["JOB-SWE-2025-001"], + }, + "CAND-EXT-1002": { + "name": "Bob Patel", + "email": "bob.patel@email.com", + "skills": ["Python", "Cloud (Azure/AWS)", "Go", "System Design", "CI/CD"], + "years_experience": 9, + "education": "BS Computer Engineering, MIT", + "current_company": "CloudScale Inc", + "current_title": "Senior Backend Engineer", + "location": "San Francisco, CA", + "visa_required": False, + "salary_expectation": 195000, + "applied_jobs": ["JOB-SWE-2025-001"], + }, + "CAND-EXT-1003": { + "name": "Clara Kim", + "email": "clara.kim@email.com", + "skills": ["Python", "Distributed Systems", "Azure", "Microservices"], + "years_experience": 5, + "education": "MS Software Engineering, Carnegie Mellon", + "current_company": "DataFlow", + "current_title": "Software Engineer", + "location": "Remote", + "visa_required": True, + "salary_expectation": 165000, + "applied_jobs": ["JOB-SWE-2025-001"], + }, + "CAND-EXT-1004": { + "name": "Daniel Okafor", + "email": "daniel.okafor@email.com", + "skills": ["Java", "Python", "AWS", "Kafka", "Spark"], + 
"years_experience": 6, + "education": "BS Computer Science, Georgia Tech", + "current_company": "StreamTech", + "current_title": "Platform Engineer", + "location": "Austin, TX", + "visa_required": False, + "salary_expectation": 170000, + "applied_jobs": ["JOB-SWE-2025-001", "JOB-DATA-2025-015"], + }, + "CAND-EXT-1005": { + "name": "Eva Martinez", + "email": "eva.martinez@email.com", + "skills": ["Python", "SQL", "Spark/Databricks", "Azure Data Factory", "dbt"], + "years_experience": 4, + "education": "MS Data Science, UT Austin", + "current_company": "DataWorks", + "current_title": "Data Engineer", + "location": "Austin, TX", + "visa_required": False, + "salary_expectation": 155000, + "applied_jobs": ["JOB-DATA-2025-015"], + }, +} + +INTERNAL_CANDIDATES = { + "CAND-INT-2001": { + "name": "Frank Liu", + "employee_id": "EMP-4001", + "skills": ["Python", "Azure", "Distributed Systems", "Machine Learning", "Kubernetes"], + "years_experience": 8, + "education": "PhD Computer Science, UC Berkeley", + "current_title": "Software Engineer II", + "department": "ML Platform", + "location": "Seattle, WA", + "tenure_years": 3.5, + "performance_rating": 4.2, + "transfer_eligible": True, + "on_pip": False, + "applied_jobs": ["JOB-SWE-2025-001"], + }, + "CAND-INT-2002": { + "name": "Grace Thompson", + "employee_id": "EMP-4002", + "skills": ["Python", "SQL", "Cloud (Azure/AWS)", "System Design"], + "years_experience": 5, + "education": "MS Computer Science, University of Washington", + "current_title": "Software Engineer", + "department": "Backend Services", + "location": "Seattle, WA", + "tenure_years": 2.0, + "performance_rating": 3.8, + "transfer_eligible": True, + "on_pip": False, + "applied_jobs": ["JOB-SWE-2025-001"], + }, +} + +CALENDAR_SLOTS = { + "EMP-MGR-100": [ + {"date": "2025-03-10", "time": "10:00 AM", "available": True}, + {"date": "2025-03-10", "time": "2:00 PM", "available": True}, + {"date": "2025-03-11", "time": "9:00 AM", "available": False}, + {"date": 
"2025-03-11", "time": "3:00 PM", "available": True}, + {"date": "2025-03-12", "time": "11:00 AM", "available": True}, + ], +} + +# Track booked interviews +_booked_interviews = [] + + +# ============================================================================ +# Job Requisition Tools +# ============================================================================ + + +@tool( + name="get_job_details", + description="Retrieves detailed information about a job posting including title, department, requirements, salary range, and hiring manager.", + approval_mode="never_require", +) +def get_job_details( + job_id: Annotated[str, "The job ID to retrieve (e.g., JOB-SWE-2025-001)"], +) -> str: + """Get job posting details.""" + job = JOBS.get(job_id) + if not job: + return f"Job ID '{job_id}' not found in the system." + + if job["status"] != "open": + return f"Warning: Job '{job_id}' is not open. Current status: {job['status']}" + + must_have = "\n".join(f" - {r} (Required)" for r in job["requirements"]["must_have"]) + nice_to_have = "\n".join(f" - {r} (Preferred)" for r in job["requirements"]["nice_to_have"]) + + return f"""Job Details for {job_id}: +Title: {job['title']} +Department: {job['department']} +Location: {job['location']} +Remote Allowed: {'Yes' if job['remote_allowed'] else 'No'} +Salary Range: ${job['salary_range'][0]:,} - ${job['salary_range'][1]:,} +Hiring Manager: {job['hiring_manager']} +Status: {job['status']} +Urgency: {job['urgency']} + +Requirements: +{must_have} +{nice_to_have} + +Description: {job['description']}""" + + +@tool( + name="extract_requirements", + description="Extracts structured requirements from a job posting, separating must-have from nice-to-have skills.", + approval_mode="never_require", +) +def extract_requirements( + job_id: Annotated[str, "The job ID to extract requirements from"], +) -> str: + """Extract structured requirements from job posting.""" + job = JOBS.get(job_id) + if not job: + raise ValueError(f"Job ID '{job_id}' 
not found.") + + must = ", ".join(job["requirements"]["must_have"]) + nice = ", ".join(job["requirements"]["nice_to_have"]) + + return f"""Structured Requirements for {job_id} ({job['title']}): + +MUST HAVE: {must} +NICE TO HAVE: {nice} +Salary Range: ${job['salary_range'][0]:,} - ${job['salary_range'][1]:,} +Location: {job['location']} {'(Remote OK)' if job['remote_allowed'] else '(On-site required)'}""" + + +# ============================================================================ +# Candidate Sourcing Tools +# ============================================================================ + + +@tool( + name="query_external_candidates", + description="Queries the external candidate database (job board applicants) for candidates who applied to a specific job.", + approval_mode="never_require", +) +def query_external_candidates( + job_id: Annotated[str, "The job ID to find candidates for"], + min_experience: Annotated[int, "Minimum years of experience required"] = 0, +) -> str: + """Query external candidates from job boards.""" + candidates = [ + c + for c in EXTERNAL_CANDIDATES.values() + if job_id in c["applied_jobs"] and c["years_experience"] >= min_experience + ] + if not candidates: + return f"No external candidates found for job {job_id} matching the criteria." 
+ + lines = [f"External Candidates for {job_id} ({len(candidates)} found):"] + for c in candidates: + visa = " [Visa Required]" if c["visa_required"] else "" + lines.append( + f"\n- {[k for k, v in EXTERNAL_CANDIDATES.items() if v is c][0]}: {c['name']}\n" + f" Experience: {c['years_experience']} years | Location: {c['location']}{visa}\n" + f" Skills: {', '.join(c['skills'][:6])}\n" + f" Current: {c['current_title']} at {c['current_company']}\n" + f" Salary Expectation: ${c['salary_expectation']:,}" + ) + return "\n".join(lines) + + +@tool( + name="query_internal_employees", + description="Queries the internal HRIS system for current employees interested in a job opening.", + approval_mode="never_require", +) +def query_internal_employees( + job_id: Annotated[str, "The job ID to find internal candidates for"], + eligible_only: Annotated[bool, "Only return transfer-eligible employees"] = True, +) -> str: + """Query internal employees for mobility.""" + candidates = [c for c in INTERNAL_CANDIDATES.values() if job_id in c["applied_jobs"]] + if eligible_only: + candidates = [c for c in candidates if c["transfer_eligible"]] + + if not candidates: + return f"No internal candidates found for job {job_id}." 
# ============================================================================
# Evaluation Tools
# ============================================================================


@tool(
    name="score_candidate",
    description="Scores a candidate against job requirements. Returns a score from 0-100.",
    approval_mode="never_require",
)
def score_candidate(
    candidate_id: Annotated[str, "The candidate ID to score"],
    job_id: Annotated[str, "The job ID to score against"],
) -> str:
    """Score one candidate against a job's requirements.

    Scoring model: up to 70 points for must-have skill coverage, up to 30 for
    nice-to-have coverage, plus up to 10 bonus points (one per year of
    experience), capped at 100. Skill matching is case-insensitive substring
    containment against the candidate's skill list.

    :param candidate_id: External or internal candidate ID.
    :param job_id: Job requisition ID to score against.
    :return: Formatted scoring report with matches, gaps, and salary context.
    :raises ValueError: If the job or candidate ID is unknown.
    """
    job = JOBS.get(job_id)
    if not job:
        raise ValueError(f"Job '{job_id}' not found.")

    # Candidates may come from either sourcing pool.
    candidate = EXTERNAL_CANDIDATES.get(candidate_id) or INTERNAL_CANDIDATES.get(candidate_id)
    if not candidate:
        raise ValueError(f"Candidate '{candidate_id}' not found.")

    must_have = job["requirements"]["must_have"]
    nice_to_have = job["requirements"]["nice_to_have"]
    skills_lower = [s.lower() for s in candidate["skills"]]

    def _matched(requirements):
        # A requirement counts as matched when it appears (case-insensitively)
        # inside any of the candidate's listed skills.
        return [r for r in requirements if any(r.lower() in s for s in skills_lower)]

    # Compute each matched list once and derive counts/gaps from it; the
    # original evaluated the same predicate three times over the must-have list.
    must_matched = _matched(must_have)
    nice_matches = len(_matched(nice_to_have))
    must_gaps = [r for r in must_have if r not in must_matched]

    score = int(
        (len(must_matched) / max(len(must_have), 1)) * 70
        + (nice_matches / max(len(nice_to_have), 1)) * 30
    )
    score = min(100, score + min(candidate["years_experience"], 10))

    return f"""Scoring Report for {candidate_id} ({candidate['name']}) against {job_id}:
Overall Score: {score}/100
Skills Matched (Must Have): {', '.join(must_matched) or 'None'}
Skills Gaps (Must Have): {', '.join(must_gaps) or 'None'}
Nice-to-Have Matches: {nice_matches}/{len(nice_to_have)}
Experience: {candidate['years_experience']} years
Salary Expectation: ${candidate.get('salary_expectation', 0):,} (Budget: ${job['salary_range'][0]:,}-${job['salary_range'][1]:,})"""
# ============================================================================
# Compliance Tools
# ============================================================================


@tool(
    name="check_compliance",
    description="Checks a shortlist for EEOC compliance and adverse impact using the 4/5ths rule.",
    approval_mode="never_require",
)
def check_compliance(
    candidate_ids: Annotated[str, "Comma-separated candidate IDs in the shortlist"],
    job_id: Annotated[str, "The job ID the shortlist is for"],
) -> str:
    """Produce a mock EEOC compliance report for a shortlist.

    This is canned demo output: it always reports PASS, and the shortlist
    IDs are used only to report the shortlist size.
    """
    shortlist = [candidate.strip() for candidate in candidate_ids.split(",")]
    return f"""Compliance Report for {job_id}:
Shortlist Size: {len(shortlist)} candidates
EEOC Status: PASS
Adverse Impact Analysis: No adverse impact detected (4/5ths rule satisfied)
Demographic Distribution: Diverse candidate pool
Recommendation: Shortlist is compliant — proceed to interviews."""


@tool(
    name="detect_bias",
    description="Analyzes the selection process for potential bias patterns.",
    approval_mode="never_require",
)
def detect_bias(
    candidate_ids: Annotated[str, "Comma-separated candidate IDs that were selected"],
    job_id: Annotated[str, "The job ID"],
) -> str:
    """Produce a mock bias-detection report.

    Canned demo output: the verdict is always LOW risk and ``candidate_ids``
    is not inspected; only ``job_id`` appears in the report header.
    """
    report_lines = [
        f"Bias Detection Report for {job_id}:",
        "Selection Criteria Analysis: All criteria are job-related",
        "Geographic Bias: Not detected",
        "Experience Bias: Not detected (range 4-9 years in pool)",
        "Education Bias: Not detected (mix of BS/MS/PhD)",
        "Overall Risk: LOW",
        "Recommendation: No corrective action needed.",
    ]
    return "\n".join(report_lines)
manager.", + approval_mode="never_require", +) +def get_calendar_availability( + hiring_manager_id: Annotated[str, "The hiring manager's employee ID"], +) -> str: + """Get calendar availability.""" + slots = CALENDAR_SLOTS.get(hiring_manager_id) + if not slots: + return f"No calendar data found for manager {hiring_manager_id}." + + available = [s for s in slots if s["available"]] + if not available: + return f"No available slots found for manager {hiring_manager_id}." + + lines = [f"Available Interview Slots for {hiring_manager_id}:"] + for s in available: + lines.append(f" - {s['date']} at {s['time']}") + return "\n".join(lines) + + +@tool( + name="book_interview", + description="Books an interview slot for a candidate with the hiring manager.", + approval_mode="never_require", +) +def book_interview( + candidate_id: Annotated[str, "The candidate ID"], + hiring_manager_id: Annotated[str, "The hiring manager's employee ID"], + date: Annotated[str, "Interview date (YYYY-MM-DD)"], + time: Annotated[str, "Interview time (e.g., '10:00 AM')"], +) -> str: + """Book an interview slot.""" + cand = EXTERNAL_CANDIDATES.get(candidate_id) or INTERNAL_CANDIDATES.get(candidate_id) + name = cand["name"] if cand else candidate_id + + _booked_interviews.append( + { + "candidate_id": candidate_id, + "manager": hiring_manager_id, + "date": date, + "time": time, + } + ) + + return f"""Interview Booked: +Candidate: {name} ({candidate_id}) +Hiring Manager: {hiring_manager_id} +Date: {date} at {time} +Format: Video Conference (Teams link will be sent) +Confirmation: Sent to all participants.""" diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluators/_workflow_planning/workflow_planning.prompty b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluators/_workflow_planning/workflow_planning.prompty new file mode 100644 index 000000000000..eff1cb432f03 --- /dev/null +++ 
b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluators/_workflow_planning/workflow_planning.prompty @@ -0,0 +1,153 @@ +--- +name: Workflow Planning +description: Evaluates whether a multi-agent workflow was well-planned from end-to-end trace evidence +model: + api: chat + parameters: + max_completion_tokens: 3000 + top_p: 1.0 + presence_penalty: 0 + frequency_penalty: 0 + response_format: + type: json_object +inputs: + workflow_trace: + type: string +--- +system: +You are an expert evaluator of multi-agent workflow planning. Judge planning quality from full trace evidence, with emphasis on agent/function suitability, routing, and flow correctness rather than final answer completeness. + +user: +ROLE +==== +You are a judge on Workflow Planning who assesses the quality of a multi-agent workflow execution. Your single focus is to answer the question **Was this workflow well-planned?** and provide the reasoning behind your answer. + +You evaluate planning quality across the below dimensions. Based on the trace evidence, determine which dimensions are applicable. A material failure in any applicable dimension results in a fail verdict. + +WORKFLOW TRACE: +{{workflow_trace}} + +PROCESS REQUIREMENTS +==================== +1. Read and evaluate the trace sequentially from first event to last event. +2. Determine topology first (group-chat sequential dynamic, workflow_dag/concurrent, fixed, or dynamic) from trace evidence. +3. Inspect available executors and routing mechanisms (agent instructions, declared tools/functions, constraints, and observed capabilities), then judge suitability of assignments and handoffs. +4. Apply HARD FAIL CHECKS before assigning the final score. +5. Focus scoring on orchestration quality (decomposition, selection, routing, adaptation). Non-completion alone is not an automatic fail, but PASS is not allowed when structural or routing defects materially block required workflow stages. 
+ +EVALUATION DIMENSIONS +===================== +Evaluate each dimension below and infer applicability from evidence using this rule: +- **Group-chat pattern (sequential dynamic)**: one agent/function is selected per turn; no concurrent branch execution in that turn. Apply A, B1, C, D. +- **Workflow-DAG / Concurrent pattern**: branches may run concurrently and may converge through fan-in edges (via agent or function aggregation). +- **Fixed topology**: if membership/order/edges are predefined and no runtime rerouting occurs, apply B2, C, D and mark A/B1 as N/A. +- **Dynamic topology**: if runtime routing decisions are made from intermediate results/failures, apply A, B1, C, D. +- **Mixed/unclear**: default to Dynamic. + +### A. Task Decomposition +Was the workflow objective decomposed into appropriate sub-tasks aligned with available executor capabilities? +- Did each sub-task map to an executor with relevant expertise/tools? +- Mark N/A if the workflow follows a fixed predefined topology with no runtime task decomposition. + +### B1. Agent Selection & Routing +Were the correct executors selected for the correct tasks at the appropriate step? +- Was each selected agent/function invoked for work matching its instructions/tools/purpose? +- Were selections and handoffs logically ordered? +- Were unnecessary or redundant invocations avoided? +- Mark N/A if agents were invoked in a fixed predefined order with no runtime selection. + +### B2. [For Fixed Workflows] Pre-planned Structure: +- Was each agent placed at the correct stage of the workflow? +- Did any agent receive inputs it was not logically prepared to handle? +- In concurrent/fan-in stages, was branch aggregation coherent (whether done by an agent or a function)? +- For fixed sequential flows, were stage prerequisites satisfied before each handoff? +- Structural defects (wrong order, missing required stage, skipped dependency, wrong fan-in consumer) are material failures even if later text appears plausible. 
+ +### C. Progress Tracking & Adaptation +Was progress monitored and adapted when new information or failures occurred? +- Did intermediate results inform subsequent steps? +- Was forward progress maintained (no infinite loops or repeated identical actions)? +- **Strict rule**: repeated cyclic handoffs/actions with no meaningful new artifact, state change, or retry rationale are a material failure and must produce **FALSE**. +- Were adjustments made when unexpected results or failures occurred? +- For fixed topologies: evaluate whether results from earlier agents were available to later agents and whether the pipeline maintained forward progress. + +### D. Error & Failure Handling +Were tool failures, empty results, and errors acknowledged and handled appropriately? +- Were errors detected and not silently ignored? +- Were reasonable recovery actions taken (retry, alternative approach, graceful degradation)? +- If errors were unrecoverable, was the failure clearly communicated? +- If a critical branch/stage failed, did downstream routing avoid treating that failed output as successful input? + +HARD FAIL CHECKS (ANY = FALSE) +============================== +Before final scoring, check all items below. If any item is true, verdict must be **FALSE**: +1. **Fixed-structure violation**: required stage/node missing or skipped, dependency order violated, or incorrect fan-in consumer/aggregator role. +2. **Stage-capability mismatch**: executor assigned to a stage lacks relevant capabilities/tools for that stage objective, and no corrective reroute occurs. +3. **Context-agnostic dynamic routing**: selector/routing policy is random, round-robin, or otherwise disconnected from task context when specialized executor choice is required. +4. **Stale-state routing**: planner/ledger/selector ignores critical prior outputs or failure signals and keeps delegating as if state had advanced. +5. 
**Looping/no-op progression**: repeated cycles occur without meaningful new artifacts, state updates, or justified retry strategy. +6. **Ignored critical failure**: a critical failure is unacknowledged or misrepresented while downstream stages proceed as if successful. + +SCORING +======= +Assign a binary score: +- **TRUE**: The workflow was well-planned across all applicable dimensions, and none of the HARD FAIL CHECKS triggered. +- **FALSE**: One or more applicable dimensions had a material failure that degraded planning quality. +- Do not base the verdict on final task completion quality; base it on planning/routing quality evidenced in the trace. + +SCORING EXAMPLES +================ + +### TRUE-1 (A/B1: dynamic decomposition + routing) +Trace summary: orchestrator decomposes work into sourcing, evaluation, and scheduling; each turn selects one executor whose tools match the current sub-task; handoffs use prior results and avoid redundant delegation. +Expected verdict: **TRUE**. + +### TRUE-2 (B2/C: fixed sequential structure) +Trace summary: fixed sequential pipeline runs Source -> Evaluate -> Communicate in dependency order; each stage consumes the prior stage artifact and produces expected intermediate outputs with forward progress. +Expected verdict: **TRUE**. + +### TRUE-3 (B2/C/D: concurrent fan-out + coherent fan-in) +Trace summary: workflow_dag launches parallel branches, then fan-in merges branch outputs coherently (via function or agent) before downstream decision; a transient branch issue is acknowledged and handled without corrupting downstream state. +Expected verdict: **TRUE**. + +### FALSE-1 (B2: fixed order/capability defect) +Trace summary: fixed sequential workflow places communication or scheduling stages before required evidence-gathering or evaluation stages, so prerequisites are unmet and early executors lack capabilities for assigned stage objectives. +Expected verdict: **FALSE**. 

### FALSE-2 (B1: context-agnostic dynamic routing)
Trace summary: selector repeatedly chooses executors by round-robin or random policy rather than sub-task fit, despite clear capability differences and available better matches.
Expected verdict: **FALSE**.

### FALSE-3 (C/D: stale routing + reliability failure)
Trace summary: coordinator loops or continues stale delegations after critical failure or output mismatch, and downstream stages proceed as if state advanced successfully.
Expected verdict: **FALSE**.

KEY PRINCIPLES
==============

1. **Infer Applicability**: Determine from trace evidence which dimensions apply — do not assume
2. **Material Failures**: Minor imperfections are acceptable; only material failures warrant a fail
3. **Trace Evidence**: Base your judgment on evidence in the trace, not assumptions
4. **Planning Focus**: Evaluate how the workflow was planned and coordinated (agent/function choice, routing, and flow), not final output quality nor the full task completion
5. **Loop Severity**: Meaningful loop/stagnation in execution flow is always a material failure
6. **Error Severity**: Ignored or unacknowledged errors that affect workflow reliability are material failures
7. **No Generic Claims**: Claims like "logical routing" or "no loops" must be supported by concrete trace facts

OUTPUT FORMAT
=============
Output a JSON object with these keys:
{
  "explanation": "<30-80 words summarizing the overall planning quality>",
  "details": {
    "task_decomposition": "<15-40 words, or 'N/A' if not applicable based on trace evidence>",
    "agent_selection": "<15-40 words, or 'N/A' if not applicable based on trace evidence>",
    "progress_tracking": "<15-40 words>",
    "error_handling": "<15-40 words>"
  },
  "success": <true or false>
}

Evidence requirement:
- For each non-N/A detail field, include at least one concrete trace fact (executor/function, edge/handoff, tool/action, or error/progress event).
+ +# Output diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_workflows/__init__.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_workflows/__init__.py new file mode 100644 index 000000000000..b4686ffb2696 --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_workflows/__init__.py @@ -0,0 +1,18 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +"""Bug-bash scripts for workflow planning evaluator trace collection and conversion.""" + +from .trace_query import ( + build_full_workflow_query, + process_workflow_trace_rows, + query_traces, +) +from .workflow_trace_converter import convert_workflow_traces + +__all__ = [ + "build_full_workflow_query", + "convert_workflow_traces", + "process_workflow_trace_rows", + "query_traces", +] diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_workflows/_utils.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_workflows/_utils.py new file mode 100644 index 000000000000..29b045ea1183 --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_workflows/_utils.py @@ -0,0 +1,315 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
# ---------------------------------------------------------
"""Utility to convert structured workflow trace JSON into LLM-readable text."""

import logging
from typing import Dict, List, Set, Tuple

logger = logging.getLogger(__name__)

# Quality warnings are logged (not returned) so callers receive only the formatted text.
_WARN_PARSE_FAILED = "parse_failed: trace parsing failed, evaluation quality may be reduced"
_WARN_TRUNCATED = "truncated: some messages were truncated, evaluation quality may be reduced"
_SEPARATOR = "-" * 40

# Role label mapping; tool messages intentionally get no role label (their parts
# already carry [Tool Call] / [Tool Result] markers).
_ROLE_LABELS = {"user": "[User]", "assistant": "[Response]", "tool": ""}


def format_workflow_trace_for_eval(data: Dict) -> str:
    """Convert converter output dict into a human-readable text block for the LLM.

    Logs quality warnings directly rather than returning them.

    :param data: The converter output dict.
    :return: The formatted text.
    """
    if data.get("parse_failed"):
        logger.warning(_WARN_PARSE_FAILED)
        return _format_parse_failed(data)

    sections: List[str] = []
    has_truncation = False

    sections.append(_format_workflow_definition(data))

    user_query = _extract_user_query(data.get("invocations", []))
    if user_query:
        sections.append(f"[User Input]\n{user_query}")

    # seen_sys_prompts deduplicates system prompts per agent; previous_output_texts is a
    # rolling set of normalized outputs used to elide repeated content from later histories.
    seen_sys_prompts: Dict[str, bool] = {}
    previous_output_texts: Set[str] = set()
    for inv in data.get("invocations", []):
        text, trunc, current_output_texts = _format_invocation(
            inv,
            seen_sys_prompts,
            previous_output_texts=previous_output_texts,
        )
        sections.append(text)
        # Fix: extend the accumulator in place instead of rebuilding a fresh set per
        # invocation (`prev | current` copied the whole set each turn, making the loop
        # quadratic in total output parts). Safe because _format_invocation only reads
        # the set; the result is identical.
        previous_output_texts.update(current_output_texts)
        if trunc:
            has_truncation = True

    sections.append(_format_completion(data.get("errors", [])))

    if has_truncation:
        logger.warning(_WARN_TRUNCATED)

    return ("\n\n" + _SEPARATOR + "\n\n").join(sections)


# ---------------------------------------------------------------------------
# Shared message formatting
# ---------------------------------------------------------------------------


def _format_parts(parts: List[Dict], indent: str = "  ") -> List[str]:
    """Format a message's parts into labeled lines. Handles text, tool_call, and tool_call_response.

    :param parts: Message part dicts, each carrying a "type" discriminator.
    :param indent: Leading whitespace applied to every emitted line.
    :return: Formatted lines; unknown part types and empty text are silently skipped.
    """
    lines = []
    for p in parts:
        ptype = p.get("type", "")
        if ptype == "text":
            content = p.get("content", "")
            if content:
                lines.append(f"{indent}{content}")
        elif ptype == "tool_call":
            name = p.get("name", "?")
            args = p.get("arguments", "")
            lines.append(f"{indent}[Tool Call] {name}({args})")
        elif ptype == "tool_call_response":
            # Cap tool output at 1000 chars to keep the evaluation prompt compact.
            resp = str(p.get("response", ""))[:1000]
            lines.append(f"{indent}[Tool Result] {resp}")
    return lines


def _format_messages(messages: List[Dict], indent: str = "  ", skip_system: bool = False) -> List[str]:
    """Format a list of messages with role labels and part formatting.

    :param messages: Message dicts with "role" and "parts" keys.
    :param indent: Leading whitespace for emitted lines.
    :param skip_system: When True, system messages are omitted entirely.
    :return: Formatted lines for all messages that produced any output.
    """
    lines = []
    for msg in messages:
        role = msg.get("role", "")
        if skip_system and role == "system":
            continue
        parts = msg.get("parts", [])
        formatted = _format_parts(parts, indent)
        if not formatted:
            continue
        # Only show role label for text-bearing messages; skip for pure tool_call/tool_call_response
        has_text = any(p.get("type") == "text" for p in parts)
        if has_text:
            label = _ROLE_LABELS.get(role, f"[{role}]")
            if label:
                lines.append(f"{indent}{label}")
        lines.extend(formatted)
    return lines


def _normalize_text_for_dedup(content: str) -> str:
    """Collapse whitespace and lowercase so near-identical repeats compare equal."""
    return " ".join(str(content).split()).strip().lower()


def _normalize_tool_call_for_dedup(part: Dict) -> str:
    """Normalized fingerprint for a tool_call part (name plus raw arguments)."""
    name = part.get("name", "")
    args = part.get("arguments", "")
    return _normalize_text_for_dedup(f"tool_call:{name}({args})")


def _normalize_tool_call_response_for_dedup(part: Dict) -> str:
    """Normalized fingerprint for a tool_call_response part (same 1000-char cap as display)."""
    resp = str(part.get("response", ""))[:1000]
    return _normalize_text_for_dedup(f"tool_result:{resp}")


def _extract_normalized_output_parts(messages: List[Dict]) -> Set[str]:
    """Extract normalized representations of all output parts (text, tool_call, tool_call_response)."""
    normalized: Set[str] = set()
    for msg in messages:
        for part in msg.get("parts", []):
            ptype = part.get("type", "")
            if ptype == "text":
                value = _normalize_text_for_dedup(part.get("content", ""))
                if value:
                    normalized.add(value)
            elif ptype == "tool_call":
                value = _normalize_tool_call_for_dedup(part)
                if value:
                    normalized.add(value)
            elif ptype == "tool_call_response":
                value = _normalize_tool_call_response_for_dedup(part)
                if value:
                    normalized.add(value)
    return normalized


def _filter_messages_by_previous_output(
    messages: List[Dict], previous_output_texts: Set[str]
) -> Tuple[List[Dict], bool]:
    """Drop message parts that were already shown as a prior invocation's output.

    Assistant text and all tool_call/tool_call_response parts whose normalized form
    appears in *previous_output_texts* are removed; user/system text is never dropped.
    Messages left with no parts are omitted. Input dicts are not mutated (shallow
    copies are made for any message that loses parts).

    :return: (filtered messages, whether any part was removed)
    """
    if not previous_output_texts:
        return messages, False

    filtered_messages: List[Dict] = []
    removed_any = False
    for msg in messages:
        role = msg.get("role", "")
        parts = msg.get("parts", [])
        filtered_parts = []
        for part in parts:
            ptype = part.get("type", "")
            normalized = None
            if ptype == "text" and role == "assistant":
                normalized = _normalize_text_for_dedup(part.get("content", ""))
            elif ptype == "tool_call":
                normalized = _normalize_tool_call_for_dedup(part)
            elif ptype == "tool_call_response":
                normalized = _normalize_tool_call_response_for_dedup(part)

            if normalized and normalized in previous_output_texts:
                removed_any = True
                continue
            filtered_parts.append(part)

        if filtered_parts:
            filtered_msg = dict(msg)
            filtered_msg["parts"] = filtered_parts
            filtered_messages.append(filtered_msg)
    return filtered_messages, removed_any


# ---------------------------------------------------------------------------
# Section formatters
# ---------------------------------------------------------------------------


def _format_parse_failed(data: Dict) -> str:
    """Render raw trace dimensions when structured parsing failed (values capped at 500 chars)."""
    lines = ["[PARSE FAILED - Raw Traces]"]
    for entry in data.get("raw_traces", []):
        target = entry.get("target", "")
        dims = entry.get("custom_dimensions", {})
        lines.append(f"  target: {target}")
        for k, v in dims.items():
            lines.append(f"    {k}: {str(v)[:500]}")
        lines.append("")
    return "\n".join(lines)

def _format_workflow_definition(data: Dict) -> str:
    """Render the [Workflow Definition] section (executor ids/types and edges) from topology."""
    topo = data.get("topology", {})
    executors: Dict[str, str] = {}
    for executor in topo.get("executors", []):
        # Defensive: topology entries may be malformed; skip anything that is not a dict with an id.
        if not isinstance(executor, dict):
            continue
        executor_id = executor.get("id")
        if not executor_id:
            continue
        executors[str(executor_id)] = str(executor.get("type", ""))

    lines = ["[Workflow Definition]"]
    if executors:
        lines.append("Executors: " + ", ".join(f"{eid} ({etype})" for eid, etype in executors.items()))
    edges = topo.get("edges", [])
    if edges:
        lines.append("Edges:")
        for edge in edges:
            if not isinstance(edge, dict):
                continue
            lines.append(f"  {edge.get('source', '?')} -> {edge.get('target', '?')}")
    return "\n".join(lines)


def _extract_user_query(invocations: List[Dict]) -> str:
    """Pull the first user text from the first invocation's input messages.

    Falls back to the first 2000 chars of the raw payload when the input was
    truncated beyond recovery; returns "" when no user text is found.
    """
    if not invocations:
        return ""
    msgs = invocations[0].get("input_messages", {})
    if msgs.get("_truncated") and not msgs.get("value"):
        return msgs.get("_raw", "")[:2000]
    for msg in msgs.get("value", []):
        if msg.get("role") == "user":
            parts = msg.get("parts", [])
            texts = [p.get("content", "") for p in parts if p.get("type") == "text"]
            # NOTE(review): returns on the FIRST user message even when it has no text
            # parts (yielding ""), skipping any later user messages — confirm intended.
            return " ".join(texts)
    return ""


def _format_invocation(
    inv: Dict,
    seen_sys_prompts: Dict[str, bool],
    *,
    previous_output_texts: Set[str],
) -> Tuple[str, bool, Set[str]]:
    """Render one agent invocation: system prompt, conversation history, and response.

    :param inv: Invocation dict produced by the trace converter.
    :param seen_sys_prompts: Mutated in place to deduplicate system prompts per agent.
    :param previous_output_texts: Normalized outputs of earlier invocations; read-only here.
    :return: (formatted text, whether truncation occurred, this invocation's normalized outputs)
    """
    agent = inv.get("agent_name", "Unknown")
    seq = inv.get("sequence", "?")
    had_truncation = False
    current_output_texts: Set[str] = set()

    lines = [f"[{agent} - Invocation {seq}]"]

    # System Prompt (deduplicate per agent)
    if agent not in seen_sys_prompts:
        sys_instr = inv.get("system_instructions", [])
        if sys_instr:
            lines.append("  [System Prompt]")
            # Only the first instruction is shown, capped at 2000 chars.
            lines.append(f"  {sys_instr[0][:2000]}")
        seen_sys_prompts[agent] = True
    else:
        lines.append("  [System Prompt: same as above]")

    # Conversation History (input messages)
    in_msgs = inv.get("input_messages", {})
    if in_msgs.get("_truncated") and not in_msgs.get("value"):
        lines.append("  [Conversation History] [TRUNCATED]")
        lines.append(f"  {in_msgs.get('_raw', '')[:4000]}")
        had_truncation = True
    else:
        input_value = in_msgs.get("value", [])
        removed_duplicate_assistant_text = False
        input_value, removed_duplicate_assistant_text = _filter_messages_by_previous_output(
            input_value, previous_output_texts
        )
        conv = _format_messages(input_value, skip_system=True)
        if conv:
            # Alternate label signals that repeated prior-agent output was elided.
            if removed_duplicate_assistant_text:
                lines.append("  [Conversation History: prior-agent context included]")
            else:
                lines.append("  [Conversation History]")
            lines.extend(conv)

    # Agent Response (output messages)
    lines.append(f"  {_SEPARATOR}")
    out_msgs = inv.get("output_messages", {})
    if out_msgs.get("_truncated") and not out_msgs.get("value"):
        lines.append(f"  [{agent} Response] [TRUNCATED]")
        lines.append(f"  {out_msgs.get('_raw', '')[:4000]}")
        had_truncation = True
    else:
        # Record this invocation's outputs so later invocations can dedup them from history.
        current_output_texts = _extract_normalized_output_parts(out_msgs.get("value", []))
        tool_lines = []
        output_lines = []
        for msg in out_msgs.get("value", []):
            for part in msg.get("parts", []):
                ptype = part.get("type", "")
                if ptype == "tool_call":
                    name = part.get("name", "?")
                    args = part.get("arguments", "")
                    tool_lines.append(f"  [Tool Call] {name}({args})")
                elif ptype == "tool_call_response":
                    resp = str(part.get("response", ""))[:1000]
                    tool_lines.append(f"  [Tool Result] {resp}")
                elif ptype == "text":
                    content = part.get("content", "")
                    if content:
                        output_lines.append(f"  {content}")
        if tool_lines or output_lines:
            lines.append(f"  [{agent} Response]")
            lines.extend(tool_lines)
            if output_lines:
                lines.append("  [Agent Output]")
                lines.extend(output_lines)

    return "\n".join(lines), had_truncation, current_output_texts


def _format_completion(errors: List[Dict]) -> str:
    """Render the closing [Workflow Completion] section; lists error messages if any."""
    if not errors:
        return "[Workflow Completion]\nSuccessful"
    lines = ["[Workflow Completion]"]
    for err in errors:
        lines.append(f"Error: {err.get('message', 'Unknown error')}")
    return "\n".join(lines)
diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_workflows/trace_query.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_workflows/trace_query.py
new file mode 100644
index 000000000000..052ae190fbaa
--- /dev/null
+++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_workflows/trace_query.py
@@ -0,0 +1,149 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
"""Application Insights trace querying utilities for the bug-bash notebook.

Provides functions to:
- Build KQL queries for fetching workflow traces by trace ID
- Query Application Insights using ``LogsQueryClient``
- Process raw rows into the format expected by the trace converters
"""

import json
import logging
from datetime import timedelta
from typing import Any, Dict, List

from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient, LogsQueryStatus

logger = logging.getLogger(__name__)

# Default query window: one week.
DEFAULT_LOOKBACK_HOURS = 24 * 7


def build_full_workflow_query(trace_ids: List[str]) -> str:
    """Construct a KQL query that fetches ALL telemetry types (traces, dependencies, requests)
    for the given trace IDs — needed for full workflow reconstruction.
    """
    # json.dumps renders a quoted/escaped literal list, which also guards against
    # KQL injection through trace IDs.
    trace_ids_json = json.dumps(trace_ids)
    return f"""
let trace_ids = dynamic({trace_ids_json});
union traces, dependencies, requests
| where operation_Id in (trace_ids)
| summarize arg_max(timestamp, *) by operation_Id, id
| project
    timestamp,
    operation_Id,
    id,
    target,
    duration,
    customDimensions
| order by timestamp asc"""


def _normalize_dimension_value(value: Any) -> Any:
    """Convert Application Insights dynamic column values to JSON-compatible Python types."""
    if value is None:
        return None
    if isinstance(value, (dict, list)):
        return value
    if isinstance(value, str):
        stripped = value.strip()
        if not stripped:
            return None
        try:
            return json.loads(stripped)
        except (json.JSONDecodeError, TypeError, ValueError):
            # Not JSON — keep the original string untouched.
            return value
    return value


def process_workflow_trace_rows(raw_rows: List[Dict[str, object]]) -> List[Dict[str, object]]:
    """Process raw App Insights rows into the span dict format expected by
    ``convert_workflow_traces``.
    """
    spans: List[Dict[str, object]] = []
    for row in raw_rows:
        custom_dims = _normalize_dimension_value(row.get("customDimensions", {}))
        # customDimensions may arrive as a JSON string; anything non-dict is dropped.
        if not isinstance(custom_dims, dict):
            custom_dims = {}
        spans.append(
            {
                "timestamp": row.get("timestamp"),
                "operation_id": row.get("operation_Id", ""),
                "span_id": row.get("id", ""),
                "target": row.get("target", ""),
                "duration": row.get("duration", 0),
                "custom_dimensions": custom_dims,
            }
        )
    return spans


def query_traces(
    workspace_id: str,
    trace_ids: List[str],
    lookback_hours: int = DEFAULT_LOOKBACK_HOURS,
) -> List[Dict[str, object]]:
    """Query Application Insights for trace data.

    Args:
        workspace_id: The Log Analytics workspace ID (GUID) or an ARM resource
            path (e.g. ``/subscriptions/.../providers/microsoft.insights/components/...``).
            When an ARM path is provided, ``query_resource`` is used instead of
            ``query_workspace``.
        trace_ids: List of trace IDs to fetch.
        lookback_hours: Hours to look back for traces.

    Returns:
        List of raw row dicts from the query result.
    """
    credential = DefaultAzureCredential()
    client = LogsQueryClient(credential=credential)

    query = build_full_workflow_query(trace_ids)
    timespan = timedelta(hours=lookback_hours)

    # Detect whether workspace_id is an ARM resource path or a GUID
    is_resource_id = workspace_id.strip().startswith("/subscriptions/") or workspace_id.strip().startswith(
        "subscriptions/"
    )

    if is_resource_id:
        resource_id = workspace_id.strip()
        if not resource_id.startswith("/"):
            resource_id = "/" + resource_id
        logger.info("Querying App Insights resource %s for %s trace IDs...", resource_id, len(trace_ids))
    else:
        logger.info("Querying App Insights workspace %s for %s trace IDs...", workspace_id, len(trace_ids))

    try:
        if is_resource_id:
            response = client.query_resource(
                resource_id=resource_id,
                query=query,
                timespan=timespan,
            )
        else:
            response = client.query_workspace(
                workspace_id=workspace_id,
                query=query,
                timespan=timespan,
            )

        # NOTE(review): a PARTIAL result also fails this check, so partial data is
        # discarded rather than surfaced — confirm that is intended.
        if response.status != LogsQueryStatus.SUCCESS:
            raise RuntimeError("Application Insights query did not succeed.")

        if not response.tables:
            logger.warning("Query returned no tables.")
            return []

        table = response.tables[0]
        # Column entries may be objects with .name or plain strings depending on SDK version.
        column_names = [col.name if hasattr(col, "name") else col for col in table.columns]

        raw_rows = [dict(zip(column_names, row)) for row in table.rows]
        logger.info("Query returned %s rows.", len(raw_rows))
        return raw_rows
    finally:
        client.close()
diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_workflows/workflow_trace_converter.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_workflows/workflow_trace_converter.py
new file mode 100644
index 000000000000..a7cdd877b9eb
--- /dev/null
+++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_workflows/workflow_trace_converter.py
@@ -0,0 +1,493 @@
# 
---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
"""Workflow trace converter for multi-agent system traces.

Converts raw trace spans (from JSONL files or App Insights query results)
into a structured document suitable for LLM-based workflow evaluation.

Supports 6 workflow patterns: sequential, concurrent, magentic, group_chat,
handoff, and workflow_dag.

Usage:
    from workflow_trace_converter import convert_workflow_traces
    import json

    spans = []
    with open("trace.jsonl") as f:
        for line in f:
            spans.append(json.loads(line))
    result = convert_workflow_traces(spans)
"""

import json
import logging
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# App Insights truncation threshold (8KB)
_TRUNCATION_THRESHOLD = 8192

# Span target prefixes to keep
_RELEVANT_TARGET_PREFIXES = (
    "workflow.build",
    "workflow.run",
    "invoke_agent",
    "chat",
    "executor.process",
)


def convert_workflow_traces(spans: List[Dict]) -> Dict:
    """Convert raw trace spans into a structured workflow document.

    Args:
        spans: List of span dicts, each with keys like ``timestamp``,
            ``target``, ``custom_dimensions``, etc.

    Returns:
        Structured dict with workflow metadata, topology, agents,
        invocations, tool executions, token usage, and
        ``parse_failed`` flag.
    """
    try:
        return _wf_convert(spans)
    except Exception:
        # Any structured-parse failure degrades to a raw dump so the evaluator
        # can still see something (flagged via parse_failed).
        logger.warning("Workflow trace conversion failed; attempting raw fallback.", exc_info=True)
        raw_traces = _wf_extract_raw_fallback(spans)
        if not raw_traces:
            raise ValueError(
                "Workflow trace conversion failed and no usable "
                "invoke_agent/chat/execute_tool spans with custom_dimensions found."
            )
        return {
            "parse_failed": True,
            "raw_traces": raw_traces,
        }


def _wf_convert(spans: List[Dict]) -> Dict:
    """Internal conversion logic."""
    relevant = _wf_filter_relevant_spans(spans)
    errors = _wf_extract_errors(spans)

    build_spans = [s for s in relevant if s.get("target") == "workflow.build"]
    run_spans = [s for s in relevant if s.get("target") == "workflow.run"]
    invoke_spans = [s for s in relevant if (s.get("target") or "").startswith("invoke_agent")]
    chat_spans = [s for s in relevant if (s.get("target") or "").startswith("chat")]
    executor_spans = [s for s in relevant if (s.get("target") or "").startswith("executor.process")]

    invoke_spans.sort(key=lambda s: s.get("timestamp", ""))
    chat_spans.sort(key=lambda s: s.get("timestamp", ""))
    executor_spans.sort(key=lambda s: s.get("timestamp", ""))

    # Latest run span (by timestamp) represents the execution being evaluated.
    run_span = max(run_spans, key=lambda s: s.get("timestamp", "")) if run_spans else None

    metadata = _wf_extract_workflow_metadata(build_spans, run_span)
    workflow_def = metadata.get("workflow_definition", {})

    # Called for its side effect only (warning log when no type can be detected);
    # the detected type itself is intentionally discarded.
    _ = _wf_detect_workflow_type(workflow_def, executor_spans)
    topology = _wf_build_topology(workflow_def)
    agents = _wf_extract_agents(invoke_spans, chat_spans, executor_spans)
    invocations = _wf_extract_invocations(invoke_spans)

    return {
        "parse_failed": False,
        "workflow_id": metadata.get("workflow_id"),
        "workflow_name": metadata.get("workflow_name"),
        "topology": topology,
        "agents": agents,
        "invocations": invocations,
        "errors": errors,
    }


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

_RAW_FALLBACK_PREFIXES = ("invoke_agent", "chat", "execute_tool")


def _wf_extract_raw_fallback(spans: Any) -> List[Dict]:
    """Collect target + custom_dimensions from agent/chat/tool spans for the parse-failed dump."""
    if not isinstance(spans, list):
        return []
    result: List[Dict] = []
    for span in spans:
        if not isinstance(span, dict):
            continue
        target = span.get("target") or ""
        if not any(target.startswith(p) for p in _RAW_FALLBACK_PREFIXES):
            continue
        cd = span.get("custom_dimensions")
        if isinstance(cd, dict) and cd:
            result.append({"target": target, "custom_dimensions": cd})
    return result


def _wf_extract_errors(spans: List[Dict]) -> List[Dict]:
    """Collect error.message entries from all spans (not just the relevant-target subset)."""
    errors = []
    for span in spans:
        if not isinstance(span, dict):
            continue
        cd = span.get("custom_dimensions") or {}
        msg = cd.get("error.message")
        if msg:
            errors.append(
                {
                    "message": str(msg),
                    "span_id": span.get("span_id", ""),
                    "timestamp": span.get("timestamp", ""),
                }
            )
    return errors


def _wf_filter_relevant_spans(spans: List[Dict]) -> List[Dict]:
    """Keep only spans whose target starts with a known workflow/agent/chat/executor prefix."""
    result = []
    for span in spans:
        target = span.get("target") or ""
        if not target:
            continue
        if any(target.startswith(p) for p in _RELEVANT_TARGET_PREFIXES):
            result.append(span)
    return result


def _wf_safe_parse_json(value: Any) -> Any:
    """Parse a JSON string if possible; pass through non-strings unchanged.

    A string that fails to parse AND reaches the App Insights size cap is assumed
    to be truncated JSON and wrapped in a {"_raw": ..., "_truncated": True} sentinel.
    """
    if value is None:
        return None
    if isinstance(value, (dict, list)):
        return value
    if not isinstance(value, str):
        return value
    text = value.strip()
    if not text:
        return text
    try:
        return json.loads(text)
    except (json.JSONDecodeError, ValueError):
        if len(value) >= _TRUNCATION_THRESHOLD:
            return {"_raw": value, "_truncated": True}
        return value


def _wf_wrap(value: Any, default: Any = None) -> Dict:
    """Wrap a parsed value into {"value": ..., "_truncated": bool}; truncation sentinels pass through."""
    if isinstance(value, dict) and value.get("_truncated") is True:
        return value
    return {"value": value if value is not None else default, "_truncated": False}


def _wf_extract_workflow_metadata(
    build_spans: List[Dict],
    run_span: Optional[Dict],
) -> Dict:
    """Select workflow id/name/definition, preferring the build span matching the run's workflow.id."""
    result: Dict[str, Any] = {
        "workflow_id": None,
        "workflow_name": None,
        "workflow_definition": {},
    }

    run_workflow_id = None
    if run_span:
        cd = run_span.get("custom_dimensions", {})
        run_workflow_id = cd.get("workflow.id")

    candidates: List[Dict[str, Any]] = []
    for build_span in build_spans:
        cd = build_span.get("custom_dimensions", {})
        workflow_id = cd.get("workflow.id")
        workflow_name = cd.get("workflow_builder.name") or cd.get("workflow.name")
        parsed_definition = _wf_safe_parse_json(cd.get("workflow.definition", ""))
        if not isinstance(parsed_definition, dict):
            parsed_definition = {}
        candidates.append(
            {
                "timestamp": build_span.get("timestamp", ""),
                "workflow_id": workflow_id,
                "workflow_name": workflow_name,
                "workflow_definition": parsed_definition,
            }
        )

    selected: Optional[Dict[str, Any]] = None
    if run_workflow_id:
        selected = next((c for c in candidates if c.get("workflow_id") == run_workflow_id), None)
    if selected is None and candidates:
        # Fall back to the newest build span that actually carries a definition.
        sorted_candidates = sorted(candidates, key=lambda c: c.get("timestamp", ""), reverse=True)
        selected = next((c for c in sorted_candidates if c.get("workflow_definition")), sorted_candidates[0])

    if selected:
        result["workflow_id"] = selected.get("workflow_id")
        result["workflow_name"] = selected.get("workflow_name")
        result["workflow_definition"] = selected.get("workflow_definition") or {}

    # Backfill id/name from the run span when the build spans did not provide them.
    if run_span:
        cd = run_span.get("custom_dimensions", {})
        if not result["workflow_id"]:
            result["workflow_id"] = cd.get("workflow.id")
        if not result["workflow_name"]:
            result["workflow_name"] = cd.get("workflow.name")
    return result


def _wf_detect_workflow_type(workflow_definition: Dict, executor_spans: List[Dict]) -> str:
    """Heuristically classify the workflow pattern from executor type names and edge structure.

    Checks are ordered from most to least specific; "workflow_dag" is the final fallback
    (a warning is logged when no rule matched positively).
    """
    executor_types: set = set()
    for exec_info in workflow_definition.get("executors", {}).values():
        t = exec_info.get("type", "")
        if t:
            executor_types.add(t)
    for span in executor_spans:
        t = span.get("custom_dimensions", {}).get("executor.type", "")
        if t:
            executor_types.add(t)
    lower_executor_types = {str(t).lower() for t in executor_types if t}

    if any("magentic" in t and "orchestrator" in t for t in lower_executor_types):
        return "magentic"
    if any("groupchat" in t or "group_chat" in t for t in lower_executor_types):
        return "group_chat"
    if any("handoff" in t for t in lower_executor_types):
        return "handoff"
    if any("dispatchtoallparticipants" in t or "dispatch_to_all_participants" in t for t in lower_executor_types):
        return "concurrent"
    if any("fanout" in t or "fanin" in t for t in lower_executor_types):
        return "workflow_dag"
    for eg in workflow_definition.get("edge_groups", []):
        if eg.get("type") == "SwitchCaseEdgeGroup":
            return "workflow_dag"
    if any(
        "inputtoconversation" in t or "responsetoconversation" in t or "endwithconversation" in t
        for t in lower_executor_types
    ):
        return "sequential"
    # Structural check: any two-node cycle (A->B and B->A) implies a DAG-style loop topology.
    edges = []
    for eg in workflow_definition.get("edge_groups", []):
        if eg.get("type") == "InternalEdgeGroup":
            continue
        for e in eg.get("edges", []):
            edges.append((e.get("source_id"), e.get("target_id")))
    edge_set = set(edges)
    has_cycle = any((t, s) in edge_set for s, t in edges)
    if has_cycle:
        return "workflow_dag"
    if {t for t in lower_executor_types if t} <= {"agentexecutor"}:
        return "workflow_dag"
    logger.warning(
        "Could not detect workflow type. Executor types found: %s",
        executor_types,
    )
    return "workflow_dag"


def _wf_build_topology(workflow_definition: Dict) -> Dict:
    """Flatten the workflow definition into executor and edge lists for the output document."""
    executors_dict = workflow_definition.get("executors", {})
    edge_groups = workflow_definition.get("edge_groups", [])

    executors = []
    for eid, einfo in executors_dict.items():
        etype = einfo.get("type", "")
        executors.append({"id": eid, "type": etype})

    edges = []
    for eg in edge_groups:
        eg_type = eg.get("type", "")
        # InternalEdgeGroup edges are implementation wiring, not user-visible topology.
        if eg_type == "InternalEdgeGroup":
            continue
        eg_edges = eg.get("edges", [])
        if not eg_edges:
            continue
        if eg_type in ("FanOutEdgeGroup", "FanInEdgeGroup"):
            for e in eg_edges:
                edges.append(
                    {
                        "type": eg_type,
                        "source": e.get("source_id", ""),
                        "target": e.get("target_id", ""),
                    }
                )
        elif eg_type == "SwitchCaseEdgeGroup":
            for e in eg_edges:
                edge_entry: Dict[str, Any] = {
                    "type": eg_type,
                    "source": e.get("source_id", ""),
                    "target": e.get("target_id", ""),
                }
                # Branch conditions, when present, are preserved for the evaluator.
                cases = eg.get("cases")
                if cases:
                    edge_entry["cases"] = cases
                edges.append(edge_entry)
        else:
            for e in eg_edges:
                edges.append(
                    {
                        "type": eg_type,
                        "source": e.get("source_id", ""),
                        "target": e.get("target_id", ""),
                    }
                )

    return {
        "start_executor_id": workflow_definition.get("start_executor_id", ""),
        "executors": executors,
        "edges": edges,
    }


def _wf_extract_agents(
    invoke_agent_spans: List[Dict],
    chat_spans: List[Dict],
    executor_spans: List[Dict],
) -> Dict:
    """Build the agent catalog (id, model, system instructions, tool definitions).

    First-seen invoke_agent span wins per agent name; chat spans then backfill
    missing instructions/tools via _wf_merge_chat_span_data.
    """
    agents: Dict[str, Dict] = {}
    for span in invoke_agent_spans:
        cd = span.get("custom_dimensions", {})
        agent_name = cd.get("gen_ai.agent.name", "")
        if not agent_name or agent_name in agents:
            continue
        sys_instr = _wf_safe_parse_json(cd.get("gen_ai.system_instructions"))
        tool_defs = _wf_safe_parse_json(cd.get("gen_ai.tool.definitions"))
        normalized_tools = _wf_normalize_tool_definitions(tool_defs)
        agents[agent_name] = {
            "agent_id": cd.get("gen_ai.agent.id", ""),
            "model": cd.get("gen_ai.request.model", ""),
            "system_instructions": _wf_extract_text_from_parts(sys_instr),
            "tool_definitions": normalized_tools,
        }
    _wf_merge_chat_span_data(agents, chat_spans, executor_spans)
    return agents


def _wf_merge_chat_span_data(
    agents: Dict[str, Dict],
    chat_spans: List[Dict],
    executor_spans: List[Dict],
) -> None:
    """Attribute chat spans to agents by interleaving them with executor spans in time order.

    Each executor.process span sets the "current agent" context; subsequent chat spans
    backfill that agent's tool definitions and system instructions. Mutates *agents* in place.
    """
    if not chat_spans or not agents:
        return
    tagged: List[tuple] = []
    for s in executor_spans:
        tagged.append(("exec", s))
    for s in chat_spans:
        tagged.append(("chat", s))
    tagged.sort(key=lambda x: x[1].get("timestamp", ""))

    current_agent_name: Optional[str] = None
    for kind, span in tagged:
        if kind == "exec":
            cd = span.get("custom_dimensions", {})
            executor_id = cd.get("executor.id", "")
            # Executor ids may embed the agent name as an exact match or a suffix.
            matched_agent = None
            for aname in agents:
                if executor_id == aname or executor_id.endswith("_" + aname) or executor_id.endswith(":" + aname):
                    matched_agent = aname
                    break
            current_agent_name = matched_agent
        elif kind == "chat" and current_agent_name:
            cd = span.get("custom_dimensions", {})
            agent = agents[current_agent_name]
            raw_tools = cd.get("gen_ai.tool.definitions")
            if raw_tools:
                parsed = _wf_safe_parse_json(raw_tools)
                chat_tools = _wf_normalize_tool_definitions(parsed)
                if chat_tools:
                    # Merge by tool name, keeping the invoke_agent-provided definitions first.
                    existing = {t.get("name") for t in agent["tool_definitions"]}
                    for tool in chat_tools:
                        name = tool.get("name")
                        if name and name not in existing:
                            agent["tool_definitions"].append(tool)
                            existing.add(name)
            if not agent["system_instructions"]:
                raw_si = cd.get("gen_ai.system_instructions")
                if raw_si:
                    parsed_si = _wf_safe_parse_json(raw_si)
                    text = _wf_extract_text_from_parts(parsed_si)
                    if text:
                        agent["system_instructions"] = text


def _wf_extract_text_from_parts(value: Any) -> str:
    """Flatten a parts list / truncation sentinel / plain string into one text string."""
    if isinstance(value, str):
        return value
    if isinstance(value, dict) and value.get("_truncated"):
        return str(value.get("_raw", ""))
    if isinstance(value, list):
        texts = []
        for item in value:
            if isinstance(item, dict):
                text = item.get("content") or item.get("text") or ""
                if text:
                    texts.append(text)
            elif isinstance(item, str):
                texts.append(item)
        return "\n".join(texts) if texts else ""
    return str(value) if value is not None else ""


def _wf_normalize_tool_definitions(tool_defs: Any) -> List[Dict]:
    """Normalize tool definitions to [{name, description, parameters}], unwrapping OpenAI-style
    {"function": {...}} entries; a truncation sentinel is passed through as a single entry."""
    if isinstance(tool_defs, dict) and tool_defs.get("_truncated"):
        return [tool_defs]
    if not isinstance(tool_defs, list):
        return []
    result = []
    for td in tool_defs:
        if not isinstance(td, dict):
            continue
        if "function" in td:
            func = td.get("function", {})
            result.append(
                {
                    "name": func.get("name", ""),
                    "description": func.get("description", ""),
                    "parameters": func.get("parameters", {}),
                }
            )
        else:
            result.append(
                {
                    "name": td.get("name", ""),
                    "description": td.get("description", ""),
                    "parameters": td.get("parameters", {}),
                }
            )
    return result


def _wf_extract_invocations(invoke_agent_spans: List[Dict]) -> List[Dict]:
    """Convert invoke_agent spans (assumed pre-sorted by timestamp) into sequenced invocation dicts."""
    invocations = []
    for seq, span in enumerate(invoke_agent_spans, start=1):
        cd = span.get("custom_dimensions", {})
        input_msgs = _wf_safe_parse_json(cd.get("gen_ai.input.messages"))
        output_msgs = _wf_safe_parse_json(cd.get("gen_ai.output.messages"))
        sys_instrs = _wf_extract_system_instructions_from_input(input_msgs)
        invocations.append(
            {
                "sequence": seq,
                "timestamp": span.get("timestamp", ""),
                "agent_name": cd.get("gen_ai.agent.name", ""),
                "agent_id": cd.get("gen_ai.agent.id", ""),
                "system_instructions": sys_instrs,
                "input_messages": _wf_wrap(input_msgs, []),
                "output_messages": _wf_wrap(output_msgs, []),
            }
        )
    return invocations


def _wf_extract_system_instructions_from_input(input_msgs: Any) -> List[str]:
    """Pull text parts from system/developer-role input messages, in order."""
    if not isinstance(input_msgs, list):
        return []
    result: List[str] = []
    for msg in input_msgs:
        if not isinstance(msg, dict) or msg.get("role") not in ("system", "developer"):
            continue
        parts = msg.get("parts", [])
        if not isinstance(parts, list):
            continue
        for part in parts:
            if isinstance(part, dict) and part.get("type") == "text":
                content = part.get("content", "")
                if content:
                    result.append(content)
    return result
diff --git a/sdk/evaluation/azure-ai-evaluation/tests/unittests/test_workflow_planning_evaluator.py b/sdk/evaluation/azure-ai-evaluation/tests/unittests/test_workflow_planning_evaluator.py
new file mode 100644
index 000000000000..ad711ca6cf32
--- /dev/null
+++ b/sdk/evaluation/azure-ai-evaluation/tests/unittests/test_workflow_planning_evaluator.py
@@ -0,0 +1,621 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
from unittest.mock import MagicMock, patch

import pytest
from azure.ai.evaluation._evaluators._workflow_planning import _WorkflowPlanningEvaluator
# NOTE(review): this import path (_evaluators._workflow_planning._utils) does not match the
# _workflows/_utils.py module this change adds (and which the evaluator itself imports) —
# confirm a re-export or sibling module exists, otherwise this test cannot import.
from azure.ai.evaluation._evaluators._workflow_planning._utils import format_workflow_trace_for_eval
from azure.ai.evaluation._exceptions import ErrorCategory, EvaluationException


def _build_trace_with_repeated_history(repeated_assistant_text: str, second_invocation_assistant_text: str = None):
    # Two invocations of the same agent: the second's input history repeats the first's
    # output, exercising the formatter's dedup path.
    if second_invocation_assistant_text is None:
        second_invocation_assistant_text = repeated_assistant_text

    return {
        "topology": {
            "executors": [{"id": "planner", "type": "assistant"}],
            "edges": [{"source": "planner", "target": "planner"}],
            "max_iterations": 2,
        },
        "invocations": [
            {
                "agent_name": "Planner",
                "sequence": 1,
                "system_instructions": ["You are a planning agent."],
                "input_messages": {
                    "value": [
                        {"role": "user", "parts": [{"type": "text", "content": "Plan my workflow."}]},
                    ]
                },
                "output_messages": {
                    "value": [
                        {
                            "role": "assistant",
                            "parts": [{"type": "text", "content": repeated_assistant_text}],
                        }
                    ]
                },
            },
            {
                "agent_name": "Planner",
                "sequence": 2,
                "system_instructions": ["You are a planning agent."],
                "input_messages": {
                    "value": [
                        {
                            "role":
"assistant", + "parts": [ + {"type": "text", "content": second_invocation_assistant_text}, + {"type": "tool_call", "name": "search_docs", "arguments": '{"topic":"dedup"}'}, + {"type": "tool_call_response", "response": "doc snippet from tool"}, + ], + }, + {"role": "user", "parts": [{"type": "text", "content": "Please continue with the plan."}]}, + ] + }, + "output_messages": { + "value": [ + { + "role": "assistant", + "parts": [{"type": "text", "content": "Here is the next step."}], + } + ] + }, + }, + ], + "errors": [], + } + + +@pytest.mark.unittest +class TestWorkflowPlanningFormatter: + def test_formatter_handles_partial_topology_without_crashing(self): + trace = { + "topology": { + "executors": [ + {"type": "assistant"}, + {}, + "not-a-dict", + {"id": "planner", "type": "assistant"}, + ], + "edges": [ + "bad-edge", + {"source": "planner", "target": "planner"}, + ], + }, + "invocations": [], + "errors": [], + } + + formatted = format_workflow_trace_for_eval(trace) + + assert "[Workflow Definition]" in formatted + assert "planner (assistant)" in formatted + assert "planner -> planner" in formatted + + def test_dedup_enabled_removes_repeated_assistant_text_only(self): + repeated_text = "I already answered this exact sentence." + workflow_trace = _build_trace_with_repeated_history(repeated_text) + + formatted = format_workflow_trace_for_eval(workflow_trace) + invocation_two = formatted.split("[Planner - Invocation 2]", maxsplit=1)[1] + + assert repeated_text in formatted + assert repeated_text not in invocation_two + assert "[Conversation History: prior-agent context included]" in invocation_two + assert "Please continue with the plan." in invocation_two + assert '[Tool Call] search_docs({"topic":"dedup"})' in invocation_two + assert "[Tool Result] doc snippet from tool" in invocation_two + + def test_dedup_enabled_without_duplicate_removal_keeps_default_history_label(self): + repeated_text = "I already answered this exact sentence." 
+ workflow_trace = _build_trace_with_repeated_history( + repeated_assistant_text=repeated_text, + second_invocation_assistant_text="This is different assistant carryover content.", + ) + + formatted = format_workflow_trace_for_eval(workflow_trace) + invocation_two = formatted.split("[Planner - Invocation 2]", maxsplit=1)[1] + + assert "[Conversation History: prior-agent context included]" not in invocation_two + assert "[Conversation History]" in invocation_two + + def test_dedup_removes_tool_calls_and_results_from_prior_agent(self): + """Tool calls and tool results produced by Agent A should be stripped from Agent B's input.""" + trace = { + "topology": { + "executors": [{"id": "agentA", "type": "assistant"}, {"id": "agentB", "type": "assistant"}], + "edges": [{"source": "agentA", "target": "agentB"}], + }, + "invocations": [ + { + "agent_name": "AgentA", + "sequence": 1, + "system_instructions": ["You are Agent A."], + "input_messages": { + "value": [ + {"role": "user", "parts": [{"type": "text", "content": "Hello"}]}, + ] + }, + "output_messages": { + "value": [ + { + "role": "assistant", + "parts": [ + {"type": "tool_call", "name": "lookup", "arguments": '{"q":"info"}'}, + ], + }, + { + "role": "tool", + "parts": [ + {"type": "tool_call_response", "response": "lookup result data"}, + ], + }, + { + "role": "assistant", + "parts": [{"type": "text", "content": "AgentA final answer"}], + }, + ] + }, + }, + { + "agent_name": "AgentB", + "sequence": 2, + "system_instructions": ["You are Agent B."], + "input_messages": { + "value": [ + {"role": "user", "parts": [{"type": "text", "content": "Hello"}]}, + # Carried over from AgentA's output: + { + "role": "assistant", + "parts": [ + {"type": "tool_call", "name": "lookup", "arguments": '{"q":"info"}'}, + ], + }, + { + "role": "tool", + "parts": [ + {"type": "tool_call_response", "response": "lookup result data"}, + ], + }, + { + "role": "assistant", + "parts": [{"type": "text", "content": "AgentA final answer"}], + }, + 
{"role": "user", "parts": [{"type": "text", "content": "Now do step 2"}]}, + ] + }, + "output_messages": { + "value": [ + { + "role": "assistant", + "parts": [{"type": "text", "content": "AgentB result"}], + } + ] + }, + }, + ], + "errors": [], + } + + formatted = format_workflow_trace_for_eval(trace) + invocation_two = formatted.split("[AgentB - Invocation 2]", maxsplit=1)[1] + + # AgentA's tool call, tool result, and text response should all be stripped + assert '[Tool Call] lookup({"q":"info"})' not in invocation_two + assert "[Tool Result] lookup result data" not in invocation_two + assert "AgentA final answer" not in invocation_two + # AgentB's own content and user messages should remain + assert "Now do step 2" in invocation_two + assert "AgentB result" in invocation_two + assert "[Conversation History: prior-agent context included]" in invocation_two + + def test_dedup_accumulates_across_multiple_invocations(self): + """Outputs from invocation 1 should still be deduped in invocation 3's input.""" + trace = { + "topology": { + "executors": [ + {"id": "a", "type": "assistant"}, + {"id": "b", "type": "assistant"}, + {"id": "c", "type": "assistant"}, + ], + "edges": [{"source": "a", "target": "b"}, {"source": "b", "target": "c"}], + }, + "invocations": [ + { + "agent_name": "A", + "sequence": 1, + "system_instructions": ["Agent A"], + "input_messages": {"value": [{"role": "user", "parts": [{"type": "text", "content": "start"}]}]}, + "output_messages": { + "value": [ + {"role": "assistant", "parts": [{"type": "text", "content": "A output"}]}, + ] + }, + }, + { + "agent_name": "B", + "sequence": 2, + "system_instructions": ["Agent B"], + "input_messages": { + "value": [ + {"role": "assistant", "parts": [{"type": "text", "content": "A output"}]}, + ] + }, + "output_messages": { + "value": [ + {"role": "assistant", "parts": [{"type": "text", "content": "B output"}]}, + ] + }, + }, + { + "agent_name": "C", + "sequence": 3, + "system_instructions": ["Agent C"], + 
"input_messages": { + "value": [ + # Both prior agent outputs carried into C's input + {"role": "assistant", "parts": [{"type": "text", "content": "A output"}]}, + {"role": "assistant", "parts": [{"type": "text", "content": "B output"}]}, + {"role": "user", "parts": [{"type": "text", "content": "finish"}]}, + ] + }, + "output_messages": { + "value": [ + {"role": "assistant", "parts": [{"type": "text", "content": "C output"}]}, + ] + }, + }, + ], + "errors": [], + } + + formatted = format_workflow_trace_for_eval(trace) + invocation_three = formatted.split("[C - Invocation 3]", maxsplit=1)[1] + + # Both A and B outputs should be stripped from C's input + assert "A output" not in invocation_three + assert "B output" not in invocation_three + # C's own content and user messages should remain + assert "finish" in invocation_three + assert "C output" in invocation_three + + +@pytest.mark.unittest +class TestWorkflowPlanningEvaluator: + def test_evaluator_calls_formatter(self, mock_model_config): + evaluator = _WorkflowPlanningEvaluator( + model_config=mock_model_config, + ) + + async def flow_side_effect(timeout, **kwargs): + assert kwargs["workflow_trace"] == "formatted trace" + return {"llm_output": {"success": True, "explanation": "ok", "details": {}}} + + evaluator._flow = MagicMock(side_effect=flow_side_effect) + workflow_trace = {"topology": {}, "invocations": [], "errors": []} + + with patch( + "azure.ai.evaluation._evaluators._workflow_planning._workflow_planning.format_workflow_trace_for_eval", + return_value="formatted trace", + ) as mock_formatter: + result = evaluator(workflow_trace=workflow_trace) + + assert result[_WorkflowPlanningEvaluator._RESULT_KEY] == 1 + mock_formatter.assert_called_once_with(workflow_trace) + + def test_non_empty_workflow_errors_raises_not_applicable(self, mock_model_config): + evaluator = _WorkflowPlanningEvaluator(model_config=mock_model_config) + evaluator._flow = MagicMock() + + workflow_trace = { + "topology": {}, + 
"invocations": [],
+            "errors": [{"message": "terminal failure"}],
+        }
+
+        with pytest.raises(EvaluationException) as exc_info:
+            evaluator(workflow_trace=workflow_trace)
+
+        assert exc_info.value.category == ErrorCategory.NOT_APPLICABLE
+        # The LLM flow must never run when the workflow itself failed.
+        evaluator._flow.assert_not_called()
+
+    def test_workflow_trace_none_raises_missing_field(self, mock_model_config):
+        """When workflow_trace is None, _do_eval raises MISSING_FIELD with a clear message."""
+        evaluator = _WorkflowPlanningEvaluator(model_config=mock_model_config)
+        evaluator._flow = MagicMock()
+
+        with pytest.raises(EvaluationException) as exc_info:
+            evaluator(workflow_trace=None)
+
+        assert exc_info.value.category == ErrorCategory.MISSING_FIELD
+        assert "workflow_trace must be provided" in str(exc_info.value)
+        evaluator._flow.assert_not_called()
+
+    def test_workflow_trace_invalid_json_string_raises_invalid_value(self, mock_model_config):
+        """A malformed JSON string trace raises INVALID_VALUE before the flow is invoked."""
+        evaluator = _WorkflowPlanningEvaluator(model_config=mock_model_config)
+        evaluator._flow = MagicMock()
+
+        with pytest.raises(EvaluationException) as exc_info:
+            evaluator(workflow_trace="{not valid json")
+
+        assert exc_info.value.category == ErrorCategory.INVALID_VALUE
+        evaluator._flow.assert_not_called()
+
+    def test_workflow_trace_non_dict_type_raises_invalid_value(self, mock_model_config):
+        """A trace that is neither dict nor string (here a list) raises INVALID_VALUE."""
+        evaluator = _WorkflowPlanningEvaluator(model_config=mock_model_config)
+        evaluator._flow = MagicMock()
+
+        with pytest.raises(EvaluationException) as exc_info:
+            evaluator(workflow_trace=[1, 2, 3])
+
+        assert exc_info.value.category == ErrorCategory.INVALID_VALUE
+        evaluator._flow.assert_not_called()
+
+    def test_workflow_trace_valid_json_string_is_parsed(self, mock_model_config):
+        """A valid JSON string should be parsed into a dict and processed."""
+        evaluator = _WorkflowPlanningEvaluator(model_config=mock_model_config)
+
+        async def flow_side_effect(timeout, **kwargs):
+            return {"llm_output": {"success": True, "explanation": "ok", "details": {}}}
+
+        evaluator._flow = 
MagicMock(side_effect=flow_side_effect)
+        import json
+
+        trace_dict = {"topology": {}, "invocations": [], "errors": []}
+
+        result = evaluator(workflow_trace=json.dumps(trace_dict))
+        assert result[_WorkflowPlanningEvaluator._RESULT_KEY] == 1
+
+    def test_llm_returns_non_dict_output_raises_failed_execution(self, mock_model_config):
+        """A non-dict llm_output from the flow surfaces as FAILED_EXECUTION."""
+        evaluator = _WorkflowPlanningEvaluator(model_config=mock_model_config)
+
+        async def flow_side_effect(timeout, **kwargs):
+            return {"llm_output": "unexpected string"}
+
+        evaluator._flow = MagicMock(side_effect=flow_side_effect)
+        workflow_trace = {"topology": {}, "invocations": [], "errors": []}
+
+        with patch(
+            "azure.ai.evaluation._evaluators._workflow_planning._workflow_planning.format_workflow_trace_for_eval",
+            return_value="formatted",
+        ):
+            with pytest.raises(EvaluationException) as exc_info:
+                evaluator(workflow_trace=workflow_trace)
+
+        assert exc_info.value.category == ErrorCategory.FAILED_EXECUTION
+
+    def test_llm_returns_success_as_string_true(self, mock_model_config):
+        """A string "true" success flag is accepted and maps to score 1 / "pass"."""
+        evaluator = _WorkflowPlanningEvaluator(model_config=mock_model_config)
+
+        async def flow_side_effect(timeout, **kwargs):
+            return {"llm_output": {"success": "true", "explanation": "ok", "details": {}}}
+
+        evaluator._flow = MagicMock(side_effect=flow_side_effect)
+        workflow_trace = {"topology": {}, "invocations": [], "errors": []}
+
+        with patch(
+            "azure.ai.evaluation._evaluators._workflow_planning._workflow_planning.format_workflow_trace_for_eval",
+            return_value="formatted",
+        ):
+            result = evaluator(workflow_trace=workflow_trace)
+
+        assert result[_WorkflowPlanningEvaluator._RESULT_KEY] == 1
+        assert result["workflow_planning_result"] == "pass"
+
+    def test_llm_returns_success_as_string_false(self, mock_model_config):
+        """A string "false" success flag maps to score 0 / "fail"."""
+        evaluator = _WorkflowPlanningEvaluator(model_config=mock_model_config)
+
+        async def flow_side_effect(timeout, **kwargs):
+            return {"llm_output": {"success": "false", "explanation": "bad", "details": {}}}
+
+        evaluator._flow = 
MagicMock(side_effect=flow_side_effect) + workflow_trace = {"topology": {}, "invocations": [], "errors": []} + + with patch( + "azure.ai.evaluation._evaluators._workflow_planning._workflow_planning.format_workflow_trace_for_eval", + return_value="formatted", + ): + result = evaluator(workflow_trace=workflow_trace) + + assert result[_WorkflowPlanningEvaluator._RESULT_KEY] == 0 + assert result["workflow_planning_result"] == "fail" + + +@pytest.mark.unittest +class TestWorkflowPlanningFormatterEdgeCases: + def test_parse_failed_trace_formats_raw_fallback(self): + trace = { + "parse_failed": True, + "raw_traces": [ + { + "target": "invoke_agent chat", + "custom_dimensions": {"gen_ai.agent.name": "AgentX", "gen_ai.input.messages": "hello"}, + } + ], + } + formatted = format_workflow_trace_for_eval(trace) + assert "[PARSE FAILED - Raw Traces]" in formatted + assert "invoke_agent chat" in formatted + assert "AgentX" in formatted + + def test_truncated_input_messages(self): + trace = { + "topology": {"executors": [], "edges": []}, + "invocations": [ + { + "agent_name": "Agent", + "sequence": 1, + "system_instructions": ["sys"], + "input_messages": {"_raw": "x" * 9000, "_truncated": True}, + "output_messages": { + "value": [{"role": "assistant", "parts": [{"type": "text", "content": "reply"}]}] + }, + } + ], + "errors": [], + } + formatted = format_workflow_trace_for_eval(trace) + assert "[TRUNCATED]" in formatted + assert "x" in formatted + + def test_truncated_output_messages(self): + trace = { + "topology": {"executors": [], "edges": []}, + "invocations": [ + { + "agent_name": "Agent", + "sequence": 1, + "system_instructions": ["sys"], + "input_messages": {"value": [{"role": "user", "parts": [{"type": "text", "content": "hi"}]}]}, + "output_messages": {"_raw": "y" * 9000, "_truncated": True}, + } + ], + "errors": [], + } + formatted = format_workflow_trace_for_eval(trace) + assert "[Agent Response] [TRUNCATED]" in formatted + + def 
test_tool_call_with_no_tool_result(self):
+        """A tool_call with no matching tool_call_response renders the call but no [Tool Result] line."""
+        trace = {
+            "topology": {"executors": [], "edges": []},
+            "invocations": [
+                {
+                    "agent_name": "Agent",
+                    "sequence": 1,
+                    "system_instructions": ["sys"],
+                    "input_messages": {"value": [{"role": "user", "parts": [{"type": "text", "content": "search"}]}]},
+                    "output_messages": {
+                        "value": [
+                            {
+                                "role": "assistant",
+                                "parts": [
+                                    {"type": "tool_call", "name": "search", "arguments": '{"q":"test"}'},
+                                    # No tool_call_response part
+                                ],
+                            }
+                        ]
+                    },
+                }
+            ],
+            "errors": [],
+        }
+        formatted = format_workflow_trace_for_eval(trace)
+        assert '[Tool Call] search({"q":"test"})' in formatted
+        assert "[Tool Result]" not in formatted
+
+    def test_multiple_user_inputs_in_single_invocation(self):
+        """Every user text in a single invocation's input appears in the formatted output."""
+        trace = {
+            "topology": {"executors": [], "edges": []},
+            "invocations": [
+                {
+                    "agent_name": "Agent",
+                    "sequence": 1,
+                    "system_instructions": ["sys"],
+                    "input_messages": {
+                        "value": [
+                            {"role": "user", "parts": [{"type": "text", "content": "first question"}]},
+                            {"role": "assistant", "parts": [{"type": "text", "content": "intermediate"}]},
+                            {"role": "user", "parts": [{"type": "text", "content": "follow up"}]},
+                        ]
+                    },
+                    "output_messages": {
+                        "value": [{"role": "assistant", "parts": [{"type": "text", "content": "final answer"}]}]
+                    },
+                }
+            ],
+            "errors": [],
+        }
+        formatted = format_workflow_trace_for_eval(trace)
+        assert "first question" in formatted
+        assert "follow up" in formatted
+        assert "final answer" in formatted
+
+    def test_empty_invocations_list(self):
+        """A trace with no invocations still renders the definition and a successful completion."""
+        trace = {
+            "topology": {"executors": [], "edges": []},
+            "invocations": [],
+            "errors": [],
+        }
+        formatted = format_workflow_trace_for_eval(trace)
+        assert "[Workflow Definition]" in formatted
+        assert "[Workflow Completion]" in formatted
+        assert "Successful" in formatted
+
+    def test_workflow_definition_formatting(self):
+        """Executors render as 'id (type)' and edges as 'source -> target'."""
+        trace = {
+            "topology": {
+                "executors": [
+                    {"id": "planner", "type": "orchestrator"},
+                    {"id": "researcher", "type": "assistant"},
+                ],
+                
"edges": [{"source": "planner", "target": "researcher"}],
+            },
+            "invocations": [],
+            "errors": [],
+        }
+        formatted = format_workflow_trace_for_eval(trace)
+        assert "planner (orchestrator)" in formatted
+        assert "researcher (assistant)" in formatted
+        assert "planner -> researcher" in formatted
+
+    def test_errors_in_completion_section(self):
+        """Each workflow error message is rendered as an 'Error: ...' line."""
+        trace = {
+            "topology": {"executors": [], "edges": []},
+            "invocations": [],
+            "errors": [{"message": "Agent timeout"}, {"message": "Connection lost"}],
+        }
+        formatted = format_workflow_trace_for_eval(trace)
+        assert "Error: Agent timeout" in formatted
+        assert "Error: Connection lost" in formatted
+
+    def test_system_prompt_dedup_across_same_agent(self):
+        """System prompt should appear once for the first invocation of an agent,
+        then '[System Prompt: same as above]' for subsequent invocations."""
+        trace = {
+            "topology": {"executors": [], "edges": []},
+            "invocations": [
+                {
+                    "agent_name": "Agent",
+                    "sequence": 1,
+                    "system_instructions": ["You are helpful."],
+                    "input_messages": {"value": [{"role": "user", "parts": [{"type": "text", "content": "q1"}]}]},
+                    "output_messages": {"value": [{"role": "assistant", "parts": [{"type": "text", "content": "a1"}]}]},
+                },
+                {
+                    "agent_name": "Agent",
+                    "sequence": 2,
+                    "system_instructions": ["You are helpful."],
+                    "input_messages": {"value": [{"role": "user", "parts": [{"type": "text", "content": "q2"}]}]},
+                    "output_messages": {"value": [{"role": "assistant", "parts": [{"type": "text", "content": "a2"}]}]},
+                },
+            ],
+            "errors": [],
+        }
+        formatted = format_workflow_trace_for_eval(trace)
+        # Exactly one literal occurrence proves the repeated prompt was deduplicated.
+        assert formatted.count("You are helpful.") == 1
+        assert "[System Prompt: same as above]" in formatted
+
+    def test_user_query_extracted_from_first_invocation(self):
+        """The first user message is surfaced under the [User Input] section."""
+        trace = {
+            "topology": {"executors": [], "edges": []},
+            "invocations": [
+                {
+                    "agent_name": "Agent",
+                    "sequence": 1,
+                    "system_instructions": [],
+                    "input_messages": {
+                        "value": [{"role": "user", "parts": [{"type": 
"text", "content": "What is the weather?"}]}] + }, + "output_messages": { + "value": [{"role": "assistant", "parts": [{"type": "text", "content": "sunny"}]}] + }, + } + ], + "errors": [], + } + formatted = format_workflow_trace_for_eval(trace) + assert "[User Input]" in formatted + assert "What is the weather?" in formatted diff --git a/sdk/evaluation/azure-ai-evaluation/tests/unittests/test_workflow_trace_converter.py b/sdk/evaluation/azure-ai-evaluation/tests/unittests/test_workflow_trace_converter.py new file mode 100644 index 000000000000..e0ee6d7e946a --- /dev/null +++ b/sdk/evaluation/azure-ai-evaluation/tests/unittests/test_workflow_trace_converter.py @@ -0,0 +1,205 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +import json + +import pytest +from azure.ai.evaluation._evaluators._workflow_planning.scripts.workflow_trace_converter import convert_workflow_traces + + +def _build_span(timestamp: str, target: str, custom_dimensions: dict) -> dict: + return { + "timestamp": timestamp, + "target": target, + "custom_dimensions": custom_dimensions, + } + + +def _workflow_definition( + workflow_id: str, workflow_name: str, start_executor_id: str, executors: dict, edges: list +) -> str: + return json.dumps( + { + "id": workflow_id, + "name": workflow_name, + "start_executor_id": start_executor_id, + "max_iterations": 100, + "executors": executors, + "edge_groups": [ + { + "id": "SingleEdgeGroup/1", + "type": "SingleEdgeGroup", + "edges": edges, + } + ], + "output_executors": [list(executors.keys())[-1]], + } + ) + + +@pytest.mark.unittest +class TestWorkflowTraceConverter: + def test_selects_build_span_matching_run_workflow_id(self): + nested_id = "nested-workflow" + root_id = "root-workflow" + + nested_def = _workflow_definition( + workflow_id=nested_id, + workflow_name="NestedWorkflow", + start_executor_id="Evaluator", + 
executors={ + "Evaluator": {"id": "Evaluator", "type": "AgentExecutor"}, + "agent_request_info_executor": { + "id": "agent_request_info_executor", + "type": "AgentRequestInfoExecutor", + }, + }, + edges=[ + {"source_id": "Evaluator", "target_id": "agent_request_info_executor"}, + {"source_id": "agent_request_info_executor", "target_id": "Evaluator"}, + ], + ) + + root_def = _workflow_definition( + workflow_id=root_id, + workflow_name="RootWorkflow", + start_executor_id="input-conversation", + executors={ + "input-conversation": {"id": "input-conversation", "type": "_InputToConversation"}, + "ReqMaster": {"id": "ReqMaster", "type": "AgentExecutor"}, + "TalentScout": {"id": "TalentScout", "type": "AgentExecutor"}, + "Evaluator": {"id": "Evaluator", "type": "AgentApprovalExecutor"}, + "Scheduler": {"id": "Scheduler", "type": "AgentExecutor"}, + "end": {"id": "end", "type": "_EndWithConversation"}, + }, + edges=[ + {"source_id": "input-conversation", "target_id": "ReqMaster"}, + {"source_id": "ReqMaster", "target_id": "TalentScout"}, + {"source_id": "TalentScout", "target_id": "Evaluator"}, + {"source_id": "Evaluator", "target_id": "Scheduler"}, + {"source_id": "Scheduler", "target_id": "end"}, + ], + ) + + spans = [ + _build_span( + "2026-03-05T10:00:00Z", + "workflow.build", + { + "workflow.id": nested_id, + "workflow_builder.name": "NestedWorkflow", + "workflow.definition": nested_def, + }, + ), + _build_span( + "2026-03-05T10:00:01Z", + "workflow.build", + { + "workflow.id": root_id, + "workflow_builder.name": "RootWorkflow", + "workflow.definition": root_def, + }, + ), + _build_span( + "2026-03-05T10:00:02Z", + "workflow.run", + { + "workflow.id": root_id, + "workflow.name": "RootWorkflow", + }, + ), + ] + + result = convert_workflow_traces(spans) + + assert result["workflow_id"] == root_id + assert result["workflow_name"] == "RootWorkflow" + assert result["topology"]["start_executor_id"] == "input-conversation" + + def 
test_falls_back_to_latest_parseable_build_when_run_id_missing(self): + spans = [ + _build_span( + "2026-03-05T10:00:00Z", + "workflow.build", + { + "workflow.id": "first", + "workflow_builder.name": "FirstWorkflow", + "workflow.definition": "{invalid-json", + }, + ), + _build_span( + "2026-03-05T10:00:01Z", + "workflow.build", + { + "workflow.id": "second", + "workflow_builder.name": "SecondWorkflow", + "workflow.definition": _workflow_definition( + workflow_id="second", + workflow_name="SecondWorkflow", + start_executor_id="start", + executors={ + "start": {"id": "start", "type": "_InputToConversation"}, + "end": {"id": "end", "type": "_EndWithConversation"}, + }, + edges=[{"source_id": "start", "target_id": "end"}], + ), + }, + ), + _build_span( + "2026-03-05T10:00:02Z", + "workflow.run", + { + "workflow.id": "unmatched", + "workflow.name": "Unmatched", + }, + ), + ] + + result = convert_workflow_traces(spans) + + assert result["workflow_id"] == "second" + assert result["workflow_name"] == "SecondWorkflow" + assert result["topology"]["start_executor_id"] == "start" + + def test_output_contract_stays_stable(self): + spans = [ + _build_span( + "2026-03-05T10:00:00Z", + "workflow.build", + { + "workflow.id": "wf", + "workflow_builder.name": "Workflow", + "workflow.definition": _workflow_definition( + workflow_id="wf", + workflow_name="Workflow", + start_executor_id="start", + executors={ + "start": {"id": "start", "type": "_InputToConversation"}, + "end": {"id": "end", "type": "_EndWithConversation"}, + }, + edges=[{"source_id": "start", "target_id": "end"}], + ), + }, + ), + _build_span( + "2026-03-05T10:00:01Z", + "workflow.run", + { + "workflow.id": "wf", + "workflow.name": "Workflow", + }, + ), + ] + + result = convert_workflow_traces(spans) + + assert set(result.keys()) == { + "parse_failed", + "workflow_id", + "workflow_name", + "topology", + "agents", + "invocations", + "errors", + } + assert result["parse_failed"] is False