11from __future__ import absolute_import
22
33import sys
4- import shutil
5- import functools
6- import tempfile
74
85from sentry_sdk .consts import OP
96from sentry_sdk ._compat import reraise
2522 from typing import Any
2623 from typing import Callable
2724 from typing import Dict
28- from typing import List
2925 from typing import Optional
3026 from typing import Tuple
3127 from typing import TypeVar
4036 from celery import VERSION as CELERY_VERSION
4137 from celery import Task , Celery
4238 from celery .app .trace import task_has_custom
43- from celery .beat import Service # type: ignore
39+ from celery .beat import Scheduler # type: ignore
4440 from celery .exceptions import ( # type: ignore
4541 Ignore ,
4642 Reject ,
4945 )
5046 from celery .schedules import crontab , schedule # type: ignore
5147 from celery .signals import ( # type: ignore
52- beat_init ,
53- task_prerun ,
5448 task_failure ,
5549 task_success ,
5650 task_retry ,
@@ -68,9 +62,11 @@ class CeleryIntegration(Integration):
6862 def __init__ (self , propagate_traces = True , monitor_beat_tasks = False ):
6963 # type: (bool, bool) -> None
7064 self .propagate_traces = propagate_traces
65+ self .monitor_beat_tasks = monitor_beat_tasks
7166
7267 if monitor_beat_tasks :
73- _patch_celery_beat_tasks ()
68+ _patch_beat_apply_entry ()
69+ _setup_celery_beat_signals ()
7470
7571 @staticmethod
7672 def setup_once ():
@@ -131,6 +127,12 @@ def apply_async(*args, **kwargs):
131127 ) as span :
132128 with capture_internal_exceptions ():
133129 headers = dict (hub .iter_trace_propagation_headers (span ))
130+ if integration .monitor_beat_tasks :
131+ headers .update (
132+ {
133+ "sentry-monitor-start-timestamp-s" : "%.9f" % now (),
134+ }
135+ )
134136
135137 if headers :
136138 # Note: kwargs can contain headers=None, so no setdefault!
@@ -320,12 +322,15 @@ def sentry_workloop(*args, **kwargs):
320322
321323def _get_headers (task ):
322324 # type: (Task) -> Dict[str, Any]
323- headers = task .request .get ("headers" ) or {}
325+ headers = task .request .get ("headers" , {})
324326
327+ # flatten nested headers
325328 if "headers" in headers :
326329 headers .update (headers ["headers" ])
327330 del headers ["headers" ]
328331
332+ headers .update (task .request .get ("properties" , {}))
333+
329334 return headers
330335
331336
@@ -387,123 +392,47 @@ def _get_monitor_config(celery_schedule, app):
387392 return monitor_config
388393
389394
390- def _reinstall_patched_tasks (app , sender , add_updated_periodic_tasks ):
391- # type: (Celery, Service, List[functools.partial[Any]]) -> None
392-
393- # Stop Celery Beat
394- sender .stop ()
395-
396- # Update tasks to include Monitor information in headers
397- for add_updated_periodic_task in add_updated_periodic_tasks :
398- add_updated_periodic_task ()
399-
400- # Start Celery Beat (with new (cloned) schedule, because old one is still in use)
401- cloned_schedule = tempfile .NamedTemporaryFile (suffix = "-patched-by-sentry-sdk" )
402- with open (sender .schedule_filename , "rb" ) as original_schedule :
403- shutil .copyfileobj (original_schedule , cloned_schedule )
395+ def _patch_beat_apply_entry ():
396+ # type: () -> None
397+ original_apply_entry = Scheduler .apply_entry
398+
399+ def sentry_apply_entry (* args , ** kwargs ):
400+ # type: (*Any, **Any) -> None
401+ scheduler , schedule_entry = args
402+ app = scheduler .app
403+
404+ celery_schedule = schedule_entry .schedule
405+ monitor_config = _get_monitor_config (celery_schedule , app )
406+ monitor_name = schedule_entry .name
407+
408+ headers = schedule_entry .options .pop ("headers" , {})
409+ headers .update (
410+ {
411+ "sentry-monitor-slug" : monitor_name ,
412+ "sentry-monitor-config" : monitor_config ,
413+ }
414+ )
404415
405- app .Beat (schedule = cloned_schedule .name ).run ()
416+ check_in_id = capture_checkin (
417+ monitor_slug = monitor_name ,
418+ monitor_config = monitor_config ,
419+ status = MonitorStatus .IN_PROGRESS ,
420+ )
421+ headers .update ({"sentry-monitor-check-in-id" : check_in_id })
406422
423+ schedule_entry .options .update (headers )
424+ return original_apply_entry (* args , ** kwargs )
407425
408- # Nested functions do not work as Celery hook receiver,
409- # so defining it here explicitly
410- celery_beat_init = None
426+ Scheduler .apply_entry = sentry_apply_entry
411427
412428
413- def _patch_celery_beat_tasks ():
429+ def _setup_celery_beat_signals ():
414430 # type: () -> None
415-
416- global celery_beat_init
417-
418- def celery_beat_init (sender , ** kwargs ):
419- # type: (Service, Dict[Any, Any]) -> None
420-
421- # Because we restart Celery Beat,
422- # make sure that this will not be called infinitely
423- beat_init .disconnect (celery_beat_init )
424-
425- app = sender .app
426-
427- add_updated_periodic_tasks = []
428-
429- for name in sender .scheduler .schedule .keys ():
430- # Ignore Celery's internal tasks
431- if name .startswith ("celery." ):
432- continue
433-
434- monitor_name = name
435-
436- schedule_entry = sender .scheduler .schedule [name ]
437- celery_schedule = schedule_entry .schedule
438- monitor_config = _get_monitor_config (celery_schedule , app )
439-
440- if monitor_config is None :
441- continue
442-
443- headers = schedule_entry .options .pop ("headers" , {})
444- headers .update (
445- {
446- "headers" : {
447- "sentry-monitor-slug" : monitor_name ,
448- "sentry-monitor-config" : monitor_config ,
449- },
450- }
451- )
452-
453- task_signature = app .tasks .get (schedule_entry .task ).s ()
454- task_signature .set (headers = headers )
455-
456- logger .debug (
457- "Set up Sentry Celery Beat monitoring for %s (%s)" ,
458- task_signature ,
459- monitor_name ,
460- )
461-
462- add_updated_periodic_tasks .append (
463- functools .partial (
464- app .add_periodic_task ,
465- celery_schedule ,
466- task_signature ,
467- args = schedule_entry .args ,
468- kwargs = schedule_entry .kwargs ,
469- name = schedule_entry .name ,
470- ** (schedule_entry .options or {})
471- )
472- )
473-
474- _reinstall_patched_tasks (app , sender , add_updated_periodic_tasks )
475-
476- beat_init .connect (celery_beat_init )
477- task_prerun .connect (crons_task_before_run )
478431 task_success .connect (crons_task_success )
479432 task_failure .connect (crons_task_failure )
480433 task_retry .connect (crons_task_retry )
481434
482435
483- def crons_task_before_run (sender , ** kwargs ):
484- # type: (Task, Dict[Any, Any]) -> None
485- logger .debug ("celery_task_before_run %s" , sender )
486- headers = _get_headers (sender )
487-
488- if "sentry-monitor-slug" not in headers :
489- return
490-
491- monitor_config = headers .get ("sentry-monitor-config" , {})
492-
493- start_timestamp_s = now ()
494-
495- check_in_id = capture_checkin (
496- monitor_slug = headers ["sentry-monitor-slug" ],
497- monitor_config = monitor_config ,
498- status = MonitorStatus .IN_PROGRESS ,
499- )
500-
501- headers .update ({"sentry-monitor-check-in-id" : check_in_id })
502- headers .update ({"sentry-monitor-start-timestamp-s" : start_timestamp_s })
503-
504- sender .s ().set (headers = headers )
505-
506-
507436def crons_task_success (sender , ** kwargs ):
508437 # type: (Task, Dict[Any, Any]) -> None
509438 logger .debug ("celery_task_success %s" , sender )
@@ -514,7 +443,7 @@ def crons_task_success(sender, **kwargs):
514443
515444 monitor_config = headers .get ("sentry-monitor-config" , {})
516445
517- start_timestamp_s = headers ["sentry-monitor-start-timestamp-s" ]
446+ start_timestamp_s = float ( headers ["sentry-monitor-start-timestamp-s" ])
518447
519448 capture_checkin (
520449 monitor_slug = headers ["sentry-monitor-slug" ],
@@ -535,7 +464,7 @@ def crons_task_failure(sender, **kwargs):
535464
536465 monitor_config = headers .get ("sentry-monitor-config" , {})
537466
538- start_timestamp_s = headers ["sentry-monitor-start-timestamp-s" ]
467+ start_timestamp_s = float ( headers ["sentry-monitor-start-timestamp-s" ])
539468
540469 capture_checkin (
541470 monitor_slug = headers ["sentry-monitor-slug" ],
@@ -556,7 +485,7 @@ def crons_task_retry(sender, **kwargs):
556485
557486 monitor_config = headers .get ("sentry-monitor-config" , {})
558487
559- start_timestamp_s = headers ["sentry-monitor-start-timestamp-s" ]
488+ start_timestamp_s = float ( headers ["sentry-monitor-start-timestamp-s" ])
560489
561490 capture_checkin (
562491 monitor_slug = headers ["sentry-monitor-slug" ],
0 commit comments