From 274990636b444b82048f1b7c8b5d94c28f8cf221 Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Tue, 5 May 2026 00:23:16 +0530 Subject: [PATCH] [FLINK-39603] Optimize NativeS3InputStream via seeks and reduce IOPS --- .../fs/s3native/NativeS3InputStream.java | 166 ++++--- .../fs/s3native/NativeS3InputStreamTest.java | 469 +++++++++++++++--- 2 files changed, 479 insertions(+), 156 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java index 32368dd94f602..d4f637c661307 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java @@ -35,21 +35,21 @@ import java.util.concurrent.locks.ReentrantLock; /** - * S3 input stream with configurable read-ahead buffer, range-based requests for seek operations, - * automatic stream reopening on errors, and lazy initialization to minimize memory footprint. + * S3 input stream with configurable read-ahead buffer, lazy seek, and automatic stream reopening. * - *

Thread Safety: Internal state is guarded by a lock to ensure safe concurrent access and - * resource cleanup. + *

{@link #seek(long)} only records the desired position without performing any I/O. All HTTP + * work is deferred to the next {@link #read()} call via {@link #lazySeek()}, so multiple seeks + * between reads coalesce. When the seek is forward and within {@code readBufferSize}, the gap is + * skipped on the already-open stream (served from the local buffer when possible, otherwise + * consumed from the live connection) instead of reopening the HTTP connection. */ class NativeS3InputStream extends FSDataInputStream { private static final Logger LOG = LoggerFactory.getLogger(NativeS3InputStream.class); - /** Default read-ahead buffer size: 256KB. */ + /** Default read-ahead buffer size. */ private static final int DEFAULT_READ_BUFFER_SIZE = 256 * 1024; - /** Maximum buffer size for very large sequential reads. */ - private static final int MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024; // 4MB + private static final int MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024; private final ReentrantLock lock = new ReentrantLock(); @@ -65,8 +65,19 @@ class NativeS3InputStream extends FSDataInputStream { @GuardedBy("lock") private BufferedInputStream bufferedStream; + /** + * The position the caller expects to read from next. Updated by {@link #seek(long)}, {@link + * #skip(long)}, and after every successful {@link #read()}. + */ + @GuardedBy("lock") + private long nextReadPos; + + /** + * The actual byte offset of the underlying stream cursor, reconciled lazily via {@link + * #lazySeek()}. + */ @GuardedBy("lock") - private long position; + private long streamPos; @GuardedBy("lock") private volatile boolean closed; @@ -87,7 +98,8 @@ public NativeS3InputStream( this.key = key; this.contentLength = contentLength; this.readBufferSize = Math.min(readBufferSize, MAX_READ_BUFFER_SIZE); - this.position = 0; + this.nextReadPos = 0; + this.streamPos = 0; this.closed = false; LOG.debug( @@ -98,39 +110,63 @@ public NativeS3InputStream( this.readBufferSize / 1024); } + /** Reconciles {@link #nextReadPos} and {@link #streamPos} before reading bytes. 
*/ @GuardedBy("lock") - private void lazyInitialize() throws IOException { - assert lock.isHeldByCurrentThread() : "lazyInitialize() requires lock to be held"; - if (currentStream == null && !closed) { - openStreamAtCurrentPosition(); - } - } + private void lazySeek() throws IOException { + assert lock.isHeldByCurrentThread() : "lazySeek() requires lock to be held"; + long targetPos = nextReadPos; - /** At EOF, release instead of reopening: {@code bytes=contentLength-} returns S3 416. */ - @GuardedBy("lock") - private void repositionOpenStream() throws IOException { - assert lock.isHeldByCurrentThread() : "repositionOpenStream() requires lock to be held"; if (currentStream == null) { + streamPos = targetPos; return; } - if (position >= contentLength) { + + if (targetPos == streamPos) { + return; + } + + long diff = targetPos - streamPos; + streamPos = targetPos; + + if (targetPos >= contentLength) { releaseStreams(); - } else { + return; + } + + // BufferedInputStream does not expose how many bytes are in its local array, so + // readBufferSize is used as the skip threshold: at most readBufferSize bytes may be + // consumed from the live HTTP connection before a range request is preferred instead. + if (diff > 0 && diff <= (long) readBufferSize) { + skipBytesInBuffer(diff); + return; + } + + openStreamAtCurrentPosition(); + } + + @GuardedBy("lock") + private void ensureStreamOpen() throws IOException { + assert lock.isHeldByCurrentThread() : "ensureStreamOpen() requires lock to be held"; + if (currentStream == null && !closed) { openStreamAtCurrentPosition(); } } - /** - * Opens (or reopens) the S3 stream at the current position. - * - *

This method: - * - *

- */ + @GuardedBy("lock") + private void skipBytesInBuffer(long n) throws IOException { + assert lock.isHeldByCurrentThread() : "skipBytesInBuffer() requires lock to be held"; + long remaining = n; + while (remaining > 0) { + long skipped = bufferedStream.skip(remaining); + if (skipped <= 0) { + openStreamAtCurrentPosition(); + return; + } + remaining -= skipped; + } + } + + /** Opens (or reopens) the S3 stream at {@link #streamPos}. */ private void openStreamAtCurrentPosition() throws IOException { lock.lock(); try { @@ -140,11 +176,11 @@ private void openStreamAtCurrentPosition() throws IOException { GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder().bucket(bucketName).key(key); - if (position > 0) { - requestBuilder.range(String.format("bytes=%d-", position)); + if (streamPos > 0) { + requestBuilder.range(String.format("bytes=%d-", streamPos)); LOG.debug( "Opening S3 stream with range: bytes={}-{}", - position, + streamPos, contentLength - 1); } else { LOG.debug("Opening S3 stream for full object: {} bytes", contentLength); @@ -160,12 +196,7 @@ private void openStreamAtCurrentPosition() throws IOException { } } - /** - * Aborts the in-flight HTTP connection so that subsequent {@code close()} calls on the stream - * do not drain remaining bytes over the network. - * - * @see ResponseInputStream#abort() - */ + /** Aborts the in-flight HTTP connection to avoid draining remaining bytes on close. */ @GuardedBy("lock") private void abortCurrentStream() { assert lock.isHeldByCurrentThread() : "abortCurrentStream() requires lock to be held"; @@ -179,11 +210,10 @@ private void abortCurrentStream() { } /** - * Aborts and closes both streams, nulling the references. The abort is called first to prevent - * {@link ResponseInputStream#close()} from draining remaining bytes over the network. + * Aborts and closes both streams, nulling the references. 
* * @return the first {@link IOException} encountered (with subsequent ones added as suppressed), - * or {@code null} if cleanup succeeded without errors + * or {@code null} if cleanup succeeded */ @GuardedBy("lock") private IOException releaseStreams() { @@ -236,10 +266,7 @@ public void seek(long desired) throws IOException { + contentLength); } - if (desired != position) { - position = desired; - repositionOpenStream(); - } + nextReadPos = desired; } finally { lock.unlock(); } @@ -249,7 +276,7 @@ public void seek(long desired) throws IOException { public long getPos() throws IOException { lock(); try { - return position; + return nextReadPos; } finally { lock.unlock(); } @@ -262,13 +289,15 @@ public int read() throws IOException { if (closed) { throw new IOException("Stream is closed"); } - if (position >= contentLength) { + if (nextReadPos >= contentLength) { return -1; } - lazyInitialize(); + lazySeek(); + ensureStreamOpen(); int data = bufferedStream.read(); if (data != -1) { - position++; + nextReadPos++; + streamPos++; } return data; } finally { @@ -295,15 +324,17 @@ public int read(byte[] b, int off, int len) throws IOException { if (closed) { throw new IOException("Stream is closed"); } - if (position >= contentLength) { + if (nextReadPos >= contentLength) { return -1; } - lazyInitialize(); - long remaining = contentLength - position; + lazySeek(); + ensureStreamOpen(); + long remaining = contentLength - nextReadPos; int toRead = (int) Math.min(len, remaining); int bytesRead = bufferedStream.read(b, off, toRead); if (bytesRead > 0) { - position += bytesRead; + nextReadPos += bytesRead; + streamPos += bytesRead; } return bytesRead; } finally { @@ -336,7 +367,7 @@ public void close() throws IOException { "Closed S3 input stream - bucket: {}, key: {}, final position: {}/{}", bucketName, key, - position, + nextReadPos, contentLength); if (exception != null) { throw exception; @@ -346,16 +377,6 @@ public void close() throws IOException { } } - /** - * Returns 
an estimate of the number of bytes that can be read without blocking. - * - *

This implementation returns the remaining bytes in the object based on content length and - * current position. Note that actual reads may still block due to network I/O, but this - * indicates how much data is logically available. - * - * @return the number of remaining bytes (capped at Integer.MAX_VALUE) - * @throws IOException if the stream has been closed - */ @Override public int available() throws IOException { lock(); @@ -363,7 +384,7 @@ public int available() throws IOException { if (closed) { throw new IOException("Stream is closed"); } - long remaining = contentLength - position; + long remaining = contentLength - nextReadPos; return (int) Math.min(remaining, Integer.MAX_VALUE); } finally { lock.unlock(); @@ -380,12 +401,9 @@ public long skip(long n) throws IOException { if (n <= 0) { return 0; } - long newPos = Math.min(position + n, contentLength); - long skipped = newPos - position; - if (newPos != position) { - position = newPos; - repositionOpenStream(); - } + long newPos = Math.min(nextReadPos + n, contentLength); + long skipped = newPos - nextReadPos; + nextReadPos = newPos; return skipped; } finally { lock.unlock(); diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java index 90591fcd94ab1..32316e18bbc9b 100644 --- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java @@ -33,6 +33,7 @@ import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -57,23 +58,47 @@ 
private static class TrackingInputStream extends InputStream implements Abortabl private final AtomicBoolean aborted = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean(); private volatile boolean abortedBeforeClose; + private final int maxAvailable; + private final AtomicLong bytesRead = new AtomicLong(); + private final AtomicLong bytesSkipped = new AtomicLong(); TrackingInputStream(byte[] data, int offset) { - this.delegate = new ByteArrayInputStream(data, offset, data.length - offset); + this(data, offset, Integer.MAX_VALUE); } - TrackingInputStream(byte[] data) { - this(data, 0); + TrackingInputStream(byte[] data, int offset, int maxAvailable) { + this.delegate = new ByteArrayInputStream(data, offset, data.length - offset); + this.maxAvailable = maxAvailable; } @Override public int read() { - return delegate.read(); + int b = delegate.read(); + if (b != -1) { + bytesRead.incrementAndGet(); + } + return b; } @Override public int read(byte[] b, int off, int len) { - return delegate.read(b, off, len); + int n = delegate.read(b, off, len); + if (n > 0) { + bytesRead.addAndGet(n); + } + return n; + } + + @Override + public long skip(long n) { + long skipped = delegate.skip(n); + bytesSkipped.addAndGet(skipped); + return skipped; + } + + @Override + public int available() { + return Math.min(delegate.available(), maxAvailable); } @Override @@ -101,6 +126,39 @@ boolean wasClosed() { boolean wasAbortedBeforeClose() { return abortedBeforeClose; } + + long bytesReadFromUnderlying() { + return bytesRead.get(); + } + + long bytesSkippedFromUnderlying() { + return bytesSkipped.get(); + } + } + + /** + * A {@link TrackingInputStream} whose first {@link #skip} call returns {@code 0}, simulating a + * stalled/closed underlying HTTP connection during in-buffer skipping. 
+ */ + private static final class FailFirstSkipTrackingInputStream extends TrackingInputStream { + private final AtomicBoolean firstSkipFailed = new AtomicBoolean(); + + FailFirstSkipTrackingInputStream(byte[] data, int offset, int maxAvailable) { + super(data, offset, maxAvailable); + } + + @Override + public long skip(long n) { + if (firstSkipFailed.compareAndSet(false, true)) { + return 0; + } + return super.skip(n); + } + } + + @FunctionalInterface + private interface StreamFactory { + TrackingInputStream create(byte[] data, int offset, int maxAvailable); } /** {@link S3Client} stub. */ @@ -108,9 +166,21 @@ private static final class StubS3Client implements S3Client { private final byte[] data; private final AtomicInteger getObjectCalls = new AtomicInteger(); private volatile TrackingInputStream lastStream; + private final int maxAvailable; + private final StreamFactory factory; StubS3Client(byte[] data) { + this(data, Integer.MAX_VALUE); + } + + StubS3Client(byte[] data, int maxAvailable) { + this(data, maxAvailable, TrackingInputStream::new); + } + + StubS3Client(byte[] data, int maxAvailable, StreamFactory factory) { this.data = data; + this.maxAvailable = maxAvailable; + this.factory = factory; } @Override @@ -121,7 +191,7 @@ public ResponseInputStream getObject(GetObjectRequest request if (range != null && range.startsWith("bytes=")) { offset = Integer.parseInt(range.substring(6, range.indexOf('-'))); } - TrackingInputStream tracking = new TrackingInputStream(data, offset); + TrackingInputStream tracking = factory.create(data, offset, maxAvailable); lastStream = tracking; AbortableInputStream abortable = AbortableInputStream.create(tracking, tracking); return new ResponseInputStream<>( @@ -145,6 +215,8 @@ int getObjectCalls() { } } + // --- close behavior --- + @Test void closeAbortsUnderlyingStream() throws Exception { StubS3Client client = new StubS3Client(DATA); @@ -162,116 +234,200 @@ void closeAbortsAndThenClosesUnderlyingStream() throws Exception { 
in.read(); } TrackingInputStream stream = client.lastStream(); - // abort() must be called to kill the HTTP connection (prevents drain) assertThat(stream.wasAborted()).isTrue(); - // close() must still be called for SDK resource cleanup (connection pool return, etc.) assertThat(stream.wasClosed()).isTrue(); - // abort() must happen BEFORE close() - otherwise close() drains remaining bytes assertThat(stream.wasAbortedBeforeClose()).isTrue(); } @Test - void seekAbortsAndClosesOldStreamBeforeOpeningNew() throws Exception { + void closeWithoutReadNeverOpensStream() throws Exception { + StubS3Client client = new StubS3Client(DATA); + try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) {} + assertThat(client.getObjectCalls()).isEqualTo(0); + } + + @Test + void doubleCloseIsIdempotent() throws Exception { + StubS3Client client = new StubS3Client(DATA); + NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length); + in.read(); + + in.close(); + assertThat(client.getObjectCalls()).isEqualTo(1); + assertThat(client.lastStream().wasAborted()).isTrue(); + + in.close(); + assertThat(client.getObjectCalls()).isEqualTo(1); + assertThat(client.lastStream().wasAborted()).isTrue(); + } + + // --- lazy seek: seek() does no I/O --- + + @Test + void seekBeforeFirstReadUpdatesPositionOnly() throws Exception { + StubS3Client client = new StubS3Client(DATA); + try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) { + in.seek(50); + assertThat(in.getPos()).isEqualTo(50); + assertThat(client.getObjectCalls()).isEqualTo(0); + } + } + + @Test + void seekDoesNotTriggerStreamOperations() throws Exception { StubS3Client client = new StubS3Client(DATA); try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) { in.read(); TrackingInputStream first = client.lastStream(); + assertThat(client.getObjectCalls()).isEqualTo(1); - in.seek(100); + in.seek(0); + 
assertThat(first.wasAborted()).isFalse(); + assertThat(first.wasClosed()).isFalse(); + assertThat(client.getObjectCalls()).isEqualTo(1); + assertThat(in.getPos()).isEqualTo(0); + } + } - // old stream must be aborted, closed, and in the correct order - assertThat(first.wasAborted()).isTrue(); - assertThat(first.wasClosed()).isTrue(); - assertThat(first.wasAbortedBeforeClose()).isTrue(); - assertThat(client.getObjectCalls()).isEqualTo(2); - assertThat(in.getPos()).isEqualTo(100); + @Test + void multipleSeeksWithoutReadCoalesce() throws Exception { + StubS3Client client = new StubS3Client(DATA); + try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) { + in.read(); + assertThat(client.getObjectCalls()).isEqualTo(1); + + in.seek(200); + in.seek(0); in.seek(100); - assertThat(client.getObjectCalls()).isEqualTo(2); + in.seek(50); + + assertThat(client.getObjectCalls()).isEqualTo(1); + + assertThat(in.read()).isEqualTo(50); + assertThat(in.getPos()).isEqualTo(51); } } + // --- seek + read: forward within buffer --- + @Test - void skipAbortsOldStreamAndOpensNew() throws Exception { + void seekForwardWithinBufferReusesStream() throws Exception { StubS3Client client = new StubS3Client(DATA); try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) { in.read(); TrackingInputStream first = client.lastStream(); - assertThat(in.skip(100)).isEqualTo(100); - assertThat(first.wasAborted()).isTrue(); - assertThat(first.wasClosed()).isTrue(); - assertThat(first.wasAbortedBeforeClose()).isTrue(); - assertThat(client.getObjectCalls()).isEqualTo(2); - // skip(0) and skip(negative) are no-ops - assertThat(in.skip(0)).isZero(); - assertThat(in.skip(-5)).isZero(); - assertThat(client.getObjectCalls()).isEqualTo(2); + + in.seek(100); + assertThat(in.read()).isEqualTo(100); + + assertThat(first.wasAborted()).isFalse(); + assertThat(first.wasClosed()).isFalse(); + assertThat(client.getObjectCalls()).isEqualTo(1); + 
assertThat(in.getPos()).isEqualTo(101); + + in.seek(101); + assertThat(in.read()).isEqualTo(101); + assertThat(client.getObjectCalls()).isEqualTo(1); } } @Test - void closeWithoutReadNeverOpensStream() throws Exception { + void seekForwardWithinBufferReturnsCorrectData() throws Exception { StubS3Client client = new StubS3Client(DATA); try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) { - // lazy init means no getObject call + assertThat(in.read()).isEqualTo(0); + assertThat(in.read()).isEqualTo(1); + + in.seek(10); + assertThat(in.read()).isEqualTo(10); + + in.seek(50); + byte[] buf = new byte[5]; + assertThat(in.read(buf, 0, 5)).isEqualTo(5); + for (int i = 0; i < 5; i++) { + assertThat(buf[i]).isEqualTo(DATA[50 + i]); + } + + in.seek(200); + assertThat(in.read()).isEqualTo(200 & 0xFF); + + assertThat(client.getObjectCalls()).isEqualTo(1); } - assertThat(client.getObjectCalls()).isEqualTo(0); } @Test - void doubleCloseIsIdempotent() throws Exception { + void sequentialSmallSeeksReuseStream() throws Exception { StubS3Client client = new StubS3Client(DATA); - NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length); - in.read(); + try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) { + for (int i = 0; i < 100; i++) { + in.seek(i * 2); + assertThat(in.read()).isEqualTo((i * 2) & 0xFF); + } + assertThat(client.getObjectCalls()).isEqualTo(1); + } + } - // Verify state after first close - in.close(); - assertThat(client.getObjectCalls()).isEqualTo(1); - assertThat(client.lastStream().wasAborted()).isTrue(); + // --- seek + read: forward beyond buffer --- - // Second close should be a no-op - in.close(); - assertThat(client.getObjectCalls()).isEqualTo(1); - assertThat(client.lastStream().wasAborted()).isTrue(); + @Test + void seekForwardBeyondBufferReopensStream() throws Exception { + StubS3Client client = new StubS3Client(DATA, 0); + int smallBuffer = 16; + try 
(NativeS3InputStream in = + new NativeS3InputStream(client, BUCKET, KEY, DATA.length, smallBuffer)) { + in.read(); + TrackingInputStream first = client.lastStream(); + + in.seek(1 + smallBuffer + 1); + assertThat(first.wasAborted()).isFalse(); + + assertThat(in.read()).isEqualTo(1 + smallBuffer + 1); + assertThat(first.wasAborted()).isTrue(); + assertThat(first.wasClosed()).isTrue(); + assertThat(first.wasAbortedBeforeClose()).isTrue(); + assertThat(client.getObjectCalls()).isEqualTo(2); + } } + // --- seek + read: backward --- + @Test - void seekBeforeFirstReadUpdatesPositionOnly() throws Exception { + void seekBackwardReopensStreamOnRead() throws Exception { StubS3Client client = new StubS3Client(DATA); try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) { - in.seek(50); - assertThat(in.getPos()).isEqualTo(50); - assertThat(client.getObjectCalls()).isEqualTo(0); + byte[] buf = new byte[50]; + in.read(buf, 0, 50); + TrackingInputStream first = client.lastStream(); + + in.seek(10); + assertThat(first.wasAborted()).isFalse(); + assertThat(client.getObjectCalls()).isEqualTo(1); + + assertThat(in.read()).isEqualTo(10); + assertThat(first.wasAborted()).isTrue(); + assertThat(first.wasClosed()).isTrue(); + assertThat(first.wasAbortedBeforeClose()).isTrue(); + assertThat(client.getObjectCalls()).isEqualTo(2); + assertThat(in.getPos()).isEqualTo(11); } } + // --- seek to EOF --- + @Test - void readAndSeekReturnCorrectData() throws Exception { + void seekToContentLengthReleasesStreamOnRead() throws Exception { StubS3Client client = new StubS3Client(DATA); try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) { - // single-byte read - assertThat(in.read()).isEqualTo(0); - assertThat(in.getPos()).isEqualTo(1); - // bulk read returns correct bytes and advances position - byte[] buf = new byte[10]; - assertThat(in.read(buf, 0, 10)).isEqualTo(10); - assertThat(in.getPos()).isEqualTo(11); - for (int i = 0; i 
< 10; i++) { - assertThat(buf[i]).isEqualTo(DATA[i + 1]); - } - // available() reflects remaining bytes - assertThat(in.available()).isEqualTo(DATA.length - 11); - // seek then read returns data at the seeked position - in.seek(200); - assertThat(in.read()).isEqualTo(200); - assertThat(in.getPos()).isEqualTo(201); - // partial read at EOF returns only remaining bytes - in.seek(250); - byte[] tail = new byte[20]; - assertThat(in.read(tail, 0, 20)).isEqualTo(6); - assertThat(in.getPos()).isEqualTo(256); - // read past EOF + in.read(); + TrackingInputStream first = client.lastStream(); + assertThat(client.getObjectCalls()).isEqualTo(1); + + in.seek(DATA.length); + assertThat(first.wasAborted()).isFalse(); + assertThat(in.read()).isEqualTo(-1); - assertThat(in.read(new byte[1], 0, 1)).isEqualTo(-1); + assertThat(in.read(new byte[4], 0, 4)).isEqualTo(-1); + assertThat(client.getObjectCalls()).isEqualTo(1); } } @@ -297,34 +453,53 @@ void readAtEofReturnsMinusOne() throws Exception { assertThat(in.read(new byte[8], 0, 8)).isEqualTo(-1); assertThat(in.getPos()).isEqualTo(DATA.length); assertThat(in.available()).isZero(); - // EOF short-circuits before lazyInitialize, so no range request is issued. 
assertThat(client.getObjectCalls()).isZero(); } } + // --- skip (also lazy) --- + @Test - void seekToContentLengthWithOpenStreamReleasesStream() throws Exception { + void skipForwardWithinBufferReusesStream() throws Exception { StubS3Client client = new StubS3Client(DATA); try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) { in.read(); TrackingInputStream first = client.lastStream(); + + assertThat(in.skip(50)).isEqualTo(50); + assertThat(in.getPos()).isEqualTo(51); + + assertThat(first.wasAborted()).isFalse(); assertThat(client.getObjectCalls()).isEqualTo(1); - in.seek(DATA.length); + assertThat(in.read()).isEqualTo(51); + assertThat(first.wasAborted()).isFalse(); + assertThat(client.getObjectCalls()).isEqualTo(1); + } + } + @Test + void skipForwardBeyondBufferReopensStreamOnRead() throws Exception { + StubS3Client client = new StubS3Client(DATA, 0); + int smallBuffer = 16; + try (NativeS3InputStream in = + new NativeS3InputStream(client, BUCKET, KEY, DATA.length, smallBuffer)) { + in.read(); + TrackingInputStream first = client.lastStream(); + + assertThat(in.skip(smallBuffer + 1)).isEqualTo(smallBuffer + 1); + assertThat(first.wasAborted()).isFalse(); + + assertThat(in.read()).isEqualTo(1 + smallBuffer + 1); assertThat(first.wasAborted()).isTrue(); assertThat(first.wasClosed()).isTrue(); assertThat(first.wasAbortedBeforeClose()).isTrue(); - // bytes=contentLength- is unsatisfiable, so we must not reopen. 
- assertThat(client.getObjectCalls()).isEqualTo(1); - assertThat(in.read()).isEqualTo(-1); - assertThat(in.read(new byte[4], 0, 4)).isEqualTo(-1); - assertThat(client.getObjectCalls()).isEqualTo(1); + assertThat(client.getObjectCalls()).isEqualTo(2); } } @Test - void skipToEofWithOpenStreamReleasesStream() throws Exception { + void skipToEofReleasesStreamOnRead() throws Exception { StubS3Client client = new StubS3Client(DATA); try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) { in.read(); @@ -333,15 +508,145 @@ void skipToEofWithOpenStreamReleasesStream() throws Exception { assertThat(in.skip(DATA.length)).isEqualTo(DATA.length - 1); assertThat(in.getPos()).isEqualTo(DATA.length); + assertThat(first.wasAborted()).isFalse(); - assertThat(first.wasAborted()).isTrue(); - assertThat(first.wasClosed()).isTrue(); - assertThat(first.wasAbortedBeforeClose()).isTrue(); + assertThat(in.read()).isEqualTo(-1); + } + } + + @Test + void skipZeroAndNegativeAreNoOps() throws Exception { + StubS3Client client = new StubS3Client(DATA); + try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) { + in.read(); + assertThat(in.skip(0)).isZero(); + assertThat(in.skip(-5)).isZero(); assertThat(client.getObjectCalls()).isEqualTo(1); + } + } + + // --- read + seek integration --- + + @Test + void readAndSeekReturnCorrectData() throws Exception { + StubS3Client client = new StubS3Client(DATA); + try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) { + assertThat(in.read()).isEqualTo(0); + assertThat(in.getPos()).isEqualTo(1); + byte[] buf = new byte[10]; + assertThat(in.read(buf, 0, 10)).isEqualTo(10); + assertThat(in.getPos()).isEqualTo(11); + for (int i = 0; i < 10; i++) { + assertThat(buf[i]).isEqualTo(DATA[i + 1]); + } + assertThat(in.available()).isEqualTo(DATA.length - 11); + in.seek(200); + assertThat(in.read()).isEqualTo(200); + assertThat(in.getPos()).isEqualTo(201); + 
in.seek(250); + byte[] tail = new byte[20]; + assertThat(in.read(tail, 0, 20)).isEqualTo(6); + assertThat(in.getPos()).isEqualTo(256); assertThat(in.read()).isEqualTo(-1); + assertThat(in.read(new byte[1], 0, 1)).isEqualTo(-1); + } + } + + // --- buffer-efficiency: skip stays in local buffer vs. underlying stream --- + + @Test + void seekWithinBuffer_afterSmallRead_doesNotTouchUnderlyingStream() throws Exception { + int smallBuffer = 32; + StubS3Client client = new StubS3Client(DATA); + try (NativeS3InputStream in = + new NativeS3InputStream(client, BUCKET, KEY, DATA.length, smallBuffer)) { + // single-byte read opens the S3 stream and advances position to 1 + assertThat(in.read()).isEqualTo(0); + TrackingInputStream underlying = client.lastStream(); + long initialBytesRead = underlying.bytesReadFromUnderlying(); + + // forward seek of 9 bytes — fits in the local buffer, no underlying access needed + in.seek(10); + assertThat(in.read()).isEqualTo(10); + assertThat(in.getPos()).isEqualTo(11); + + assertThat(client.getObjectCalls()).isEqualTo(1); + assertThat(underlying.bytesSkippedFromUnderlying()).isEqualTo(0); + assertThat(underlying.bytesReadFromUnderlying()).isEqualTo(initialBytesRead); } } + @Test + void seekWithinBuffer_afterLargeRead_touchesUnderlyingStream() throws Exception { + int smallBuffer = 16; + StubS3Client client = new StubS3Client(DATA); + try (NativeS3InputStream in = + new NativeS3InputStream(client, BUCKET, KEY, DATA.length, smallBuffer)) { + // bulk read >= bufferSize bypasses BufferedInputStream's local array entirely, + // leaving the local buffer empty afterward + byte[] buf = new byte[smallBuffer]; + in.read(buf, 0, smallBuffer); + TrackingInputStream underlying = client.lastStream(); + + // forward seek of 10 bytes — within readBufferSize but buffer is empty, so + // BufferedInputStream.skip() delegates directly to the underlying stream + in.seek((long) smallBuffer + 10); + assertThat(in.read()).isEqualTo(DATA[smallBuffer + 10] & 0xFF); + 
+ assertThat(client.getObjectCalls()).isEqualTo(1); + assertThat(underlying.bytesSkippedFromUnderlying()).isEqualTo(10); + } + } + + @Test + void seekBeyondReadBufferSize_inflatedByUnderlyingAvailable_reopensWithRangeRequest() + throws Exception { + int smallBuffer = 16; + StubS3Client client = new StubS3Client(DATA); + try (NativeS3InputStream in = + new NativeS3InputStream(client, BUCKET, KEY, DATA.length, smallBuffer)) { + in.read(); + TrackingInputStream first = client.lastStream(); + + // seek 50 bytes forward — exceeds readBufferSize=16, must reopen with range request + in.seek(51); + assertThat(in.read()).isEqualTo(51); + + assertThat(client.getObjectCalls()).isEqualTo(2); + assertThat(first.bytesSkippedFromUnderlying()).isEqualTo(0); + } + } + + @Test + void skipBytesInBuffer_skipFailureTriggersRangeRequest() throws Exception { + int smallBuffer = 32; + // FailFirstSkipTrackingInputStream returns 0 on the first skip() call, simulating a + // stalled/closed HTTP connection. BufferedInputStream propagates 0 when its internal + // buffer is empty, so skipBytesInBuffer() falls back to openStreamAtCurrentPosition(). + StubS3Client client = + new StubS3Client(DATA, Integer.MAX_VALUE, FailFirstSkipTrackingInputStream::new); + try (NativeS3InputStream in = + new NativeS3InputStream(client, BUCKET, KEY, DATA.length, smallBuffer)) { + // Read exactly one buffer-worth: BufferedInputStream bypasses its internal array for + // len >= bufSize reads, leaving the buffer empty after the call. + byte[] buf = new byte[smallBuffer]; + in.read(buf, 0, smallBuffer); + assertThat(client.getObjectCalls()).isEqualTo(1); + + // Seek 10 bytes forward — within readBufferSize, so skipBytesInBuffer() is chosen. + // With the buffer empty, BufferedInputStream delegates to the underlying skip(), + // which returns 0 on the first call, triggering openStreamAtCurrentPosition(). 
+ int seekTarget = smallBuffer + 10; + in.seek(seekTarget); + + assertThat(in.read()).isEqualTo(DATA[seekTarget] & 0xFF); + assertThat(in.getPos()).isEqualTo(seekTarget + 1); + assertThat(client.getObjectCalls()).isEqualTo(2); + } + } + + // --- argument validation --- + @Test void rejectsInvalidArguments() throws Exception { StubS3Client client = new StubS3Client(DATA);