112112try :
113113 from gevent import get_hub as get_gevent_hub # type: ignore
114114 from gevent .monkey import get_original , is_module_patched # type: ignore
115+ from gevent .threadpool import ThreadPool # type: ignore
115116
116117 thread_sleep = get_original ("time" , "sleep" )
117118except ImportError :
@@ -127,6 +128,8 @@ def is_module_patched(*args, **kwargs):
127128 # unable to import from gevent means no modules have been patched
128129 return False
129130
131+ ThreadPool = None
132+
130133
131134def is_gevent ():
132135 # type: () -> bool
@@ -177,10 +180,7 @@ def setup_profiler(options):
177180 ):
178181 _scheduler = ThreadScheduler (frequency = frequency )
179182 elif profiler_mode == GeventScheduler .mode :
180- try :
181- _scheduler = GeventScheduler (frequency = frequency )
182- except ImportError :
183- raise ValueError ("Profiler mode: {} is not available" .format (profiler_mode ))
183+ _scheduler = GeventScheduler (frequency = frequency )
184184 else :
185185 raise ValueError ("Unknown profiler mode: {}" .format (profiler_mode ))
186186
@@ -703,7 +703,8 @@ def __init__(self, frequency):
703703
704704 self .sampler = self .make_sampler ()
705705
706- self .new_profiles = deque () # type: Deque[Profile]
706+ # cap the number of new profiles at any time so it does not grow infinitely
707+ self .new_profiles = deque (maxlen = 128 ) # type: Deque[Profile]
707708 self .active_profiles = set () # type: Set[Profile]
708709
709710 def __enter__ (self ):
@@ -723,8 +724,13 @@ def teardown(self):
723724 # type: () -> None
724725 raise NotImplementedError
725726
727+ def ensure_running (self ):
728+ # type: () -> None
729+ raise NotImplementedError
730+
726731 def start_profiling (self , profile ):
727732 # type: (Profile) -> None
733+ self .ensure_running ()
728734 self .new_profiles .append (profile )
729735
730736 def stop_profiling (self , profile ):
@@ -827,21 +833,44 @@ def __init__(self, frequency):
827833
828834 # used to signal to the thread that it should stop
829835 self .running = False
830-
831- # make sure the thread is a daemon here otherwise this
832- # can keep the application running after other threads
833- # have exited
834- self .thread = threading .Thread (name = self .name , target = self .run , daemon = True )
836+ self .thread = None # type: Optional[threading.Thread]
837+ self .pid = None # type: Optional[int]
838+ self .lock = threading .Lock ()
835839
836840 def setup (self ):
837841 # type: () -> None
838- self .running = True
839- self .thread .start ()
842+ pass
840843
841844 def teardown (self ):
842845 # type: () -> None
843- self .running = False
844- self .thread .join ()
846+ if self .running :
847+ self .running = False
848+ if self .thread is not None :
849+ self .thread .join ()
850+
851+ def ensure_running (self ):
852+ # type: () -> None
853+ pid = os .getpid ()
854+
855+ # is running on the right process
856+ if self .running and self .pid == pid :
857+ return
858+
859+ with self .lock :
860+ # another thread may have tried to acquire the lock
861+ # at the same time so it may start another thread
862+ # make sure to check again before proceeding
863+ if self .running and self .pid == pid :
864+ return
865+
866+ self .pid = pid
867+ self .running = True
868+
869+ # make sure the thread is a daemon here otherwise this
870+ # can keep the application running after other threads
871+ # have exited
872+ self .thread = threading .Thread (name = self .name , target = self .run , daemon = True )
873+ self .thread .start ()
845874
846875 def run (self ):
847876 # type: () -> None
@@ -882,28 +911,52 @@ class GeventScheduler(Scheduler):
882911 def __init__ (self , frequency ):
883912 # type: (int) -> None
884913
885- # This can throw an ImportError that must be caught if `gevent` is
886- # not installed.
887- from gevent .threadpool import ThreadPool # type: ignore
914+ if ThreadPool is None :
915+ raise ValueError ("Profiler mode: {} is not available" .format (self .mode ))
888916
889917 super (GeventScheduler , self ).__init__ (frequency = frequency )
890918
891919 # used to signal to the thread that it should stop
892920 self .running = False
921+ self .thread = None # type: Optional[ThreadPool]
922+ self .pid = None # type: Optional[int]
893923
894- # Using gevent's ThreadPool allows us to bypass greenlets and spawn
895- # native threads.
896- self .pool = ThreadPool (1 )
924+ # This intentionally uses the gevent patched threading.Lock.
925+ # The lock will be required when first trying to start profiles
926+ # as we need to spawn the profiler thread from the greenlets.
927+ self .lock = threading .Lock ()
897928
898929 def setup (self ):
899930 # type: () -> None
900- self .running = True
901- self .pool .spawn (self .run )
931+ pass
902932
903933 def teardown (self ):
904934 # type: () -> None
905- self .running = False
906- self .pool .join ()
935+ if self .running :
936+ self .running = False
937+ if self .thread is not None :
938+ self .thread .join ()
939+
940+ def ensure_running (self ):
941+ # type: () -> None
942+ pid = os .getpid ()
943+
944+ # is running on the right process
945+ if self .running and self .pid == pid :
946+ return
947+
948+ with self .lock :
949+ # another thread may have tried to acquire the lock
950+ # at the same time so it may start another thread
951+ # make sure to check again before proceeding
952+ if self .running and self .pid == pid :
953+ return
954+
955+ self .pid = pid
956+ self .running = True
957+
958+ self .thread = ThreadPool (1 )
959+ self .thread .spawn (self .run )
907960
908961 def run (self ):
909962 # type: () -> None
0 commit comments