@@ -7,6 +7,7 @@
 
 from nettacker import logger
 from nettacker.config import Config
+from nettacker.core import queue_manager
 from nettacker.core.messages import messages as _
 from nettacker.core.template import TemplateLoader
 from nettacker.core.utils.common import expand_module_steps, wait_for_threads_to_finish
@@ -118,26 +119,44 @@ def generate_loops(self):
         self.module_content["payloads"] = expand_module_steps(self.module_content["payloads"])
 
     def sort_loops(self):
-        steps = []
+        """
+        Sort loops to optimize dependency resolution:
+        1. Independent steps first
+        2. Steps that generate dependencies (save_to_temp_events_only)
+        3. Steps that consume dependencies (dependent_on_temp_event)
+        """
         for index in range(len(self.module_content["payloads"])):
-            for step in copy.deepcopy(self.module_content["payloads"][index]["steps"]):
-                if "dependent_on_temp_event" not in step[0]["response"]:
-                    steps.append(step)
+            independent_steps = []
+            dependency_generators = []
+            dependency_consumers = []
 
             for step in copy.deepcopy(self.module_content["payloads"][index]["steps"]):
-                if (
-                    "dependent_on_temp_event" in step[0]["response"]
-                    and "save_to_temp_events_only" in step[0]["response"]
-                ):
-                    steps.append(step)
+                step_response = step[0]["response"] if step and len(step) > 0 else {}
+
+                has_dependency = "dependent_on_temp_event" in step_response
+                generates_dependency = "save_to_temp_events_only" in step_response
+
+                if not has_dependency and not generates_dependency:
+                    independent_steps.append(step)
+                elif generates_dependency and not has_dependency:
+                    dependency_generators.append(step)
+                elif generates_dependency and has_dependency:
+                    dependency_generators.append(step)  # Generator first
+                elif has_dependency and not generates_dependency:
+                    dependency_consumers.append(step)
+                else:
+                    independent_steps.append(step)  # Fallback
 
-            for step in copy.deepcopy(self.module_content["payloads"][index]["steps"]):
-                if (
-                    "dependent_on_temp_event" in step[0]["response"]
-                    and "save_to_temp_events_only" not in step[0]["response"]
-                ):
-                    steps.append(step)
-            self.module_content["payloads"][index]["steps"] = steps
+            # Combine in optimal order
+            sorted_steps = independent_steps + dependency_generators + dependency_consumers
+            self.module_content["payloads"][index]["steps"] = sorted_steps
+
+            log.verbose_info(
+                f"Sorted {len(sorted_steps)} steps: "
+                f"{len(independent_steps)} independent, "
+                f"{len(dependency_generators)} generators, "
+                f"{len(dependency_consumers)} consumers"
+            )
 
     def start(self):
         active_threads = []
@@ -158,11 +177,16 @@ def start(self):
                     importlib.import_module(f"nettacker.core.lib.{library.lower()}"),
                     f"{library.capitalize()}Engine",
                 )()
+
                 for step in payload["steps"]:
                     for sub_step in step:
-                        thread = Thread(
-                            target=engine.run,
-                            args=(
+                        # Try to use shared thread pool if available, otherwise use local threads
+                        if queue_manager.thread_pool and hasattr(
+                            queue_manager.thread_pool, "submit_task"
+                        ):
+                            # Submit to shared thread pool
+                            queue_manager.thread_pool.submit_task(
+                                engine.run,
                                 sub_step,
                                 self.module_name,
                                 self.target,
@@ -173,9 +197,35 @@ def start(self):
                                 self.total_module_thread_number,
                                 request_number_counter,
                                 total_number_of_requests,
-                            ),
-                        )
-                        thread.name = f"{self.target} -> {self.module_name} -> {sub_step}"
+                            )
+                        else:
+                            # Use local thread (fallback to original behavior)
+                            thread = Thread(
+                                target=engine.run,
+                                args=(
+                                    sub_step,
+                                    self.module_name,
+                                    self.target,
+                                    self.scan_id,
+                                    self.module_inputs,
+                                    self.process_number,
+                                    self.module_thread_number,
+                                    self.total_module_thread_number,
+                                    request_number_counter,
+                                    total_number_of_requests,
+                                ),
+                            )
+                            thread.name = f"{self.target} -> {self.module_name} -> {sub_step}"
+                            thread.start()
+                            active_threads.append(thread)
+
+                            # Manage local thread pool size
+                            wait_for_threads_to_finish(
+                                active_threads,
+                                maximum=self.module_inputs["thread_per_host"],
+                                terminable=True,
+                            )
+
                         request_number_counter += 1
                         log.verbose_event_info(
                             _("sending_module_request").format(
@@ -188,13 +238,8 @@ def start(self):
                                 total_number_of_requests,
                             )
                         )
-                        thread.start()
                         time.sleep(self.module_inputs["time_sleep_between_requests"])
-                        active_threads.append(thread)
-                        wait_for_threads_to_finish(
-                            active_threads,
-                            maximum=self.module_inputs["thread_per_host"],
-                            terminable=True,
-                        )
 
-        wait_for_threads_to_finish(active_threads, maximum=None, terminable=True)
+        # Wait for any remaining local threads to finish
+        if active_threads:
+            wait_for_threads_to_finish(active_threads, maximum=None, terminable=True)
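
Note on the new sort_loops() ordering: the three buckets reduce to "independents first, then anything that saves temp events, then pure consumers", so temp events created by generator steps exist before any consumer step runs within the same payload. The standalone sketch below reproduces that ordering outside the class; the payload layout is a simplified, hypothetical stand-in rather than the exact Nettacker module schema.

# Standalone illustration of the bucket ordering used by the new sort_loops().
# The step/payload layout here is a simplified, hypothetical stand-in.
steps = [
    [{"response": {"dependent_on_temp_event": "open_port"}}],  # consumer
    [{"response": {}}],                                        # independent
    [{"response": {"save_to_temp_events_only": True}}],        # generator
]

independent, generators, consumers = [], [], []
for step in steps:
    response = step[0]["response"] if step else {}
    generates = "save_to_temp_events_only" in response
    consumes = "dependent_on_temp_event" in response
    if generates:
        generators.append(step)  # steps that both generate and consume sort as generators
    elif consumes:
        consumers.append(step)
    else:
        independent.append(step)

ordered = independent + generators + consumers
# ordered is now: independent step, generator step, consumer step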
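
For readers wondering what queue_manager.thread_pool has to provide: the only contract start() relies on is a submit_task(func, *args) method, guarded by the hasattr check, with local Thread objects as the fallback. The sketch below is a hypothetical minimal implementation of that contract built on concurrent.futures.ThreadPoolExecutor; it is not the actual nettacker.core.queue_manager code (which is outside this diff), and the SharedThreadPool name, max_workers default, and shutdown() helper are assumptions for illustration.

# Hypothetical sketch of the interface start() probes for via
# hasattr(queue_manager.thread_pool, "submit_task"). The real
# nettacker.core.queue_manager is not shown in this diff; only the
# submit_task(func, *args) call shape is taken from the code above.
from concurrent.futures import ThreadPoolExecutor


class SharedThreadPool:
    def __init__(self, max_workers=100):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._futures = []

    def submit_task(self, func, *args):
        # Matches the call site: submit_task(engine.run, sub_step, module_name, ...)
        future = self._executor.submit(func, *args)
        self._futures.append(future)
        return future

    def shutdown(self):
        # Block until every submitted task has finished.
        self._executor.shutdown(wait=True)


# Module-level handle; when it is None (or lacks submit_task), start()
# falls back to the original per-host Thread behaviour.
thread_pool = None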