diff --git a/src/main/java/org/commonjava/util/sidecar/services/ProxyService.java b/src/main/java/org/commonjava/util/sidecar/services/ProxyService.java index cf09223..363edad 100644 --- a/src/main/java/org/commonjava/util/sidecar/services/ProxyService.java +++ b/src/main/java/org/commonjava/util/sidecar/services/ProxyService.java @@ -26,6 +26,7 @@ import org.commonjava.util.sidecar.config.ProxyConfiguration; import org.commonjava.util.sidecar.interceptor.ExceptionHandler; import org.commonjava.util.sidecar.interceptor.MetricsHandler; +import org.commonjava.util.sidecar.util.BufferStreamingOutput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +35,7 @@ import javax.inject.Inject; import javax.ws.rs.GET; import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; @@ -206,8 +208,8 @@ private Response convertProxyResp( HttpResponse resp ) } ); if ( resp.body() != null ) { - byte[] bytes = resp.body().getBytes(); - builder.entity( bytes ); + StreamingOutput so = new BufferStreamingOutput( resp ); + builder.entity( so ); } return builder.build(); } diff --git a/src/main/java/org/commonjava/util/sidecar/util/BufferStreamingOutput.java b/src/main/java/org/commonjava/util/sidecar/util/BufferStreamingOutput.java new file mode 100644 index 0000000..f040029 --- /dev/null +++ b/src/main/java/org/commonjava/util/sidecar/util/BufferStreamingOutput.java @@ -0,0 +1,86 @@ +package org.commonjava.util.sidecar.util; + +import io.vertx.mutiny.core.buffer.Buffer; +import io.vertx.mutiny.ext.web.client.HttpResponse; +import org.apache.commons.io.output.CountingOutputStream; +import org.apache.commons.io.output.TeeOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.StreamingOutput; +import javax.xml.bind.DatatypeConverter; +import java.io.IOException; +import java.io.OutputStream; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +public class BufferStreamingOutput + implements StreamingOutput +{ + private static final int bufSize = 10 * 1024 * 1024; + + private final Logger logger = LoggerFactory.getLogger( getClass() ); + + private HttpResponse response; + + private Supplier cacheStreamSupplier; + + public BufferStreamingOutput( HttpResponse response ) + { + this.response = response; + } + + @Override + public void write( OutputStream output ) throws IOException, WebApplicationException + { + OutputStream cacheStream = null; + try(CountingOutputStream cout = new CountingOutputStream( output )) + { + OutputStream out = cout; + if ( cacheStreamSupplier != null ) + { + cacheStream = cacheStreamSupplier.get(); + if ( cacheStream != null ) + { + out = new TeeOutputStream( cacheStream, output ); + } + } + + Buffer buffer = response.bodyAsBuffer(); + int total = buffer.length(); + int transferred = 0; + while ( transferred < total ) + { + int next = bufSize < total ? bufSize : total; + byte[] bytes = buffer.getBytes( transferred, next ); + out.write( bytes ); + + transferred = next; + } + out.flush(); + } + finally + { + if ( cacheStream != null ) + { + try + { + cacheStream.close(); + } + catch ( Exception e ) + { + logger.error( "Failed to close cache stream: " + e.getMessage(), e ); + } + } + } + } + + public void setCacheStreamSupplier( Supplier cacheStreamSupplier ) + { + this.cacheStreamSupplier = cacheStreamSupplier; + } +}