Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions snuba/querylog/query_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, MutableSequence, Optional, Set, cast
from typing import Any, Dict, Mapping, MutableSequence, Optional, Set, cast

from clickhouse_driver.errors import ErrorCodes
from sentry_kafka_schemas.schema_types import snuba_queries_v1
Expand Down Expand Up @@ -125,7 +125,9 @@ def slo(self) -> SLO:
}


def get_request_status(cause: Exception | None = None) -> Status:
def get_request_status(
cause: Exception | None = None, context: Optional[Mapping[str, Any]] = None
) -> Status:
slo_status: RequestStatus
if cause is None:
slo_status = RequestStatus.SUCCESS
Expand All @@ -136,7 +138,15 @@ def get_request_status(cause: Exception | None = None) -> Status:
else:
slo_status = RequestStatus.RATE_LIMITED
elif isinstance(cause, ClickhouseError):
slo_status = ERROR_CODE_MAPPINGS.get(cause.code, RequestStatus.ERROR)
# Check if TOO_MANY_BYTES was from allocation policy
if (
cause.code == ErrorCodes.TOO_MANY_BYTES
and context
and context.get("max_bytes_to_read_set_by_policy", False)
):
slo_status = RequestStatus.RATE_LIMITED
else:
slo_status = ERROR_CODE_MAPPINGS.get(cause.code, RequestStatus.ERROR)
elif isinstance(cause, TimeoutError):
slo_status = RequestStatus.QUERY_TIMEOUT
elif isinstance(cause, ExecutionTimeoutError):
Expand Down
15 changes: 10 additions & 5 deletions snuba/web/db_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ def _raw_query(
error_code = None
trigger_rate_limiter = None
status = None
request_status = get_request_status(cause)
request_status = get_request_status(cause, context=stats)

calculated_cause = cause
if isinstance(cause, RateLimitExceeded):
Expand All @@ -477,10 +477,13 @@ def _raw_query(
error_code = cause.code
status = get_query_status_from_error_codes(error_code)
if error_code == ErrorCodes.TOO_MANY_BYTES:
calculated_cause = RateLimitExceeded(
"Query scanned more than the allocated amount of bytes",
quota_allowance=stats["quota_allowance"],
)
# Only treat as rate limiting if the limit was set by allocation policy
if stats.get("max_bytes_to_read_set_by_policy", False):
calculated_cause = RateLimitExceeded(
"Query scanned more than the allocated amount of bytes",
quota_allowance=stats["quota_allowance"],
)
status = QueryStatus.RATE_LIMITED

with configure_scope() as scope:
fingerprint = ["{{default}}", str(cause.code), dataset_name]
Expand Down Expand Up @@ -857,6 +860,8 @@ def _apply_allocation_policies_quota(
if max_bytes_to_read != 0:
query_settings.push_clickhouse_setting("max_bytes_to_read", max_bytes_to_read)
summary["max_bytes_to_read"] = max_bytes_to_read
# Track that max_bytes_to_read was set by allocation policy
stats["max_bytes_to_read_set_by_policy"] = True

_populate_query_status(summary, rejection_quota_and_policy, throttle_quota_and_policy)
_add_quota_info(summary, _REJECTED_BY, rejection_quota_and_policy)
Expand Down
1 change: 1 addition & 0 deletions tests/querylog/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Test module for querylog
82 changes: 82 additions & 0 deletions tests/querylog/test_query_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""Unit tests for query_metadata module"""

from typing import Any, Dict

from clickhouse_driver.errors import ErrorCodes

from snuba.clickhouse.errors import ClickhouseError
from snuba.querylog.query_metadata import (
SLO,
RequestStatus,
get_request_status,
)
from snuba.state.rate_limit import RateLimitExceeded


class TestGetRequestStatus:
    """Exercises get_request_status's mapping of exceptions to SLO statuses."""

    def test_get_request_status_success(self) -> None:
        """A request without an exception is SUCCESS and counts for the SLO."""
        result = get_request_status(None)
        assert result.status == RequestStatus.SUCCESS
        assert result.slo == SLO.FOR

    def test_get_request_status_rate_limit_exceeded(self) -> None:
        """RateLimitExceeded is RATE_LIMITED and does not count against the SLO."""
        exc = RateLimitExceeded("Rate limit exceeded")
        result = get_request_status(exc)
        assert result.status == RequestStatus.RATE_LIMITED
        assert result.slo == SLO.FOR

    def test_get_request_status_clickhouse_error_generic(self) -> None:
        """An unmapped ClickhouseError code falls back to ERROR / SLO.AGAINST."""
        exc = ClickhouseError("Generic error", code=999)
        result = get_request_status(exc)
        assert result.status == RequestStatus.ERROR
        assert result.slo == SLO.AGAINST

    def test_get_request_status_too_many_bytes_with_policy(self) -> None:
        """TOO_MANY_BYTES is RATE_LIMITED when the byte cap came from an allocation policy."""
        exc = ClickhouseError("Too many bytes", code=ErrorCodes.TOO_MANY_BYTES)
        ctx: dict[str, Any] = {"max_bytes_to_read_set_by_policy": True}
        result = get_request_status(exc, ctx)
        assert result.status == RequestStatus.RATE_LIMITED
        assert result.slo == SLO.FOR

    def test_get_request_status_too_many_bytes_without_policy(self) -> None:
        """TOO_MANY_BYTES stays an ERROR when the policy flag is explicitly False."""
        exc = ClickhouseError("Too many bytes", code=ErrorCodes.TOO_MANY_BYTES)
        ctx: dict[str, Any] = {"max_bytes_to_read_set_by_policy": False}
        result = get_request_status(exc, ctx)
        assert result.status == RequestStatus.ERROR
        assert result.slo == SLO.AGAINST

    def test_get_request_status_too_many_bytes_no_context(self) -> None:
        """TOO_MANY_BYTES with no context at all is an ERROR."""
        exc = ClickhouseError("Too many bytes", code=ErrorCodes.TOO_MANY_BYTES)
        result = get_request_status(exc)
        assert result.status == RequestStatus.ERROR
        assert result.slo == SLO.AGAINST

    def test_get_request_status_too_many_bytes_empty_context(self) -> None:
        """TOO_MANY_BYTES with an empty context dict is an ERROR."""
        exc = ClickhouseError("Too many bytes", code=ErrorCodes.TOO_MANY_BYTES)
        result = get_request_status(exc, {})
        assert result.status == RequestStatus.ERROR
        assert result.slo == SLO.AGAINST

    def test_get_request_status_memory_exceeded(self) -> None:
        """MEMORY_LIMIT_EXCEEDED maps to MEMORY_EXCEEDED and does not hurt the SLO."""
        exc = ClickhouseError("Memory exceeded", code=ErrorCodes.MEMORY_LIMIT_EXCEEDED)
        result = get_request_status(exc)
        assert result.status == RequestStatus.MEMORY_EXCEEDED
        assert result.slo == SLO.FOR

    def test_get_request_status_timeout(self) -> None:
        """TIMEOUT_EXCEEDED maps to CLICKHOUSE_TIMEOUT and counts against the SLO."""
        exc = ClickhouseError("Timeout", code=ErrorCodes.TIMEOUT_EXCEEDED)
        result = get_request_status(exc)
        assert result.status == RequestStatus.CLICKHOUSE_TIMEOUT
        assert result.slo == SLO.AGAINST
72 changes: 72 additions & 0 deletions tests/test_api_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,75 @@ def test_correct_error_codes(self, execute_mock: MagicMock, record_query: MagicM
assert metadata["slo"] == slo, exception
execute_mock.reset_mock()
record_query.reset_mock()

# NOTE(review): @patch decorators apply bottom-up, so the mock arguments below
# arrive in the order: _apply_allocation_policies_quota, execute_query, record_query.
@patch("snuba.settings.RECORD_QUERIES", True)
@patch("snuba.state.record_query")
@patch("snuba.web.db_query.execute_query")
@patch("snuba.web.db_query._apply_allocation_policies_quota")
@pytest.mark.events_db
@pytest.mark.redis_db
def test_too_many_bytes_from_allocation_policy(
    self, apply_policies_mock: MagicMock, execute_mock: MagicMock, record_query: MagicMock
) -> None:
    """Test that TOO_MANY_BYTES from allocation policy is classified as RATE_LIMITED in SLO metrics"""

    # Mock allocation policy to set the flag
    # (signature mirrors _apply_allocation_policies_quota; only `stats` matters here)
    def set_policy_flag(
        query_settings: Any,
        attribution_info: Any,
        formatted_query: Any,
        stats: Any,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        # Set the flag in the stats dict (which is passed by reference)
        stats["max_bytes_to_read_set_by_policy"] = True
        # quota_allowance is read when _raw_query builds the RateLimitExceeded cause
        stats["quota_allowance"] = {}

    apply_policies_mock.side_effect = set_policy_flag

    # Mock execute_query to raise TOO_MANY_BYTES error
    execute_mock.side_effect = ClickhouseError("Too many bytes", code=ErrorCodes.TOO_MANY_BYTES)

    # self.post() presumably issues a query through the test client — defined on the base class
    response = self.post()

    # Verify the response has rate-limited type (because calculated_cause becomes RateLimitExceeded)
    data = json.loads(response.data)
    assert data["error"]["type"] == "rate-limited", (
        f"Expected rate-limited, got {data['error']['type']}"
    )

    # Verify SLO metrics - this is the critical part for the fix
    metadata = record_query.call_args[0][0]
    assert metadata["request_status"] == "rate-limited", (
        "TOO_MANY_BYTES from policy should be RATE_LIMITED"
    )
    assert metadata["slo"] == "for", "Rate limited requests should not count against SLO"

@patch("snuba.settings.RECORD_QUERIES", True)
@patch("snuba.state.record_query")
@patch("snuba.web.db_query.execute_query")
@pytest.mark.events_db
@pytest.mark.redis_db
def test_too_many_bytes_without_allocation_policy(
    self, execute_mock: MagicMock, record_query: MagicMock
) -> None:
    """TOO_MANY_BYTES with no allocation-policy byte cap must be an ERROR in SLO metrics.

    _apply_allocation_policies_quota is deliberately left unmocked, so the
    max_bytes_to_read_set_by_policy flag is expected to stay absent from stats.
    """
    # Raise TOO_MANY_BYTES straight out of query execution.
    execute_mock.side_effect = ClickhouseError(
        "Too many bytes", code=ErrorCodes.TOO_MANY_BYTES
    )

    response = self.post()
    body = json.loads(response.data)

    # Without the policy flag, the cause is surfaced as a plain ClickHouse error.
    assert body["error"]["type"] == "clickhouse", (
        f"Expected clickhouse, got {body['error']['type']}"
    )

    # The recorded querylog metadata must count this request against the SLO.
    recorded = record_query.call_args[0][0]
    assert recorded["request_status"] == "error", (
        "TOO_MANY_BYTES without policy should be ERROR"
    )
    assert recorded["slo"] == "against", "Error requests should count against SLO"
Loading