Skip to content

Commit ce23427

Browse files
committed
add some improvement.
1 parent d33aa28 commit ce23427

File tree

2 files changed

+57
-110
lines changed

2 files changed

+57
-110
lines changed

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java

Lines changed: 32 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,8 @@
2727

2828
package org.apache.hc.client5.http.impl.async;
2929

30-
import java.io.IOException;
3130
import java.io.InterruptedIOException;
32-
import java.nio.ByteBuffer;
33-
import java.util.List;
31+
import java.lang.reflect.Proxy;
3432
import java.util.concurrent.ConcurrentHashMap;
3533
import java.util.concurrent.ConcurrentMap;
3634
import java.util.concurrent.RejectedExecutionException;
@@ -51,17 +49,9 @@
5149
import org.apache.hc.core5.concurrent.CallbackContribution;
5250
import org.apache.hc.core5.concurrent.Cancellable;
5351
import org.apache.hc.core5.concurrent.FutureCallback;
54-
import org.apache.hc.core5.http.EntityDetails;
55-
import org.apache.hc.core5.http.Header;
56-
import org.apache.hc.core5.http.HttpException;
57-
import org.apache.hc.core5.http.HttpResponse;
5852
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
5953
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
60-
import org.apache.hc.core5.http.nio.CapacityChannel;
61-
import org.apache.hc.core5.http.nio.DataStreamChannel;
6254
import org.apache.hc.core5.http.nio.HandlerFactory;
63-
import org.apache.hc.core5.http.nio.RequestChannel;
64-
import org.apache.hc.core5.http.protocol.HttpContext;
6555
import org.apache.hc.core5.io.CloseMode;
6656
import org.apache.hc.core5.reactor.ConnectionInitiator;
6757
import org.apache.hc.core5.util.TimeValue;
@@ -334,94 +324,40 @@ private void releaseSlot() {
334324
}
335325
}
336326

337-
private AsyncClientExchangeHandler guard(final AsyncClientExchangeHandler h) {
338-
return new AsyncClientExchangeHandler() {
339-
private final AtomicBoolean done = new AtomicBoolean(false);
340-
341-
private void finish() {
342-
if (done.compareAndSet(false, true)) {
343-
releaseSlot();
344-
}
345-
}
346-
347-
@Override
348-
public void failed(final Exception cause) {
349-
try {
350-
h.failed(cause);
351-
} finally {
352-
finish();
353-
}
354-
}
355-
356-
@Override
357-
public void cancel() {
358-
try {
359-
h.cancel();
360-
} finally {
361-
finish();
362-
}
363-
}
364-
365-
@Override
366-
public void releaseResources() {
367-
try {
368-
h.releaseResources();
369-
} finally {
370-
finish();
371-
}
372-
}
373-
374-
@Override
375-
public void produceRequest(final RequestChannel channel, final HttpContext ctx) throws HttpException, IOException {
376-
h.produceRequest(channel, ctx);
377-
}
378-
379-
@Override
380-
public int available() {
381-
return h.available();
382-
}
383-
384-
@Override
385-
public void produce(final DataStreamChannel channel) throws IOException {
386-
h.produce(channel);
387-
}
388-
389-
@Override
390-
public void consumeInformation(final HttpResponse response, final HttpContext context) throws HttpException, IOException {
391-
h.consumeInformation(response, context);
392-
}
393-
394-
@Override
395-
public void consumeResponse(final HttpResponse response, final EntityDetails entity, final HttpContext context) throws HttpException, IOException {
396-
h.consumeResponse(response, entity, context);
397-
}
398-
399-
@Override
400-
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
401-
h.updateCapacity(capacityChannel);
402-
}
403-
404-
@Override
405-
public void consume(final ByteBuffer src) throws IOException {
406-
h.consume(src);
407-
}
408-
409-
@Override
410-
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
411-
h.streamEnd(trailers);
412-
}
413-
};
327+
private AsyncClientExchangeHandler guard(final AsyncClientExchangeHandler handler) {
328+
if (sharedQueued == null) {
329+
return handler;
330+
}
331+
final AtomicBoolean released = new AtomicBoolean(false);
332+
return (AsyncClientExchangeHandler) Proxy.newProxyInstance(
333+
AsyncClientExchangeHandler.class.getClassLoader(),
334+
new Class<?>[]{AsyncClientExchangeHandler.class},
335+
(proxy, method, args) -> {
336+
if ("releaseResources".equals(method.getName())
337+
&& method.getParameterCount() == 0) {
338+
try {
339+
return method.invoke(handler, args);
340+
} finally {
341+
if (released.compareAndSet(false, true)) {
342+
releaseSlot();
343+
}
344+
}
345+
}
346+
return method.invoke(handler, args);
347+
});
414348
}
415349

416350
@Override
417351
public Cancellable execute(
418352
final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
419353
final AsyncConnectionEndpoint endpoint = ensureValid();
420354
if (sharedQueued != null && !tryAcquireSlot()) {
421-
exchangeHandler.failed(new RejectedExecutionException("Execution pipeline queue limit reached (max=" + maxQueued + ")"));
355+
exchangeHandler.failed(new RejectedExecutionException(
356+
"Execution pipeline queue limit reached (max=" + maxQueued + ")"));
422357
return Operations.nonCancellable();
423358
}
424-
final AsyncClientExchangeHandler guardedHandler = guard(exchangeHandler);
359+
final AsyncClientExchangeHandler actual =
360+
sharedQueued != null ? guard(exchangeHandler) : exchangeHandler;
425361
if (endpoint.isConnected()) {
426362
if (log.isDebugEnabled()) {
427363
log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
@@ -431,10 +367,10 @@ public Cancellable execute(
431367
if (responseTimeout != null) {
432368
endpoint.setSocketTimeout(responseTimeout);
433369
}
434-
endpoint.execute(id, guardedHandler, pushHandlerFactory, context);
370+
endpoint.execute(id, actual, pushHandlerFactory, context);
435371
if (context.getRequestConfigOrDefault().isHardCancellationEnabled()) {
436372
return () -> {
437-
guardedHandler.cancel();
373+
actual.cancel();
438374
return true;
439375
};
440376
}
@@ -447,20 +383,20 @@ public void completed(final AsyncExecRuntime runtime) {
447383
log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
448384
}
449385
try {
450-
endpoint.execute(id, guardedHandler, pushHandlerFactory, context);
386+
endpoint.execute(id, actual, pushHandlerFactory, context);
451387
} catch (final RuntimeException ex) {
452388
failed(ex);
453389
}
454390
}
455391

456392
@Override
457393
public void failed(final Exception ex) {
458-
guardedHandler.failed(ex);
394+
actual.failed(ex);
459395
}
460396

461397
@Override
462398
public void cancelled() {
463-
guardedHandler.failed(new InterruptedIOException());
399+
actual.failed(new InterruptedIOException());
464400
}
465401

466402
});
@@ -483,4 +419,4 @@ public AsyncExecRuntime fork() {
483419
return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, tlsConfig, maxQueued);
484420
}
485421

486-
}
422+
}

httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -146,21 +146,22 @@ void testSlotReleasedOnTerminalSignalAllowsNext() throws Exception {
146146
final HttpClientContext ctx = HttpClientContext.create();
147147
ctx.setRequestConfig(RequestConfig.custom().build());
148148

149-
runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new FutureCallback<AsyncExecRuntime>() {
150-
@Override
151-
public void completed(final AsyncExecRuntime result) {
152-
}
149+
runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx,
150+
new FutureCallback<AsyncExecRuntime>() {
151+
@Override
152+
public void completed(final AsyncExecRuntime result) {
153+
}
153154

154-
@Override
155-
public void failed(final Exception ex) {
156-
fail(ex);
157-
}
155+
@Override
156+
public void failed(final Exception ex) {
157+
fail(ex);
158+
}
158159

159-
@Override
160-
public void cancelled() {
161-
fail("cancelled");
162-
}
163-
});
160+
@Override
161+
public void cancelled() {
162+
fail("cancelled");
163+
}
164+
});
164165

165166
final LatchingHandler h1 = new LatchingHandler();
166167
runtime.execute("r1", h1, ctx);
@@ -170,7 +171,8 @@ public void cancelled() {
170171
assertTrue(h2.awaitFailed(2, TimeUnit.SECONDS));
171172
assertTrue(h2.failedException.get() instanceof RejectedExecutionException);
172173

173-
endpoint.failOne(new IOException("boom"));
174+
// free the slot via releaseResources(), not failed()
175+
endpoint.completeOne();
174176

175177
final LatchingHandler h3 = new LatchingHandler();
176178
runtime.execute("r3", h3, ctx);
@@ -179,6 +181,7 @@ public void cancelled() {
179181
h3.cancel();
180182
}
181183

184+
182185
private static final class NoopInitiator implements ConnectionInitiator {
183186
@Override
184187
public Future<IOSession> connect(final NamedEndpoint endpoint,
@@ -286,6 +289,13 @@ void cancelOne() {
286289
}
287290
}
288291

292+
void completeOne() {
293+
final AsyncClientExchangeHandler h = inFlight.poll();
294+
if (h != null) {
295+
h.releaseResources();
296+
}
297+
}
298+
289299
@Override
290300
public boolean isConnected() {
291301
return connected;
@@ -307,6 +317,7 @@ public EndpointInfo getInfo() {
307317
}
308318

309319

320+
310321
private static class LatchingHandler implements AsyncClientExchangeHandler {
311322
final AtomicReference<Exception> failedException = new AtomicReference<>();
312323
final CountDownLatch failLatch = new CountDownLatch(1);

0 commit comments

Comments
 (0)