1010package de .rub .nds .tlsattacker .core .workflow ;
1111
1212import de .rub .nds .tlsattacker .core .state .State ;
13- import de .rub .nds .tlsattacker .core .workflow .task .ITask ;
1413import de .rub .nds .tlsattacker .core .workflow .task .StateExecutionServerTask ;
1514import de .rub .nds .tlsattacker .core .workflow .task .StateExecutionTask ;
1615import de .rub .nds .tlsattacker .core .workflow .task .TlsTask ;
@@ -34,7 +33,6 @@ public class ParallelExecutor {
3433 private static final Logger LOGGER = LogManager .getLogger ();
3534
3635 private final ThreadPoolExecutor executorService ;
37- private final CompletionService <ITask > completionService ;
3836 private Callable <Integer > timeoutAction ;
3937
4038 private final int size ;
@@ -44,7 +42,6 @@ public class ParallelExecutor {
4442
4543 public ParallelExecutor (int size , int reexecutions ) {
4644 executorService = new ThreadPoolExecutor (size , size , 10 , TimeUnit .DAYS , new LinkedBlockingDeque <Runnable >());
47- completionService = new ExecutorCompletionService <>(executorService );
4845 this .reexecutions = reexecutions ;
4946 this .size = size ;
5047 if (reexecutions < 0 ) {
@@ -55,27 +52,26 @@ public ParallelExecutor(int size, int reexecutions) {
5552 public ParallelExecutor (int size , int reexecutions , ThreadFactory factory ) {
5653 executorService =
5754 new ThreadPoolExecutor (size , size , 5 , TimeUnit .MINUTES , new LinkedBlockingDeque <Runnable >(), factory );
58- completionService = new ExecutorCompletionService <>(executorService );
5955 this .reexecutions = reexecutions ;
6056 this .size = size ;
6157 if (reexecutions < 0 ) {
6258 throw new IllegalArgumentException ("Reexecutions is below zero" );
6359 }
6460 }
6561
66- public Future addTask (TlsTask task ) {
62+ private Future addTask (TlsTask task ) {
6763 if (executorService .isShutdown ()) {
6864 throw new RuntimeException ("Cannot add Tasks to already shutdown executor" );
6965 }
70- Future <?> submit = completionService .submit (task );
66+ Future <?> submit = executorService .submit (task );
7167 return submit ;
7268 }
7369
74- public Future addClientStateTask (State state ) {
70+ private Future addClientStateTask (State state ) {
7571 return addTask (new StateExecutionTask (state , reexecutions ));
7672 }
7773
78- public Future addServerStateTask (State state , ServerSocket socket ) {
74+ private Future addServerStateTask (State state , ServerSocket socket ) {
7975 return addTask (new StateExecutionServerTask (state , socket , reexecutions ));
8076 }
8177
@@ -125,41 +121,46 @@ public void shutdown() {
125121 }
126122
127123 /**
128- * Creates a new thread monitoring the completionService. If the last {@link TlsTask} was finished more than 20
129- * seconds ago, the function assiged to {@link ParallelExecutor#timeoutAction } is executed.
124+ * Creates a new thread monitoring the executorService. If the last {@link TlsTask} was finished more than
125+ *
126+ * @param timeout
127+ * milliseconds ago, the function assiged to {@link ParallelExecutor#timeoutAction } is executed.
130128 *
131129 * The {@link ParallelExecutor#timeoutAction } function can, for example, try to restart the client/server, so that
132130 * the remaining {@link TlsTask}s can be finished.
133131 */
134- public void armTimeoutAction () {
132+ public void armTimeoutAction (int timeout ) {
135133 if (timeoutAction == null ) {
136134 LOGGER .warn ("No TimeoutAction set, this won't do anything" );
137135 return ;
138136 }
139137
140138 new Thread (() -> {
141- while (!shouldShutdown ) {
142- try {
143- Future <ITask > task = completionService .poll (20 , TimeUnit .SECONDS );
144- if (task != null ) {
145- continue ;
146- }
139+ monitorExecution (timeout );
140+ }).start ();
141+ }
147142
148- LOGGER .debug ("Timeout" );
149- if (executorService .getQueue ().size () > 0 ) {
150- try {
151- int exitCode = timeoutAction .call ();
152- if (exitCode != 0 ) {
153- throw new RuntimeException ("TimeoutAction did terminate with code " + exitCode );
154- }
155- } catch (Exception e ) {
156- LOGGER .warn ("TimeoutAction did not succeed" , e );
157- }
143+ private void monitorExecution (int timeout ) {
144+ long timeoutTime = System .currentTimeMillis () + timeout ;
145+ long lastCompletedCount = 0 ;
146+ while (!shouldShutdown ) {
147+ long completedCount = executorService .getCompletedTaskCount ();
148+ if (executorService .getActiveCount () == 0 || completedCount != lastCompletedCount ) {
149+ timeoutTime = System .currentTimeMillis () + timeout ;
150+ lastCompletedCount = completedCount ;
151+ } else if (System .currentTimeMillis () > timeoutTime ) {
152+ LOGGER .debug ("Timeout" );
153+ try {
154+ int exitCode = timeoutAction .call ();
155+ if (exitCode != 0 ) {
156+ throw new RuntimeException ("TimeoutAction did terminate with code " + exitCode );
158157 }
159- } catch (InterruptedException ignored ) {
158+ timeoutTime = System .currentTimeMillis () + timeout ;
159+ } catch (Exception e ) {
160+ LOGGER .warn ("TimeoutAction did not succeed" , e );
160161 }
161162 }
162- }). start ();
163+ }
163164 }
164165
165166 public int getReexecutions () {
0 commit comments