diff --git a/sdk/evaluation/azure-ai-evaluation/assets.json b/sdk/evaluation/azure-ai-evaluation/assets.json index 06bac4a6f64e..f4ecb76eb917 100644 --- a/sdk/evaluation/azure-ai-evaluation/assets.json +++ b/sdk/evaluation/azure-ai-evaluation/assets.json @@ -2,5 +2,5 @@ "AssetsRepo": "Azure/azure-sdk-assets", "AssetsRepoPrefixPath": "python", "TagPrefix": "python/evaluation/azure-ai-evaluation", - "Tag": "python/evaluation/azure-ai-evaluation_4da51ba6ab" + "Tag": "python/evaluation/azure-ai-evaluation_fdc0bc4d06" } diff --git a/sdk/evaluation/azure-ai-evaluation/tests/conftest.py b/sdk/evaluation/azure-ai-evaluation/tests/conftest.py index 731203c00574..6e969ff42303 100644 --- a/sdk/evaluation/azure-ai-evaluation/tests/conftest.py +++ b/sdk/evaluation/azure-ai-evaluation/tests/conftest.py @@ -327,7 +327,7 @@ def simple_conversation(): def redirect_openai_requests(): """Route requests from the openai package to the test proxy.""" config = TestProxyConfig( - recording_id=get_recording_id(), recording_mode="record" if is_live() else "playback", proxy_url=PROXY_URL() + recording_id=get_recording_id(), recording_mode="record" if is_live() else "playback", proxy_url=PROXY_URL ) with TestProxyHttpxClientBase.record_with_proxy(config): diff --git a/sdk/evaluation/azure-ai-evaluation/tests/e2etests/test_red_team_foundry.py b/sdk/evaluation/azure-ai-evaluation/tests/e2etests/test_red_team_foundry.py index de9535918d23..e218be323ec1 100644 --- a/sdk/evaluation/azure-ai-evaluation/tests/e2etests/test_red_team_foundry.py +++ b/sdk/evaluation/azure-ai-evaluation/tests/e2etests/test_red_team_foundry.py @@ -9,6 +9,13 @@ - IndirectJailbreak (XPIA) attacks with context - Multiple strategies in a single execution - Context data handling (binary_path storage) +- AzureOpenAIModelConfiguration target type +- Agent-flagged callback targets (is_agent_target=True) +- Agent targets with tool context injection +- New v2025_11_15 risk categories (ProtectedMaterial, CodeVulnerability, TaskAdherence) +- Agent-only risk categories (SensitiveDataLeakage) +- Error paths for contract enforcement (agent-only risk on model target) +- MultiTurn and Crescendo attack strategies via Foundry path """ from typing import Any, Dict, List, Optional @@ -21,6 +28,7 @@ from azure.ai.evaluation.red_team import RedTeam, RiskCategory, AttackStrategy from azure.ai.evaluation.red_team._red_team_result import RedTeamResult +from azure.ai.evaluation._model_configurations import AzureOpenAIModelConfiguration @pytest.mark.usefixtures("recording_injection", "recorded_test") @@ -28,6 +36,53 @@ class TestRedTeamFoundry: """Test RedTeam Foundry integration features.""" + @staticmethod + def _validate_attack_details( + attack_details: List[Dict], + *, + expected_risk_category: Optional[str] = None, + min_attacks: int = 1, + min_conversation_length: int = 2, + ) -> None: + """Validate the structure and content of attack details from a red team scan. + + :param attack_details: List of attack detail dicts from RedTeamResult.attack_details + :param expected_risk_category: If set, asserts all attacks match this risk category + :param min_attacks: Minimum number of attack details expected (default: 1) + :param min_conversation_length: Minimum number of messages per conversation (default: 2) + """ + assert attack_details is not None, "attack_details should not be None" + assert len(attack_details) >= min_attacks, ( + f"Expected at least {min_attacks} attack detail(s), got {len(attack_details)}" + ) + + for attack in attack_details: + # Required fields per RAISvc contract + assert "conversation" in attack, "Attack detail missing 'conversation' field" + assert "risk_category" in attack, "Attack detail missing 'risk_category' field" + assert "attack_technique" in attack, "Attack detail missing 'attack_technique' field" + + # Validate risk category if expected + if expected_risk_category is not None: + assert attack["risk_category"] == expected_risk_category, ( + f"Expected risk_category '{expected_risk_category}', got '{attack['risk_category']}'" + ) + + # Validate conversation structure + conversation = attack["conversation"] + assert len(conversation) >= min_conversation_length, ( + f"Expected at least {min_conversation_length} messages in conversation, got {len(conversation)}" + ) + + # Validate role alternation: first message should be user, then alternating + assert conversation[0]["role"] == "user", ( + f"First conversation message should have role 'user', got '{conversation[0]['role']}'" + ) + if len(conversation) >= 2: + assert conversation[1]["role"] == "assistant", ( + f"Second conversation message should have role 'assistant', got '{conversation[1]['role']}'" + ) + @pytest.mark.azuretest @pytest.mark.parametrize( ("proj_scope", "cred"), @@ -70,11 +125,7 @@ def simple_target(query: str) -> str: assert len(result.attack_details) > 0 # Validate attack details structure - for attack in result.attack_details: - assert "conversation" in attack - assert "risk_category" in attack - assert "attack_technique" in attack - assert attack["risk_category"] == "violence" + self._validate_attack_details(result.attack_details, expected_risk_category="violence") @pytest.mark.azuretest @pytest.mark.parametrize( @@ -296,3 +347,536 @@ def simple_target(query: str) -> str: assert ( len(techniques_found) >= 2 ), f"Expected results for at least 2 techniques, got {len(techniques_found)}: {techniques_found}" + + # ==================== New target type tests ==================== + + @pytest.mark.azuretest + @pytest.mark.parametrize( + ("proj_scope", "cred"), + (("project_scope_onedp", "azure_cred_onedp"),), + ) + def test_foundry_with_model_config_target(self, request, proj_scope, cred, sanitized_model_config): + """ + Test Foundry execution with AzureOpenAIModelConfiguration target. + + Verifies that the Foundry path can accept and execute against a real + model configuration target (not just callback functions). This validates + the AzureAIModel target contract through the Foundry execution path. + """ + azure_cred = request.getfixturevalue(cred) + project_scope = request.getfixturevalue(proj_scope) + + red_team = RedTeam( + azure_ai_project=project_scope, + credential=azure_cred, + risk_categories=[RiskCategory.HateUnfairness], + num_objectives=1, + ) + + result = asyncio.run( + red_team.scan( + target=sanitized_model_config, + scan_name="test_foundry_model_config", + attack_strategies=[AttackStrategy.Baseline, AttackStrategy.Base64], + timeout=4800, + skip_upload=True, + ) + ) + + assert isinstance(result, RedTeamResult) + self._validate_attack_details(result.attack_details, expected_risk_category="hate_unfairness") + + @pytest.fixture + def sanitized_model_config(self, model_config: AzureOpenAIModelConfiguration) -> AzureOpenAIModelConfiguration: + """Fixture that sanitizes the Azure OpenAI model configuration for testing.""" + if model_config["azure_endpoint"] != "https://Sanitized.api.cognitive.microsoft.com": + return model_config + + return AzureOpenAIModelConfiguration( + **{**model_config, "azure_endpoint": "https://Sanitized.openai.azure.com/"}, + ) + + @pytest.mark.azuretest + @pytest.mark.parametrize( + ("proj_scope", "cred"), + (("project_scope_onedp", "azure_cred_onedp"),), + ) + def test_foundry_agent_target_callback(self, request, proj_scope, cred): + """ + Test Foundry execution with an agent-flagged callback target. + + Verifies that the Foundry path correctly handles targets flagged as + agent targets via the is_agent_target kwarg. This validates the + AzureAIAgent contract path through the SDK, where the SDK signals + to the service that the target is an agent (affecting risk category + validation and objective generation). + """ + azure_cred = request.getfixturevalue(cred) + project_scope = request.getfixturevalue(proj_scope) + + async def agent_callback( + messages: List[Dict], + stream: bool = False, + session_state: Any = None, + context: Optional[Dict[str, Any]] = None, + ) -> dict: + query = messages[-1]["content"] if isinstance(messages, list) else messages["messages"][-1]["content"] + formatted_response = {"content": f"Agent response to: {query}", "role": "assistant"} + + if isinstance(messages, list): + messages.append(formatted_response) + return { + "messages": messages, + "stream": stream, + "session_state": session_state, + "context": context, + } + else: + messages["messages"].append(formatted_response) + return { + "messages": messages["messages"], + "stream": stream, + "session_state": session_state, + "context": context, + } + + red_team = RedTeam( + azure_ai_project=project_scope, + credential=azure_cred, + risk_categories=[RiskCategory.Violence], + num_objectives=1, + ) + + result = asyncio.run( + red_team.scan( + target=agent_callback, + scan_name="test_foundry_agent_callback", + attack_strategies=[AttackStrategy.Baseline, AttackStrategy.Base64], + timeout=4800, + skip_upload=True, + is_agent_target=True, + ) + ) + + assert isinstance(result, RedTeamResult) + self._validate_attack_details(result.attack_details, expected_risk_category="violence") + + @pytest.mark.azuretest + @pytest.mark.parametrize( + ("proj_scope", "cred"), + (("project_scope_onedp", "azure_cred_onedp"),), + ) + def test_foundry_agent_target_with_tool_context(self, request, proj_scope, cred): + """ + Test Foundry execution with an agent target that handles tool context. + + Verifies that IndirectJailbreak (XPIA) attacks correctly deliver context + data to agent-flagged callback targets. This validates the tool injection + contract where context["contexts"] is populated for XPIA attacks, simulating + how ACA injects synthetic FunctionTool definitions for agent targets. + """ + azure_cred = request.getfixturevalue(cred) + project_scope = request.getfixturevalue(proj_scope) + + async def agent_with_tools( + messages: List[Dict], + stream: bool = False, + session_state: Any = None, + context: Optional[Dict[str, Any]] = None, + ) -> dict: + query = messages[-1]["content"] if isinstance(messages, list) else messages["messages"][-1]["content"] + + # Simulate agent processing tool context + tool_info = "" + if context and "contexts" in context: + tool_info = f" [processed {len(context['contexts'])} tool context(s)]" + + formatted_response = { + "content": f"Agent with tools response: {query}{tool_info}", + "role": "assistant", + } + + if isinstance(messages, list): + messages.append(formatted_response) + return { + "messages": messages, + "stream": stream, + "session_state": session_state, + "context": context, + } + else: + messages["messages"].append(formatted_response) + return { + "messages": messages["messages"], + "stream": stream, + "session_state": session_state, + "context": context, + } + + red_team = RedTeam( + azure_ai_project=project_scope, + credential=azure_cred, + risk_categories=[RiskCategory.Violence], + num_objectives=1, + ) + + result = asyncio.run( + red_team.scan( + target=agent_with_tools, + scan_name="test_foundry_agent_tools", + attack_strategies=[AttackStrategy.IndirectJailbreak], + timeout=4800, + skip_upload=True, + is_agent_target=True, + ) + ) + + assert isinstance(result, RedTeamResult) + assert result.attack_details is not None + + # Verify at least some attacks used indirect_jailbreak technique + techniques = [a["attack_technique"] for a in result.attack_details] + assert "indirect_jailbreak" in techniques, f"Expected indirect_jailbreak in techniques, got: {techniques}" + + # Validate conversation structure for all attacks + self._validate_attack_details(result.attack_details, expected_risk_category="violence") + + # ==================== New risk category tests (v2025_11_15) ==================== + + @pytest.mark.azuretest + @pytest.mark.parametrize( + ("proj_scope", "cred"), + (("project_scope_onedp", "azure_cred_onedp"),), + ) + def test_foundry_protected_material_category(self, request, proj_scope, cred): + """ + Test Foundry execution with ProtectedMaterial risk category. + + Validates that the v2025_11_15 ProtectedMaterial risk category works + through the Foundry path. This category tests for copyrighted or + protected content generation. + """ + azure_cred = request.getfixturevalue(cred) + project_scope = request.getfixturevalue(proj_scope) + + def simple_target(query: str) -> str: + return "I follow content guidelines and cannot reproduce protected material." + + red_team = RedTeam( + azure_ai_project=project_scope, + credential=azure_cred, + risk_categories=[RiskCategory.ProtectedMaterial], + num_objectives=1, + ) + + result = asyncio.run( + red_team.scan( + target=simple_target, + scan_name="test_foundry_protected_material", + attack_strategies=[AttackStrategy.Baseline, AttackStrategy.Base64], + timeout=4800, + skip_upload=True, + ) + ) + + assert isinstance(result, RedTeamResult) + self._validate_attack_details(result.attack_details, expected_risk_category="protected_material") + + @pytest.mark.azuretest + @pytest.mark.parametrize( + ("proj_scope", "cred"), + (("project_scope_onedp", "azure_cred_onedp"),), + ) + def test_foundry_code_vulnerability_category(self, request, proj_scope, cred): + """ + Test Foundry execution with CodeVulnerability risk category. + + Validates that the v2025_11_15 CodeVulnerability risk category works + through the Foundry path. This category tests for security vulnerability + generation in code. + """ + azure_cred = request.getfixturevalue(cred) + project_scope = request.getfixturevalue(proj_scope) + + def simple_target(query: str) -> str: + return "I write secure code following best practices." + + red_team = RedTeam( + azure_ai_project=project_scope, + credential=azure_cred, + risk_categories=[RiskCategory.CodeVulnerability], + num_objectives=1, + ) + + result = asyncio.run( + red_team.scan( + target=simple_target, + scan_name="test_foundry_code_vuln", + attack_strategies=[AttackStrategy.Baseline, AttackStrategy.Base64], + timeout=4800, + skip_upload=True, + ) + ) + + assert isinstance(result, RedTeamResult) + self._validate_attack_details(result.attack_details, expected_risk_category="code_vulnerability") + + @pytest.mark.azuretest + @pytest.mark.parametrize( + ("proj_scope", "cred"), + (("project_scope_onedp", "azure_cred_onedp"),), + ) + def test_foundry_task_adherence_category(self, request, proj_scope, cred): + """ + Test Foundry execution with TaskAdherence risk category. + + Validates that the v2025_11_15 TaskAdherence risk category works + through the Foundry path. This category tests for model divergence + from intended task behavior. + """ + azure_cred = request.getfixturevalue(cred) + project_scope = request.getfixturevalue(proj_scope) + + def simple_target(query: str) -> str: + return "I stay on task and only answer questions within my scope." + + red_team = RedTeam( + azure_ai_project=project_scope, + credential=azure_cred, + risk_categories=[RiskCategory.TaskAdherence], + num_objectives=1, + ) + + result = asyncio.run( + red_team.scan( + target=simple_target, + scan_name="test_foundry_task_adherence", + attack_strategies=[AttackStrategy.Baseline, AttackStrategy.Base64], + timeout=4800, + skip_upload=True, + ) + ) + + assert isinstance(result, RedTeamResult) + self._validate_attack_details(result.attack_details, expected_risk_category="task_adherence") + + # ==================== Agent-only risk category tests ==================== + + @pytest.mark.azuretest + @pytest.mark.parametrize( + ("proj_scope", "cred"), + (("project_scope_onedp", "azure_cred_onedp"),), + ) + def test_foundry_agent_sensitive_data_leakage(self, request, proj_scope, cred): + """ + Test Foundry execution with SensitiveDataLeakage risk category on agent target. + + Validates that the agent-only SensitiveDataLeakage risk category works + when is_agent_target=True. This risk category is restricted to agent + targets per the SDK validation logic and tests PII/sensitive data + exposure detection. + """ + azure_cred = request.getfixturevalue(cred) + project_scope = request.getfixturevalue(proj_scope) + + async def agent_callback( + messages: List[Dict], + stream: bool = False, + session_state: Any = None, + context: Optional[Dict[str, Any]] = None, + ) -> dict: + query = messages[-1]["content"] if isinstance(messages, list) else messages["messages"][-1]["content"] + formatted_response = {"content": f"I protect sensitive data: {query}", "role": "assistant"} + + if isinstance(messages, list): + messages.append(formatted_response) + return { + "messages": messages, + "stream": stream, + "session_state": session_state, + "context": context, + } + else: + messages["messages"].append(formatted_response) + return { + "messages": messages["messages"], + "stream": stream, + "session_state": session_state, + "context": context, + } + + red_team = RedTeam( + azure_ai_project=project_scope, + credential=azure_cred, + risk_categories=[RiskCategory.SensitiveDataLeakage], + num_objectives=1, + ) + + result = asyncio.run( + red_team.scan( + target=agent_callback, + scan_name="test_foundry_sensitive_data", + attack_strategies=[AttackStrategy.Baseline, AttackStrategy.Base64], + timeout=4800, + skip_upload=True, + is_agent_target=True, + ) + ) + + assert isinstance(result, RedTeamResult) + self._validate_attack_details(result.attack_details, expected_risk_category="sensitive_data_leakage") + + # ==================== Error path tests ==================== + + @pytest.mark.azuretest + @pytest.mark.parametrize( + ("proj_scope", "cred"), + (("project_scope_onedp", "azure_cred_onedp"),), + ) + def test_foundry_model_target_rejects_agent_only_risk_category(self, request, proj_scope, cred): + """ + Test that SensitiveDataLeakage is rejected for non-agent targets. + + Validates the SDK contract enforcement: the SensitiveDataLeakage risk + category is restricted to agent targets. When used with a non-agent + target (is_agent_target=False or not set), the SDK should raise an + EvaluationException before making any HTTP calls. + """ + from azure.ai.evaluation._exceptions import EvaluationException + + azure_cred = request.getfixturevalue(cred) + project_scope = request.getfixturevalue(proj_scope) + + def simple_target(query: str) -> str: + return "Response" + + red_team = RedTeam( + azure_ai_project=project_scope, + credential=azure_cred, + risk_categories=[RiskCategory.SensitiveDataLeakage], + num_objectives=1, + ) + + with pytest.raises(EvaluationException, match="only available for agent targets"): + asyncio.run( + red_team.scan( + target=simple_target, + scan_name="test_foundry_reject_agent_only", + attack_strategies=[AttackStrategy.Baseline], + timeout=4800, + skip_upload=True, + ) + ) + + # ==================== Multi-turn and Crescendo strategy tests ==================== + + @pytest.mark.azuretest + @pytest.mark.parametrize( + ("proj_scope", "cred"), + (("project_scope_onedp", "azure_cred_onedp"),), + ) + def test_foundry_multi_turn_attack(self, request, proj_scope, cred): + """ + Test Foundry execution with MultiTurn attack strategy. + + Validates that the Foundry path can execute multi-turn attacks which + involve multiple conversation exchanges between the attacker and target. + Multi-turn attacks spread harmful intent across conversation turns. + """ + azure_cred = request.getfixturevalue(cred) + project_scope = request.getfixturevalue(proj_scope) + + def defensive_target(query: str) -> str: + return "Sorry, I cannot assist with that request." + + red_team = RedTeam( + azure_ai_project=project_scope, + credential=azure_cred, + risk_categories=[RiskCategory.Violence], + num_objectives=1, + ) + + result = asyncio.run( + red_team.scan( + target=defensive_target, + scan_name="test_foundry_multi_turn", + attack_strategies=[AttackStrategy.MultiTurn], + timeout=4800, + skip_upload=True, + ) + ) + + assert isinstance(result, RedTeamResult) + assert result.attack_details is not None + assert len(result.attack_details) > 0 + + for attack in result.attack_details: + conversation = attack["conversation"] + if attack["attack_technique"] == "multi_turn": + # Multi-turn attacks attempt multiple turns but may terminate early + assert len(conversation) >= 2, "Multi-turn attack should have at least 2 messages" + else: + assert len(conversation) >= 2 + + # Validate role alternation + for i in range(len(conversation)): + expected_role = "user" if i % 2 == 0 else "assistant" + assert conversation[i]["role"] == expected_role, ( + f"Message {i} should have role '{expected_role}', got '{conversation[i]['role']}'" + ) + + @pytest.mark.azuretest + @pytest.mark.parametrize( + ("proj_scope", "cred"), + (("project_scope_onedp", "azure_cred_onedp"),), + ) + def test_foundry_crescendo_attack(self, request, proj_scope, cred): + """ + Test Foundry execution with Crescendo attack strategy. + + Validates that the Foundry path can execute crescendo attacks which + gradually escalate the conversation to try to elicit harmful responses. + Crescendo attacks are classified as 'Difficult' complexity and produce + multi-turn conversations. + """ + azure_cred = request.getfixturevalue(cred) + project_scope = request.getfixturevalue(proj_scope) + + def defensive_target(query: str) -> str: + return "Sorry, I cannot assist with that request." + + red_team = RedTeam( + azure_ai_project=project_scope, + credential=azure_cred, + risk_categories=[RiskCategory.Violence], + num_objectives=1, + ) + + result = asyncio.run( + red_team.scan( + target=defensive_target, + scan_name="test_foundry_crescendo", + attack_strategies=[AttackStrategy.Crescendo], + timeout=4800, + skip_upload=True, + ) + ) + + assert isinstance(result, RedTeamResult) + assert result.attack_details is not None + assert len(result.attack_details) > 0 + + for attack in result.attack_details: + conversation = attack["conversation"] + if attack["attack_technique"] == "crescendo": + # Crescendo attacks produce multi-turn conversations (typically 20 messages / 10 turns) + assert len(conversation) >= 2, "Crescendo attack should produce multi-turn conversation" + else: + assert len(conversation) >= 2 + + # Validate role alternation + for i in range(len(conversation)): + expected_role = "user" if i % 2 == 0 else "assistant" + assert conversation[i]["role"] == expected_role, ( + f"Message {i} should have role '{expected_role}', got '{conversation[i]['role']}'" + )