|
1 | 1 | import sys |
| 2 | +from collections.abc import Mapping |
2 | 3 | from functools import wraps |
3 | 4 |
|
4 | 5 | import sentry_sdk |
|
47 | 48 | Retry, |
48 | 49 | SoftTimeLimitExceeded, |
49 | 50 | ) |
| 51 | + from kombu import Producer # type: ignore |
50 | 52 | except ImportError: |
51 | 53 | raise DidNotEnable("Celery not installed") |
52 | 54 |
|
@@ -82,6 +84,7 @@ def setup_once(): |
82 | 84 | _patch_build_tracer() |
83 | 85 | _patch_task_apply_async() |
84 | 86 | _patch_worker_exit() |
| 87 | + _patch_producer_publish() |
85 | 88 |
|
86 | 89 | # This logger logs every status of every task that ran on the worker. |
87 | 90 | # Meaning that every task's breadcrumbs are full of stuff like "Task |
@@ -433,3 +436,44 @@ def sentry_workloop(*args, **kwargs): |
433 | 436 | sentry_sdk.flush() |
434 | 437 |
|
435 | 438 | Worker.workloop = sentry_workloop |
| 439 | + |
| 440 | + |
| 441 | +def _patch_producer_publish(): |
| 442 | + # type: () -> None |
| 443 | + original_publish = Producer.publish |
| 444 | + |
| 445 | + @ensure_integration_enabled(CeleryIntegration, original_publish) |
| 446 | + def sentry_publish(self, *args, **kwargs): |
| 447 | + # type: (Producer, *Any, **Any) -> Any |
| 448 | + kwargs_headers = kwargs.get("headers", {}) |
| 449 | + if not isinstance(kwargs_headers, Mapping): |
| 450 | + # Ensure kwargs_headers is a Mapping, so we can safely call get() |
| 451 | + kwargs_headers = {} |
| 452 | + |
| 453 | + task_name = kwargs_headers.get("task") |
| 454 | + task_id = kwargs_headers.get("id") |
| 455 | + retries = kwargs_headers.get("retries") |
| 456 | + |
| 457 | + routing_key = kwargs.get("routing_key") |
| 458 | + exchange = kwargs.get("exchange") |
| 459 | + |
| 460 | + with sentry_sdk.start_span(op=OP.QUEUE_PUBLISH, description=task_name) as span: |
| 461 | + if task_id is not None: |
| 462 | + span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task_id) |
| 463 | + |
| 464 | + if exchange == "" and routing_key is not None: |
| 465 | + # Empty exchange indicates the default exchange, meaning messages are |
| 466 | + # routed to the queue with the same name as the routing key. |
| 467 | + span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) |
| 468 | + |
| 469 | + if retries is not None: |
| 470 | + span.set_data(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries) |
| 471 | + |
| 472 | + with capture_internal_exceptions(): |
| 473 | + span.set_data( |
| 474 | + SPANDATA.MESSAGING_SYSTEM, self.connection.transport.driver_type |
| 475 | + ) |
| 476 | + |
| 477 | + return original_publish(self, *args, **kwargs) |
| 478 | + |
| 479 | + Producer.publish = sentry_publish |
0 commit comments