Skip to content

Commit eda922e

Browse files
authored
Use new scopes API in Celery integration. (#2851)
Use new scopes API in Celery integration.
1 parent a40f128 commit eda922e

File tree

2 files changed

+51
-58
lines changed

2 files changed

+51
-58
lines changed

sentry_sdk/integrations/celery.py

Lines changed: 46 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
import time
33
from functools import wraps
44

5+
import sentry_sdk
56
from sentry_sdk.api import continue_trace
67
from sentry_sdk.consts import OP
78
from sentry_sdk.crons import capture_checkin, MonitorStatus
8-
from sentry_sdk.hub import Hub
99
from sentry_sdk import isolation_scope
1010
from sentry_sdk.integrations import Integration, DidNotEnable
1111
from sentry_sdk.integrations.logging import ignore_logger
@@ -15,6 +15,7 @@
1515
from sentry_sdk.utils import (
1616
capture_internal_exceptions,
1717
event_from_exception,
18+
ensure_integration_enabled,
1819
logger,
1920
match_regex_list,
2021
reraise,
@@ -147,17 +148,13 @@ def __exit__(self, exc_type, exc_value, traceback):
147148
def _wrap_apply_async(f):
148149
# type: (F) -> F
149150
@wraps(f)
151+
@ensure_integration_enabled(CeleryIntegration, f)
150152
def apply_async(*args, **kwargs):
151153
# type: (*Any, **Any) -> Any
152-
hub = Hub.current
153-
integration = hub.get_integration(CeleryIntegration)
154-
155-
if integration is None:
156-
return f(*args, **kwargs)
157-
158154
# Note: kwargs can contain headers=None, so no setdefault!
159155
# Unsure which backend though.
160156
kwarg_headers = kwargs.get("headers") or {}
157+
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
161158
propagate_traces = kwarg_headers.pop(
162159
"sentry-propagate-traces", integration.propagate_traces
163160
)
@@ -173,15 +170,15 @@ def apply_async(*args, **kwargs):
173170
task = args[0]
174171

175172
span_mgr = (
176-
hub.start_span(op=OP.QUEUE_SUBMIT_CELERY, description=task.name)
173+
sentry_sdk.start_span(op=OP.QUEUE_SUBMIT_CELERY, description=task.name)
177174
if not task_started_from_beat
178175
else NoOpMgr()
179176
) # type: Union[Span, NoOpMgr]
180177

181178
with span_mgr as span:
182179
with capture_internal_exceptions():
183180
headers = (
184-
dict(hub.iter_trace_propagation_headers(span))
181+
dict(Scope.get_current_scope().iter_trace_propagation_headers(span))
185182
if span is not None
186183
else {}
187184
)
@@ -240,12 +237,9 @@ def _wrap_tracer(task, f):
240237
# Also because in Celery 3, signal dispatch returns early if one handler
241238
# crashes.
242239
@wraps(f)
240+
@ensure_integration_enabled(CeleryIntegration, f)
243241
def _inner(*args, **kwargs):
244242
# type: (*Any, **Any) -> Any
245-
hub = Hub.current
246-
if hub.get_integration(CeleryIntegration) is None:
247-
return f(*args, **kwargs)
248-
249243
with isolation_scope() as scope:
250244
scope._name = "celery"
251245
scope.clear_breadcrumbs()
@@ -268,7 +262,7 @@ def _inner(*args, **kwargs):
268262
if transaction is None:
269263
return f(*args, **kwargs)
270264

271-
with hub.start_transaction(
265+
with sentry_sdk.start_transaction(
272266
transaction,
273267
custom_sampling_context={
274268
"celery_job": {
@@ -339,34 +333,31 @@ def event_processor(event, hint):
339333

340334
def _capture_exception(task, exc_info):
341335
# type: (Any, ExcInfo) -> None
342-
hub = Hub.current
343-
344-
if hub.get_integration(CeleryIntegration) is None:
336+
client = sentry_sdk.get_client()
337+
if client.get_integration(CeleryIntegration) is None:
345338
return
339+
346340
if isinstance(exc_info[1], CELERY_CONTROL_FLOW_EXCEPTIONS):
347341
# ??? Doesn't map to anything
348-
_set_status(hub, "aborted")
342+
_set_status("aborted")
349343
return
350344

351-
_set_status(hub, "internal_error")
345+
_set_status("internal_error")
352346

353347
if hasattr(task, "throws") and isinstance(exc_info[1], task.throws):
354348
return
355349

356-
# If an integration is there, a client has to be there.
357-
client = hub.client # type: Any
358-
359350
event, hint = event_from_exception(
360351
exc_info,
361352
client_options=client.options,
362353
mechanism={"type": "celery", "handled": False},
363354
)
364355

365-
hub.capture_event(event, hint=hint)
356+
sentry_sdk.capture_event(event, hint=hint)
366357

367358

368-
def _set_status(hub, status):
369-
# type: (Hub, str) -> None
359+
def _set_status(status):
360+
# type: (str) -> None
370361
with capture_internal_exceptions():
371362
scope = Scope.get_current_scope()
372363
if scope.span is not None:
@@ -388,9 +379,11 @@ def sentry_workloop(*args, **kwargs):
388379
return old_workloop(*args, **kwargs)
389380
finally:
390381
with capture_internal_exceptions():
391-
hub = Hub.current
392-
if hub.get_integration(CeleryIntegration) is not None:
393-
hub.flush()
382+
if (
383+
sentry_sdk.get_client().get_integration(CeleryIntegration)
384+
is not None
385+
):
386+
sentry_sdk.flush()
394387

395388
Worker.workloop = sentry_workloop
396389

@@ -487,6 +480,7 @@ def _patch_beat_apply_entry():
487480
# type: () -> None
488481
original_apply_entry = Scheduler.apply_entry
489482

483+
@ensure_integration_enabled(CeleryIntegration, original_apply_entry)
490484
def sentry_apply_entry(*args, **kwargs):
491485
# type: (*Any, **Any) -> None
492486
scheduler, schedule_entry = args
@@ -495,42 +489,38 @@ def sentry_apply_entry(*args, **kwargs):
495489
celery_schedule = schedule_entry.schedule
496490
monitor_name = schedule_entry.name
497491

498-
hub = Hub.current
499-
integration = hub.get_integration(CeleryIntegration)
500-
if integration is None:
501-
return original_apply_entry(*args, **kwargs)
502-
492+
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
503493
if match_regex_list(monitor_name, integration.exclude_beat_tasks):
504494
return original_apply_entry(*args, **kwargs)
505495

506-
with hub.configure_scope() as scope:
507-
# When tasks are started from Celery Beat, make sure each task has its own trace.
508-
scope.set_new_propagation_context()
496+
# When tasks are started from Celery Beat, make sure each task has its own trace.
497+
scope = Scope.get_isolation_scope()
498+
scope.set_new_propagation_context()
509499

510-
monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
500+
monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
511501

512-
is_supported_schedule = bool(monitor_config)
513-
if is_supported_schedule:
514-
headers = schedule_entry.options.pop("headers", {})
515-
headers.update(
516-
{
517-
"sentry-monitor-slug": monitor_name,
518-
"sentry-monitor-config": monitor_config,
519-
}
520-
)
502+
is_supported_schedule = bool(monitor_config)
503+
if is_supported_schedule:
504+
headers = schedule_entry.options.pop("headers", {})
505+
headers.update(
506+
{
507+
"sentry-monitor-slug": monitor_name,
508+
"sentry-monitor-config": monitor_config,
509+
}
510+
)
521511

522-
check_in_id = capture_checkin(
523-
monitor_slug=monitor_name,
524-
monitor_config=monitor_config,
525-
status=MonitorStatus.IN_PROGRESS,
526-
)
527-
headers.update({"sentry-monitor-check-in-id": check_in_id})
512+
check_in_id = capture_checkin(
513+
monitor_slug=monitor_name,
514+
monitor_config=monitor_config,
515+
status=MonitorStatus.IN_PROGRESS,
516+
)
517+
headers.update({"sentry-monitor-check-in-id": check_in_id})
528518

529-
# Set the Sentry configuration in the options of the ScheduleEntry.
530-
# Those will be picked up in `apply_async` and added to the headers.
531-
schedule_entry.options["headers"] = headers
519+
# Set the Sentry configuration in the options of the ScheduleEntry.
520+
# Those will be picked up in `apply_async` and added to the headers.
521+
schedule_entry.options["headers"] = headers
532522

533-
return original_apply_entry(*args, **kwargs)
523+
return original_apply_entry(*args, **kwargs)
534524

535525
Scheduler.apply_entry = sentry_apply_entry
536526

tests/integrations/celery/test_celery_beat_crons.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,9 @@ def test_exclude_beat_tasks_option(
407407
fake_integration = MagicMock()
408408
fake_integration.exclude_beat_tasks = exclude_beat_tasks
409409

410+
fake_client = MagicMock()
411+
fake_client.get_integration.return_value = fake_integration
412+
410413
fake_schedule_entry = MagicMock()
411414
fake_schedule_entry.name = task_name
412415

@@ -416,8 +419,8 @@ def test_exclude_beat_tasks_option(
416419
"sentry_sdk.integrations.celery.Scheduler", fake_scheduler
417420
) as Scheduler: # noqa: N806
418421
with mock.patch(
419-
"sentry_sdk.integrations.celery.Hub.current.get_integration",
420-
return_value=fake_integration,
422+
"sentry_sdk.integrations.celery.sentry_sdk.get_client",
423+
return_value=fake_client,
421424
):
422425
with mock.patch(
423426
"sentry_sdk.integrations.celery._get_monitor_config",

0 commit comments

Comments
 (0)