@@ -111,6 +111,7 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, Compl
111111
112112 size_t si = 0 ;
113113 bool missingSporadic = false ;
114+ bool needsProcessing = false ;
114115 size_t currentTimeslice = -1 ;
115116 for (auto & input : inputs) {
116117 assert (si < specs.size ());
@@ -127,6 +128,9 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, Compl
127128 currentTimeslice = dph->startTime ;
128129 }
129130 }
131+ if (input.header != nullptr && spec.lifetime != Lifetime::Condition) {
132+ needsProcessing = true ;
133+ }
130134 }
131135 // If some sporadic inputs are missing, we wait for them util we are sure they will not come,
132136 // i.e. until the oldest possible timeslice is beyond the timeslice of the input.
@@ -136,7 +140,8 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, Compl
136140 if (missingSporadic && currentTimeslice >= oldestPossibleTimeslice) {
137141 return CompletionPolicy::CompletionOp::Retry;
138142 }
139- return CompletionPolicy::CompletionOp::Consume;
143+ // We only consume if we have something which needs processing.
144+ return needsProcessing ? CompletionPolicy::CompletionOp::Consume : CompletionPolicy::CompletionOp::Discard;
140145 };
141146 return CompletionPolicy{name, matcher, callback};
142147}
0 commit comments