Skip to content

Commit 25addc1

Browse files
authored
ref(consumers): Allow injecting StreamProcessor kwargs using options (#104244)
We have had situations where applying processor arguments using CLI arguments was error prone and took a long time to revert if something is wrong. Let's use dynamic cli flags instead. currently you have to: * add a feature to arroyo * add a cli flag to sentry * add cli in k8s then the flag cleanup is all of that in reverse with this pr you at least only have to bump arroyo in sentry, and there's nothing to be done in sentry on cleanup
1 parent d76ed7f commit 25addc1

File tree

3 files changed

+152
-9
lines changed

3 files changed

+152
-9
lines changed

src/sentry/consumers/__init__.py

Lines changed: 84 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import logging
44
import uuid
55
from collections.abc import Mapping, Sequence
6-
from typing import Any
6+
from typing import Any, Union, get_args, get_origin, get_type_hints
77

88
import click
99
from arroyo.backends.abstract import Consumer
@@ -27,12 +27,79 @@
2727
from sentry.consumers.validate_schema import ValidateSchema
2828
from sentry.eventstream.types import EventStreamEventType
2929
from sentry.ingest.types import ConsumerType
30+
from sentry.utils import json
3031
from sentry.utils.imports import import_string
3132
from sentry.utils.kafka_config import get_topic_definition
3233

3334
logger = logging.getLogger(__name__)
3435

3536

37+
def apply_processor_args_overrides(
38+
consumer_name: str, base_args: dict[str, Any], overrides: Sequence[str]
39+
) -> dict[str, Any]:
40+
"""
41+
Apply processor args overrides from CLI strings to the base StreamProcessor arguments.
42+
43+
Args:
44+
consumer_name: Name of the consumer
45+
base_args: Base arguments dict for StreamProcessor
46+
overrides: Raw CLI argument strings in format 'key:value'
47+
48+
Returns:
49+
Updated arguments dict with overrides applied
50+
"""
51+
# Get resolved type hints (handles __future__.annotations)
52+
type_hints = get_type_hints(StreamProcessor.__init__)
53+
54+
for arg in overrides:
55+
try:
56+
key, value_str = arg.split(":", 1)
57+
param_type = type_hints[key]
58+
59+
# Extract the actual type from Optional[T] (which is Union[T, None])
60+
if get_origin(param_type) is Union:
61+
# For Optional[T] (Union[T, None]), extract T
62+
type_args = get_args(param_type)
63+
param_type = next((t for t in type_args if t is not type(None)), str)
64+
65+
# Try to parse as JSON first, fallback to string
66+
try:
67+
value = json.loads(value_str)
68+
except Exception:
69+
value = value_str
70+
71+
# Validate the type matches what we expect
72+
# Allow int->float coercion since JSON parses as int
73+
if param_type == float and isinstance(value, int):
74+
value = float(value)
75+
elif not isinstance(value, param_type):
76+
raise ValueError(f"Expected {param_type}, got {type(value)}")
77+
78+
if key in base_args:
79+
logger.info(
80+
"overriding argument %s from CLI: %s -> %s",
81+
key,
82+
base_args[key],
83+
value,
84+
extra={
85+
"consumer_name": consumer_name,
86+
},
87+
)
88+
base_args[key] = value
89+
except Exception as e:
90+
logger.warning(
91+
"skipping invalid argument %s from CLI: %s",
92+
arg,
93+
str(e),
94+
extra={
95+
"consumer_name": consumer_name,
96+
"valid_params": sorted(type_hints.keys()),
97+
},
98+
)
99+
100+
return base_args
101+
102+
36103
def convert_max_batch_time(ctx, param, value):
37104
if value <= 0:
38105
raise click.BadParameter("--max-batch-time must be greater than 0")
@@ -467,6 +534,7 @@ def get_stream_processor(
467534
kafka_slice_id: int | None = None,
468535
add_global_tags: bool = False,
469536
profile_consumer_join: bool = False,
537+
arroyo_args: Sequence[str] | None = None,
470538
) -> StreamProcessor:
471539
from sentry.utils import kafka_config
472540

@@ -623,14 +691,21 @@ def build_consumer_config(group_id: str):
623691
else:
624692
dlq_policy = None
625693

626-
return StreamProcessor(
627-
consumer=consumer,
628-
topic=ArroyoTopic(topic),
629-
processor_factory=strategy_factory,
630-
commit_policy=ONCE_PER_SECOND,
631-
join_timeout=join_timeout,
632-
dlq_policy=dlq_policy,
633-
)
694+
# Build base StreamProcessor arguments
695+
processor_args = {
696+
"consumer": consumer,
697+
"topic": ArroyoTopic(topic),
698+
"processor_factory": strategy_factory,
699+
"commit_policy": ONCE_PER_SECOND,
700+
"join_timeout": join_timeout,
701+
"dlq_policy": dlq_policy,
702+
}
703+
704+
# Apply CLI-provided overrides for StreamProcessor arguments
705+
if arroyo_args:
706+
processor_args = apply_processor_args_overrides(consumer_name, processor_args, arroyo_args)
707+
708+
return StreamProcessor(**processor_args)
634709

635710

636711
class ValidateSchemaStrategyFactoryWrapper(ProcessingStrategyFactory):

src/sentry/runner/commands/run.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,13 +544,20 @@ def taskbroker_send_tasks(
544544
default=False,
545545
help="Adds a ProcessingStrategy to the start of a consumer that records a transaction of the consumer's join() method.",
546546
)
547+
@click.option(
548+
"--arroyo-arg",
549+
"arroyo_args",
550+
multiple=True,
551+
help="Override StreamProcessor arguments. Format: --arroyo-arg='key:value'. Example: --arroyo-arg='join_timeout:60'",
552+
)
547553
@configuration
548554
def basic_consumer(
549555
consumer_name: str,
550556
consumer_args: tuple[str, ...],
551557
topic: str | None,
552558
kafka_slice_id: int | None,
553559
quantized_rebalance_delay_secs: int | None,
560+
arroyo_args: tuple[str, ...],
554561
**options: Any,
555562
) -> None:
556563
"""
@@ -594,6 +601,7 @@ def basic_consumer(
594601
topic=topic,
595602
kafka_slice_id=kafka_slice_id,
596603
add_global_tags=True,
604+
arroyo_args=arroyo_args,
597605
**options,
598606
)
599607

tests/sentry/consumers/test_run.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from unittest.mock import patch
2+
13
import pytest
24
from arroyo.processing.strategies.abstract import ProcessingStrategyFactory
35

@@ -58,3 +60,61 @@ def test_dlq(consumer_def) -> None:
5860
+ consumers_that_should_have_dlq_but_dont
5961
):
6062
assert defn.get("dlq_topic") is not None, f"{consumer_name} consumer is missing DLQ"
63+
64+
65+
def test_apply_processor_args_overrides() -> None:
66+
"""Test the apply_processor_args_overrides function."""
67+
from sentry.consumers import apply_processor_args_overrides
68+
69+
# Test with CLI string overrides
70+
result = apply_processor_args_overrides(
71+
"ingest-monitors",
72+
{"join_timeout": 10.0, "consumer": "mock_consumer", "topic": "mock_topic"},
73+
("join_timeout:123", "stuck_detector_timeout:456"),
74+
)
75+
assert result["join_timeout"] == 123
76+
assert result["stuck_detector_timeout"] == 456
77+
assert result["consumer"] == "mock_consumer"
78+
79+
# Test with empty overrides
80+
result = apply_processor_args_overrides("ingest-monitors", {"join_timeout": 10.0}, ())
81+
assert result["join_timeout"] == 10.0
82+
83+
# Test logging when overriding existing arg
84+
with patch("sentry.consumers.logger") as mock_logger:
85+
result = apply_processor_args_overrides(
86+
"ingest-monitors",
87+
{"join_timeout": 10.0},
88+
("join_timeout:999",),
89+
)
90+
assert result["join_timeout"] == 999
91+
mock_logger.info.assert_called_once()
92+
call_args = mock_logger.info.call_args
93+
assert call_args[0][0] == "overriding argument %s from CLI: %s -> %s"
94+
assert call_args[0][1] == "join_timeout"
95+
assert call_args[0][2] == 10.0
96+
assert call_args[0][3] == 999
97+
98+
# Test no logging when adding new arg
99+
with patch("sentry.consumers.logger") as mock_logger:
100+
result = apply_processor_args_overrides(
101+
"ingest-monitors",
102+
{"join_timeout": 10.0},
103+
("stuck_detector_timeout:456",),
104+
)
105+
assert result["stuck_detector_timeout"] == 456
106+
mock_logger.info.assert_not_called()
107+
108+
# Test skipping invalid parameters and logging warning
109+
with patch("sentry.consumers.logger") as mock_logger:
110+
result = apply_processor_args_overrides(
111+
"ingest-monitors",
112+
{"join_timeout": 10.0},
113+
("invalid_param:789", "join_timeout:999"),
114+
)
115+
assert result["join_timeout"] == 999
116+
assert "invalid_param" not in result
117+
mock_logger.warning.assert_called_once()
118+
call_args = mock_logger.warning.call_args
119+
assert call_args[0][0] == "skipping invalid argument %s from CLI: %s"
120+
assert call_args[0][1] == "invalid_param:789"

0 commit comments

Comments
 (0)