1111package de .rub .nds .tlsattacker .core .workflow ;
1212
1313import de .rub .nds .tlsattacker .core .state .State ;
14- import de .rub .nds .tlsattacker .core .workflow .task .ITask ;
1514import de .rub .nds .tlsattacker .core .workflow .task .StateExecutionServerTask ;
1615import de .rub .nds .tlsattacker .core .workflow .task .StateExecutionTask ;
1716import de .rub .nds .tlsattacker .core .workflow .task .TlsTask ;
@@ -35,7 +34,6 @@ public class ParallelExecutor {
3534 private static final Logger LOGGER = LogManager .getLogger ();
3635
3736 private final ThreadPoolExecutor executorService ;
38- private final CompletionService <ITask > completionService ;
3937 private Callable <Integer > timeoutAction ;
4038
4139 private final int size ;
@@ -45,7 +43,6 @@ public class ParallelExecutor {
4543
4644 public ParallelExecutor (int size , int reexecutions ) {
4745 executorService = new ThreadPoolExecutor (size , size , 10 , TimeUnit .DAYS , new LinkedBlockingDeque <Runnable >());
48- completionService = new ExecutorCompletionService <>(executorService );
4946 this .reexecutions = reexecutions ;
5047 this .size = size ;
5148 if (reexecutions < 0 ) {
@@ -56,27 +53,26 @@ public ParallelExecutor(int size, int reexecutions) {
5653 public ParallelExecutor (int size , int reexecutions , ThreadFactory factory ) {
5754 executorService =
5855 new ThreadPoolExecutor (size , size , 5 , TimeUnit .MINUTES , new LinkedBlockingDeque <Runnable >(), factory );
59- completionService = new ExecutorCompletionService <>(executorService );
6056 this .reexecutions = reexecutions ;
6157 this .size = size ;
6258 if (reexecutions < 0 ) {
6359 throw new IllegalArgumentException ("Reexecutions is below zero" );
6460 }
6561 }
6662
67- public Future addTask (TlsTask task ) {
63+ private Future addTask (TlsTask task ) {
6864 if (executorService .isShutdown ()) {
6965 throw new RuntimeException ("Cannot add Tasks to already shutdown executor" );
7066 }
71- Future <?> submit = completionService .submit (task );
67+ Future <?> submit = executorService .submit (task );
7268 return submit ;
7369 }
7470
75- public Future addClientStateTask (State state ) {
71+ private Future addClientStateTask (State state ) {
7672 return addTask (new StateExecutionTask (state , reexecutions ));
7773 }
7874
79- public Future addServerStateTask (State state , ServerSocket socket ) {
75+ private Future addServerStateTask (State state , ServerSocket socket ) {
8076 return addTask (new StateExecutionServerTask (state , socket , reexecutions ));
8177 }
8278
@@ -126,41 +122,46 @@ public void shutdown() {
126122 }
127123
128124 /**
129- * Creates a new thread monitoring the completionService. If the last {@link TlsTask} was finished more than 20
130- * seconds ago, the function assiged to {@link ParallelExecutor#timeoutAction } is executed.
125+ * Creates a new thread monitoring the executorService. If the last {@link TlsTask} was finished more than
126+ *
127+ * @param timeout
128+ * milliseconds ago, the function assiged to {@link ParallelExecutor#timeoutAction } is executed.
131129 *
132130 * The {@link ParallelExecutor#timeoutAction } function can, for example, try to restart the client/server, so that
133131 * the remaining {@link TlsTask}s can be finished.
134132 */
135- public void armTimeoutAction () {
133+ public void armTimeoutAction (int timeout ) {
136134 if (timeoutAction == null ) {
137135 LOGGER .warn ("No TimeoutAction set, this won't do anything" );
138136 return ;
139137 }
140138
141139 new Thread (() -> {
142- while (!shouldShutdown ) {
143- try {
144- Future <ITask > task = completionService .poll (20 , TimeUnit .SECONDS );
145- if (task != null ) {
146- continue ;
147- }
140+ monitorExecution (timeout );
141+ }).start ();
142+ }
148143
149- LOGGER .debug ("Timeout" );
150- if (executorService .getQueue ().size () > 0 ) {
151- try {
152- int exitCode = timeoutAction .call ();
153- if (exitCode != 0 ) {
154- throw new RuntimeException ("TimeoutAction did terminate with code " + exitCode );
155- }
156- } catch (Exception e ) {
157- LOGGER .warn ("TimeoutAction did not succeed" , e );
158- }
144+ private void monitorExecution (int timeout ) {
145+ long timeoutTime = System .currentTimeMillis () + timeout ;
146+ long lastCompletedCount = 0 ;
147+ while (!shouldShutdown ) {
148+ long completedCount = executorService .getCompletedTaskCount ();
149+ if (executorService .getActiveCount () == 0 || completedCount != lastCompletedCount ) {
150+ timeoutTime = System .currentTimeMillis () + timeout ;
151+ lastCompletedCount = completedCount ;
152+ } else if (System .currentTimeMillis () > timeoutTime ) {
153+ LOGGER .debug ("Timeout" );
154+ try {
155+ int exitCode = timeoutAction .call ();
156+ if (exitCode != 0 ) {
157+ throw new RuntimeException ("TimeoutAction did terminate with code " + exitCode );
159158 }
160- } catch (InterruptedException ignored ) {
159+ timeoutTime = System .currentTimeMillis () + timeout ;
160+ } catch (Exception e ) {
161+ LOGGER .warn ("TimeoutAction did not succeed" , e );
161162 }
162163 }
163- }). start ();
164+ }
164165 }
165166
166167 public int getReexecutions () {
0 commit comments