diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java
index 32368dd94f602..dd2a80e4a7037 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java
@@ -35,22 +35,20 @@
import java.util.concurrent.locks.ReentrantLock;
/**
- * S3 input stream with configurable read-ahead buffer, range-based requests for seek operations,
- * automatic stream reopening on errors, and lazy initialization to minimize memory footprint.
+ * S3 input stream with configurable read-ahead buffer, lazy seek, and automatic stream reopening.
*
- * <p>Thread Safety: Internal state is guarded by a lock to ensure safe concurrent access and
- * resource cleanup.
+ * <p>{@link #seek(long)} only records the desired position without performing any I/O. All HTTP
+ * work is deferred to the next {@link #read()} call via {@link #lazySeek()}, so multiple seeks
+ * between reads coalesce. When the seek is forward and within {@code readBufferSize}, bytes are
+ * skipped in-buffer instead of reopening the HTTP connection.
*/
class NativeS3InputStream extends FSDataInputStream {
private static final Logger LOG = LoggerFactory.getLogger(NativeS3InputStream.class);
- /** Default read-ahead buffer size: 256KB. */
+ /** Default read-ahead buffer size. */
private static final int DEFAULT_READ_BUFFER_SIZE = 256 * 1024;
- /** Maximum buffer size for very large sequential reads. */
- private static final int MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024; // 4MB
-
private final ReentrantLock lock = new ReentrantLock();
private final S3Client s3Client;
@@ -65,8 +63,19 @@ class NativeS3InputStream extends FSDataInputStream {
@GuardedBy("lock")
private BufferedInputStream bufferedStream;
+ /**
+ * The position the caller expects to read from next. Updated by {@link #seek(long)}, {@link
+ * #skip(long)}, and after every successful {@link #read()}.
+ */
+ @GuardedBy("lock")
+ private long nextReadPos;
+
+ /**
+ * The actual byte offset of the underlying stream cursor, reconciled lazily via {@link
+ * #lazySeek()}.
+ */
@GuardedBy("lock")
- private long position;
+ private long streamPos;
@GuardedBy("lock")
private volatile boolean closed;
@@ -86,8 +95,9 @@ public NativeS3InputStream(
this.bucketName = bucketName;
this.key = key;
this.contentLength = contentLength;
- this.readBufferSize = Math.min(readBufferSize, MAX_READ_BUFFER_SIZE);
- this.position = 0;
+ this.readBufferSize = readBufferSize;
+ this.nextReadPos = 0;
+ this.streamPos = 0;
this.closed = false;
LOG.debug(
@@ -98,39 +108,63 @@ public NativeS3InputStream(
this.readBufferSize / 1024);
}
+ /** Reconciles {@link #nextReadPos} and {@link #streamPos} before reading bytes. */
@GuardedBy("lock")
- private void lazyInitialize() throws IOException {
- assert lock.isHeldByCurrentThread() : "lazyInitialize() requires lock to be held";
- if (currentStream == null && !closed) {
- openStreamAtCurrentPosition();
- }
- }
+ private void lazySeek() throws IOException {
+ assert lock.isHeldByCurrentThread() : "lazySeek() requires lock to be held";
+ long targetPos = nextReadPos;
- /** At EOF, release instead of reopening: {@code bytes=contentLength-} returns S3 416. */
- @GuardedBy("lock")
- private void repositionOpenStream() throws IOException {
- assert lock.isHeldByCurrentThread() : "repositionOpenStream() requires lock to be held";
if (currentStream == null) {
+ streamPos = targetPos;
return;
}
- if (position >= contentLength) {
+
+ if (targetPos == streamPos) {
+ return;
+ }
+
+ long diff = targetPos - streamPos;
+ streamPos = targetPos;
+
+ if (targetPos >= contentLength) {
releaseStreams();
- } else {
+ return;
+ }
+
+ // BufferedInputStream does not expose how many bytes are in its local array, so
+ // readBufferSize is used as the skip threshold: at most readBufferSize bytes may be
+ // consumed from the live HTTP connection before a range request is preferred instead.
+ if (diff > 0 && diff <= (long) readBufferSize) {
+ skipBytesInBuffer(diff);
+ return;
+ }
+
+ openStreamAtCurrentPosition();
+ }
+
+ @GuardedBy("lock")
+ private void ensureStreamOpen() throws IOException {
+ assert lock.isHeldByCurrentThread() : "ensureStreamOpen() requires lock to be held";
+ if (currentStream == null && !closed) {
openStreamAtCurrentPosition();
}
}
- /**
- * Opens (or reopens) the S3 stream at the current position.
- *
- * <p>This method:
- *
- * <ul>
- *   <li>Closes any existing stream
- *   <li>Opens a new stream starting at {@link #position}
- *   <li>Uses HTTP range requests for non-zero positions
- * </ul>
- */
+ @GuardedBy("lock")
+ private void skipBytesInBuffer(long n) throws IOException {
+ assert lock.isHeldByCurrentThread() : "skipBytesInBuffer() requires lock to be held";
+ long remaining = n;
+ while (remaining > 0) {
+ long skipped = bufferedStream.skip(remaining);
+ if (skipped <= 0) {
+ openStreamAtCurrentPosition();
+ return;
+ }
+ remaining -= skipped;
+ }
+ }
+
+ /** Opens (or reopens) the S3 stream at {@link #streamPos}. */
private void openStreamAtCurrentPosition() throws IOException {
lock.lock();
try {
@@ -140,11 +174,11 @@ private void openStreamAtCurrentPosition() throws IOException {
GetObjectRequest.Builder requestBuilder =
GetObjectRequest.builder().bucket(bucketName).key(key);
- if (position > 0) {
- requestBuilder.range(String.format("bytes=%d-", position));
+ if (streamPos > 0) {
+ requestBuilder.range(String.format("bytes=%d-", streamPos));
LOG.debug(
"Opening S3 stream with range: bytes={}-{}",
- position,
+ streamPos,
contentLength - 1);
} else {
LOG.debug("Opening S3 stream for full object: {} bytes", contentLength);
@@ -160,12 +194,7 @@ private void openStreamAtCurrentPosition() throws IOException {
}
}
- /**
- * Aborts the in-flight HTTP connection so that subsequent {@code close()} calls on the stream
- * do not drain remaining bytes over the network.
- *
- * @see ResponseInputStream#abort()
- */
+ /** Aborts the in-flight HTTP connection to avoid draining remaining bytes on close. */
@GuardedBy("lock")
private void abortCurrentStream() {
assert lock.isHeldByCurrentThread() : "abortCurrentStream() requires lock to be held";
@@ -179,11 +208,10 @@ private void abortCurrentStream() {
}
/**
- * Aborts and closes both streams, nulling the references. The abort is called first to prevent
- * {@link ResponseInputStream#close()} from draining remaining bytes over the network.
+ * Aborts and closes both streams, nulling the references.
*
* @return the first {@link IOException} encountered (with subsequent ones added as suppressed),
- * or {@code null} if cleanup succeeded without errors
+ * or {@code null} if cleanup succeeded
*/
@GuardedBy("lock")
private IOException releaseStreams() {
@@ -236,10 +264,7 @@ public void seek(long desired) throws IOException {
+ contentLength);
}
- if (desired != position) {
- position = desired;
- repositionOpenStream();
- }
+ nextReadPos = desired;
} finally {
lock.unlock();
}
@@ -249,7 +274,7 @@ public void seek(long desired) throws IOException {
public long getPos() throws IOException {
lock();
try {
- return position;
+ return nextReadPos;
} finally {
lock.unlock();
}
@@ -262,13 +287,15 @@ public int read() throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
- if (position >= contentLength) {
+ if (nextReadPos >= contentLength) {
return -1;
}
- lazyInitialize();
+ lazySeek();
+ ensureStreamOpen();
int data = bufferedStream.read();
if (data != -1) {
- position++;
+ nextReadPos++;
+ streamPos++;
}
return data;
} finally {
@@ -295,15 +322,17 @@ public int read(byte[] b, int off, int len) throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
- if (position >= contentLength) {
+ if (nextReadPos >= contentLength) {
return -1;
}
- lazyInitialize();
- long remaining = contentLength - position;
+ lazySeek();
+ ensureStreamOpen();
+ long remaining = contentLength - nextReadPos;
int toRead = (int) Math.min(len, remaining);
int bytesRead = bufferedStream.read(b, off, toRead);
if (bytesRead > 0) {
- position += bytesRead;
+ nextReadPos += bytesRead;
+ streamPos += bytesRead;
}
return bytesRead;
} finally {
@@ -336,7 +365,7 @@ public void close() throws IOException {
"Closed S3 input stream - bucket: {}, key: {}, final position: {}/{}",
bucketName,
key,
- position,
+ nextReadPos,
contentLength);
if (exception != null) {
throw exception;
@@ -346,16 +375,6 @@ public void close() throws IOException {
}
}
- /**
- * Returns an estimate of the number of bytes that can be read without blocking.
- *
- * This implementation returns the remaining bytes in the object based on content length and
- * current position. Note that actual reads may still block due to network I/O, but this
- * indicates how much data is logically available.
- *
- * @return the number of remaining bytes (capped at Integer.MAX_VALUE)
- * @throws IOException if the stream has been closed
- */
@Override
public int available() throws IOException {
lock();
@@ -363,7 +382,7 @@ public int available() throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
- long remaining = contentLength - position;
+ long remaining = contentLength - nextReadPos;
return (int) Math.min(remaining, Integer.MAX_VALUE);
} finally {
lock.unlock();
@@ -380,12 +399,9 @@ public long skip(long n) throws IOException {
if (n <= 0) {
return 0;
}
- long newPos = Math.min(position + n, contentLength);
- long skipped = newPos - position;
- if (newPos != position) {
- position = newPos;
- repositionOpenStream();
- }
+ long newPos = Math.min(nextReadPos + n, contentLength);
+ long skipped = newPos - nextReadPos;
+ nextReadPos = newPos;
return skipped;
} finally {
lock.unlock();
diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java
index 90591fcd94ab1..32316e18bbc9b 100644
--- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java
+++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java
@@ -33,6 +33,7 @@
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -57,23 +58,47 @@ private static class TrackingInputStream extends InputStream implements Abortabl
private final AtomicBoolean aborted = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
private volatile boolean abortedBeforeClose;
+ private final int maxAvailable;
+ private final AtomicLong bytesRead = new AtomicLong();
+ private final AtomicLong bytesSkipped = new AtomicLong();
TrackingInputStream(byte[] data, int offset) {
- this.delegate = new ByteArrayInputStream(data, offset, data.length - offset);
+ this(data, offset, Integer.MAX_VALUE);
}
- TrackingInputStream(byte[] data) {
- this(data, 0);
+ TrackingInputStream(byte[] data, int offset, int maxAvailable) {
+ this.delegate = new ByteArrayInputStream(data, offset, data.length - offset);
+ this.maxAvailable = maxAvailable;
}
@Override
public int read() {
- return delegate.read();
+ int b = delegate.read();
+ if (b != -1) {
+ bytesRead.incrementAndGet();
+ }
+ return b;
}
@Override
public int read(byte[] b, int off, int len) {
- return delegate.read(b, off, len);
+ int n = delegate.read(b, off, len);
+ if (n > 0) {
+ bytesRead.addAndGet(n);
+ }
+ return n;
+ }
+
+ @Override
+ public long skip(long n) {
+ long skipped = delegate.skip(n);
+ bytesSkipped.addAndGet(skipped);
+ return skipped;
+ }
+
+ @Override
+ public int available() {
+ return Math.min(delegate.available(), maxAvailable);
}
@Override
@@ -101,6 +126,39 @@ boolean wasClosed() {
boolean wasAbortedBeforeClose() {
return abortedBeforeClose;
}
+
+ long bytesReadFromUnderlying() {
+ return bytesRead.get();
+ }
+
+ long bytesSkippedFromUnderlying() {
+ return bytesSkipped.get();
+ }
+ }
+
+ /**
+ * A {@link TrackingInputStream} whose first {@link #skip} call returns {@code 0}, simulating a
+ * stalled/closed underlying HTTP connection during in-buffer skipping.
+ */
+ private static final class FailFirstSkipTrackingInputStream extends TrackingInputStream {
+ private final AtomicBoolean firstSkipFailed = new AtomicBoolean();
+
+ FailFirstSkipTrackingInputStream(byte[] data, int offset, int maxAvailable) {
+ super(data, offset, maxAvailable);
+ }
+
+ @Override
+ public long skip(long n) {
+ if (firstSkipFailed.compareAndSet(false, true)) {
+ return 0;
+ }
+ return super.skip(n);
+ }
+ }
+
+ @FunctionalInterface
+ private interface StreamFactory {
+ TrackingInputStream create(byte[] data, int offset, int maxAvailable);
}
/** {@link S3Client} stub. */
@@ -108,9 +166,21 @@ private static final class StubS3Client implements S3Client {
private final byte[] data;
private final AtomicInteger getObjectCalls = new AtomicInteger();
private volatile TrackingInputStream lastStream;
+ private final int maxAvailable;
+ private final StreamFactory factory;
StubS3Client(byte[] data) {
+ this(data, Integer.MAX_VALUE);
+ }
+
+ StubS3Client(byte[] data, int maxAvailable) {
+ this(data, maxAvailable, TrackingInputStream::new);
+ }
+
+ StubS3Client(byte[] data, int maxAvailable, StreamFactory factory) {
this.data = data;
+ this.maxAvailable = maxAvailable;
+ this.factory = factory;
}
@Override
@@ -121,7 +191,7 @@ public ResponseInputStream getObject(GetObjectRequest request
if (range != null && range.startsWith("bytes=")) {
offset = Integer.parseInt(range.substring(6, range.indexOf('-')));
}
- TrackingInputStream tracking = new TrackingInputStream(data, offset);
+ TrackingInputStream tracking = factory.create(data, offset, maxAvailable);
lastStream = tracking;
AbortableInputStream abortable = AbortableInputStream.create(tracking, tracking);
return new ResponseInputStream<>(
@@ -145,6 +215,8 @@ int getObjectCalls() {
}
}
+ // --- close behavior ---
+
@Test
void closeAbortsUnderlyingStream() throws Exception {
StubS3Client client = new StubS3Client(DATA);
@@ -162,116 +234,200 @@ void closeAbortsAndThenClosesUnderlyingStream() throws Exception {
in.read();
}
TrackingInputStream stream = client.lastStream();
- // abort() must be called to kill the HTTP connection (prevents drain)
assertThat(stream.wasAborted()).isTrue();
- // close() must still be called for SDK resource cleanup (connection pool return, etc.)
assertThat(stream.wasClosed()).isTrue();
- // abort() must happen BEFORE close() - otherwise close() drains remaining bytes
assertThat(stream.wasAbortedBeforeClose()).isTrue();
}
@Test
- void seekAbortsAndClosesOldStreamBeforeOpeningNew() throws Exception {
+ void closeWithoutReadNeverOpensStream() throws Exception {
+ StubS3Client client = new StubS3Client(DATA);
+ try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) {}
+ assertThat(client.getObjectCalls()).isEqualTo(0);
+ }
+
+ @Test
+ void doubleCloseIsIdempotent() throws Exception {
+ StubS3Client client = new StubS3Client(DATA);
+ NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length);
+ in.read();
+
+ in.close();
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+ assertThat(client.lastStream().wasAborted()).isTrue();
+
+ in.close();
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+ assertThat(client.lastStream().wasAborted()).isTrue();
+ }
+
+ // --- lazy seek: seek() does no I/O ---
+
+ @Test
+ void seekBeforeFirstReadUpdatesPositionOnly() throws Exception {
+ StubS3Client client = new StubS3Client(DATA);
+ try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) {
+ in.seek(50);
+ assertThat(in.getPos()).isEqualTo(50);
+ assertThat(client.getObjectCalls()).isEqualTo(0);
+ }
+ }
+
+ @Test
+ void seekDoesNotTriggerStreamOperations() throws Exception {
StubS3Client client = new StubS3Client(DATA);
try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) {
in.read();
TrackingInputStream first = client.lastStream();
+ assertThat(client.getObjectCalls()).isEqualTo(1);
- in.seek(100);
+ in.seek(0);
+ assertThat(first.wasAborted()).isFalse();
+ assertThat(first.wasClosed()).isFalse();
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+ assertThat(in.getPos()).isEqualTo(0);
+ }
+ }
- // old stream must be aborted, closed, and in the correct order
- assertThat(first.wasAborted()).isTrue();
- assertThat(first.wasClosed()).isTrue();
- assertThat(first.wasAbortedBeforeClose()).isTrue();
- assertThat(client.getObjectCalls()).isEqualTo(2);
- assertThat(in.getPos()).isEqualTo(100);
+ @Test
+ void multipleSeeksWithoutReadCoalesce() throws Exception {
+ StubS3Client client = new StubS3Client(DATA);
+ try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) {
+ in.read();
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+
+ in.seek(200);
+ in.seek(0);
in.seek(100);
- assertThat(client.getObjectCalls()).isEqualTo(2);
+ in.seek(50);
+
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+
+ assertThat(in.read()).isEqualTo(50);
+ assertThat(in.getPos()).isEqualTo(51);
}
}
+ // --- seek + read: forward within buffer ---
+
@Test
- void skipAbortsOldStreamAndOpensNew() throws Exception {
+ void seekForwardWithinBufferReusesStream() throws Exception {
StubS3Client client = new StubS3Client(DATA);
try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) {
in.read();
TrackingInputStream first = client.lastStream();
- assertThat(in.skip(100)).isEqualTo(100);
- assertThat(first.wasAborted()).isTrue();
- assertThat(first.wasClosed()).isTrue();
- assertThat(first.wasAbortedBeforeClose()).isTrue();
- assertThat(client.getObjectCalls()).isEqualTo(2);
- // skip(0) and skip(negative) are no-ops
- assertThat(in.skip(0)).isZero();
- assertThat(in.skip(-5)).isZero();
- assertThat(client.getObjectCalls()).isEqualTo(2);
+
+ in.seek(100);
+ assertThat(in.read()).isEqualTo(100);
+
+ assertThat(first.wasAborted()).isFalse();
+ assertThat(first.wasClosed()).isFalse();
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+ assertThat(in.getPos()).isEqualTo(101);
+
+ in.seek(101);
+ assertThat(in.read()).isEqualTo(101);
+ assertThat(client.getObjectCalls()).isEqualTo(1);
}
}
@Test
- void closeWithoutReadNeverOpensStream() throws Exception {
+ void seekForwardWithinBufferReturnsCorrectData() throws Exception {
StubS3Client client = new StubS3Client(DATA);
try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) {
- // lazy init means no getObject call
+ assertThat(in.read()).isEqualTo(0);
+ assertThat(in.read()).isEqualTo(1);
+
+ in.seek(10);
+ assertThat(in.read()).isEqualTo(10);
+
+ in.seek(50);
+ byte[] buf = new byte[5];
+ assertThat(in.read(buf, 0, 5)).isEqualTo(5);
+ for (int i = 0; i < 5; i++) {
+ assertThat(buf[i]).isEqualTo(DATA[50 + i]);
+ }
+
+ in.seek(200);
+ assertThat(in.read()).isEqualTo(200 & 0xFF);
+
+ assertThat(client.getObjectCalls()).isEqualTo(1);
}
- assertThat(client.getObjectCalls()).isEqualTo(0);
}
@Test
- void doubleCloseIsIdempotent() throws Exception {
+ void sequentialSmallSeeksReuseStream() throws Exception {
StubS3Client client = new StubS3Client(DATA);
- NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length);
- in.read();
+ try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) {
+ for (int i = 0; i < 100; i++) {
+ in.seek(i * 2);
+ assertThat(in.read()).isEqualTo((i * 2) & 0xFF);
+ }
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+ }
+ }
- // Verify state after first close
- in.close();
- assertThat(client.getObjectCalls()).isEqualTo(1);
- assertThat(client.lastStream().wasAborted()).isTrue();
+ // --- seek + read: forward beyond buffer ---
- // Second close should be a no-op
- in.close();
- assertThat(client.getObjectCalls()).isEqualTo(1);
- assertThat(client.lastStream().wasAborted()).isTrue();
+ @Test
+ void seekForwardBeyondBufferReopensStream() throws Exception {
+ StubS3Client client = new StubS3Client(DATA, 0);
+ int smallBuffer = 16;
+ try (NativeS3InputStream in =
+ new NativeS3InputStream(client, BUCKET, KEY, DATA.length, smallBuffer)) {
+ in.read();
+ TrackingInputStream first = client.lastStream();
+
+ in.seek(1 + smallBuffer + 1);
+ assertThat(first.wasAborted()).isFalse();
+
+ assertThat(in.read()).isEqualTo(1 + smallBuffer + 1);
+ assertThat(first.wasAborted()).isTrue();
+ assertThat(first.wasClosed()).isTrue();
+ assertThat(first.wasAbortedBeforeClose()).isTrue();
+ assertThat(client.getObjectCalls()).isEqualTo(2);
+ }
}
+ // --- seek + read: backward ---
+
@Test
- void seekBeforeFirstReadUpdatesPositionOnly() throws Exception {
+ void seekBackwardReopensStreamOnRead() throws Exception {
StubS3Client client = new StubS3Client(DATA);
try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) {
- in.seek(50);
- assertThat(in.getPos()).isEqualTo(50);
- assertThat(client.getObjectCalls()).isEqualTo(0);
+ byte[] buf = new byte[50];
+ in.read(buf, 0, 50);
+ TrackingInputStream first = client.lastStream();
+
+ in.seek(10);
+ assertThat(first.wasAborted()).isFalse();
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+
+ assertThat(in.read()).isEqualTo(10);
+ assertThat(first.wasAborted()).isTrue();
+ assertThat(first.wasClosed()).isTrue();
+ assertThat(first.wasAbortedBeforeClose()).isTrue();
+ assertThat(client.getObjectCalls()).isEqualTo(2);
+ assertThat(in.getPos()).isEqualTo(11);
}
}
+ // --- seek to EOF ---
+
@Test
- void readAndSeekReturnCorrectData() throws Exception {
+ void seekToContentLengthReleasesStreamOnRead() throws Exception {
StubS3Client client = new StubS3Client(DATA);
try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) {
- // single-byte read
- assertThat(in.read()).isEqualTo(0);
- assertThat(in.getPos()).isEqualTo(1);
- // bulk read returns correct bytes and advances position
- byte[] buf = new byte[10];
- assertThat(in.read(buf, 0, 10)).isEqualTo(10);
- assertThat(in.getPos()).isEqualTo(11);
- for (int i = 0; i < 10; i++) {
- assertThat(buf[i]).isEqualTo(DATA[i + 1]);
- }
- // available() reflects remaining bytes
- assertThat(in.available()).isEqualTo(DATA.length - 11);
- // seek then read returns data at the seeked position
- in.seek(200);
- assertThat(in.read()).isEqualTo(200);
- assertThat(in.getPos()).isEqualTo(201);
- // partial read at EOF returns only remaining bytes
- in.seek(250);
- byte[] tail = new byte[20];
- assertThat(in.read(tail, 0, 20)).isEqualTo(6);
- assertThat(in.getPos()).isEqualTo(256);
- // read past EOF
+ in.read();
+ TrackingInputStream first = client.lastStream();
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+
+ in.seek(DATA.length);
+ assertThat(first.wasAborted()).isFalse();
+
assertThat(in.read()).isEqualTo(-1);
- assertThat(in.read(new byte[1], 0, 1)).isEqualTo(-1);
+ assertThat(in.read(new byte[4], 0, 4)).isEqualTo(-1);
+ assertThat(client.getObjectCalls()).isEqualTo(1);
}
}
@@ -297,34 +453,53 @@ void readAtEofReturnsMinusOne() throws Exception {
assertThat(in.read(new byte[8], 0, 8)).isEqualTo(-1);
assertThat(in.getPos()).isEqualTo(DATA.length);
assertThat(in.available()).isZero();
- // EOF short-circuits before lazyInitialize, so no range request is issued.
assertThat(client.getObjectCalls()).isZero();
}
}
+ // --- skip (also lazy) ---
+
@Test
- void seekToContentLengthWithOpenStreamReleasesStream() throws Exception {
+ void skipForwardWithinBufferReusesStream() throws Exception {
StubS3Client client = new StubS3Client(DATA);
try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) {
in.read();
TrackingInputStream first = client.lastStream();
+
+ assertThat(in.skip(50)).isEqualTo(50);
+ assertThat(in.getPos()).isEqualTo(51);
+
+ assertThat(first.wasAborted()).isFalse();
assertThat(client.getObjectCalls()).isEqualTo(1);
- in.seek(DATA.length);
+ assertThat(in.read()).isEqualTo(51);
+ assertThat(first.wasAborted()).isFalse();
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+ }
+ }
+ @Test
+ void skipForwardBeyondBufferReopensStreamOnRead() throws Exception {
+ StubS3Client client = new StubS3Client(DATA, 0);
+ int smallBuffer = 16;
+ try (NativeS3InputStream in =
+ new NativeS3InputStream(client, BUCKET, KEY, DATA.length, smallBuffer)) {
+ in.read();
+ TrackingInputStream first = client.lastStream();
+
+ assertThat(in.skip(smallBuffer + 1)).isEqualTo(smallBuffer + 1);
+ assertThat(first.wasAborted()).isFalse();
+
+ assertThat(in.read()).isEqualTo(1 + smallBuffer + 1);
assertThat(first.wasAborted()).isTrue();
assertThat(first.wasClosed()).isTrue();
assertThat(first.wasAbortedBeforeClose()).isTrue();
- // bytes=contentLength- is unsatisfiable, so we must not reopen.
- assertThat(client.getObjectCalls()).isEqualTo(1);
- assertThat(in.read()).isEqualTo(-1);
- assertThat(in.read(new byte[4], 0, 4)).isEqualTo(-1);
- assertThat(client.getObjectCalls()).isEqualTo(1);
+ assertThat(client.getObjectCalls()).isEqualTo(2);
}
}
@Test
- void skipToEofWithOpenStreamReleasesStream() throws Exception {
+ void skipToEofReleasesStreamOnRead() throws Exception {
StubS3Client client = new StubS3Client(DATA);
try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) {
in.read();
@@ -333,15 +508,145 @@ void skipToEofWithOpenStreamReleasesStream() throws Exception {
assertThat(in.skip(DATA.length)).isEqualTo(DATA.length - 1);
assertThat(in.getPos()).isEqualTo(DATA.length);
+ assertThat(first.wasAborted()).isFalse();
- assertThat(first.wasAborted()).isTrue();
- assertThat(first.wasClosed()).isTrue();
- assertThat(first.wasAbortedBeforeClose()).isTrue();
+ assertThat(in.read()).isEqualTo(-1);
+ }
+ }
+
+ @Test
+ void skipZeroAndNegativeAreNoOps() throws Exception {
+ StubS3Client client = new StubS3Client(DATA);
+ try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) {
+ in.read();
+ assertThat(in.skip(0)).isZero();
+ assertThat(in.skip(-5)).isZero();
assertThat(client.getObjectCalls()).isEqualTo(1);
+ }
+ }
+
+ // --- read + seek integration ---
+
+ @Test
+ void readAndSeekReturnCorrectData() throws Exception {
+ StubS3Client client = new StubS3Client(DATA);
+ try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, DATA.length)) {
+ assertThat(in.read()).isEqualTo(0);
+ assertThat(in.getPos()).isEqualTo(1);
+ byte[] buf = new byte[10];
+ assertThat(in.read(buf, 0, 10)).isEqualTo(10);
+ assertThat(in.getPos()).isEqualTo(11);
+ for (int i = 0; i < 10; i++) {
+ assertThat(buf[i]).isEqualTo(DATA[i + 1]);
+ }
+ assertThat(in.available()).isEqualTo(DATA.length - 11);
+ in.seek(200);
+ assertThat(in.read()).isEqualTo(200);
+ assertThat(in.getPos()).isEqualTo(201);
+ in.seek(250);
+ byte[] tail = new byte[20];
+ assertThat(in.read(tail, 0, 20)).isEqualTo(6);
+ assertThat(in.getPos()).isEqualTo(256);
assertThat(in.read()).isEqualTo(-1);
+ assertThat(in.read(new byte[1], 0, 1)).isEqualTo(-1);
+ }
+ }
+
+ // --- buffer-efficiency: skip stays in local buffer vs. underlying stream ---
+
+ @Test
+ void seekWithinBuffer_afterSmallRead_doesNotTouchUnderlyingStream() throws Exception {
+ int smallBuffer = 32;
+ StubS3Client client = new StubS3Client(DATA);
+ try (NativeS3InputStream in =
+ new NativeS3InputStream(client, BUCKET, KEY, DATA.length, smallBuffer)) {
+ // single-byte read opens the S3 stream and advances position to 1
+ assertThat(in.read()).isEqualTo(0);
+ TrackingInputStream underlying = client.lastStream();
+ long initialBytesRead = underlying.bytesReadFromUnderlying();
+
+ // forward seek of 9 bytes — fits in the local buffer, no underlying access needed
+ in.seek(10);
+ assertThat(in.read()).isEqualTo(10);
+ assertThat(in.getPos()).isEqualTo(11);
+
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+ assertThat(underlying.bytesSkippedFromUnderlying()).isEqualTo(0);
+ assertThat(underlying.bytesReadFromUnderlying()).isEqualTo(initialBytesRead);
}
}
+ @Test
+ void seekWithinBuffer_afterLargeRead_touchesUnderlyingStream() throws Exception {
+ int smallBuffer = 16;
+ StubS3Client client = new StubS3Client(DATA);
+ try (NativeS3InputStream in =
+ new NativeS3InputStream(client, BUCKET, KEY, DATA.length, smallBuffer)) {
+ // bulk read >= bufferSize bypasses BufferedInputStream's local array entirely,
+ // leaving the local buffer empty afterward
+ byte[] buf = new byte[smallBuffer];
+ in.read(buf, 0, smallBuffer);
+ TrackingInputStream underlying = client.lastStream();
+
+ // forward seek of 10 bytes — within readBufferSize but buffer is empty, so
+ // BufferedInputStream.skip() delegates directly to the underlying stream
+ in.seek((long) smallBuffer + 10);
+ assertThat(in.read()).isEqualTo(DATA[smallBuffer + 10] & 0xFF);
+
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+ assertThat(underlying.bytesSkippedFromUnderlying()).isEqualTo(10);
+ }
+ }
+
+ @Test
+ void seekBeyondReadBufferSize_inflatedByUnderlyingAvailable_reopensWithRangeRequest()
+ throws Exception {
+ int smallBuffer = 16;
+ StubS3Client client = new StubS3Client(DATA);
+ try (NativeS3InputStream in =
+ new NativeS3InputStream(client, BUCKET, KEY, DATA.length, smallBuffer)) {
+ in.read();
+ TrackingInputStream first = client.lastStream();
+
+ // seek 50 bytes forward — exceeds readBufferSize=16, must reopen with range request
+ in.seek(51);
+ assertThat(in.read()).isEqualTo(51);
+
+ assertThat(client.getObjectCalls()).isEqualTo(2);
+ assertThat(first.bytesSkippedFromUnderlying()).isEqualTo(0);
+ }
+ }
+
+ @Test
+ void skipBytesInBuffer_skipFailureTriggersRangeRequest() throws Exception {
+ int smallBuffer = 32;
+ // FailFirstSkipTrackingInputStream returns 0 on the first skip() call, simulating a
+ // stalled/closed HTTP connection. BufferedInputStream propagates 0 when its internal
+ // buffer is empty, so skipBytesInBuffer() falls back to openStreamAtCurrentPosition().
+ StubS3Client client =
+ new StubS3Client(DATA, Integer.MAX_VALUE, FailFirstSkipTrackingInputStream::new);
+ try (NativeS3InputStream in =
+ new NativeS3InputStream(client, BUCKET, KEY, DATA.length, smallBuffer)) {
+ // Read exactly one buffer-worth: BufferedInputStream bypasses its internal array for
+ // len >= bufSize reads, leaving the buffer empty after the call.
+ byte[] buf = new byte[smallBuffer];
+ in.read(buf, 0, smallBuffer);
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+
+ // Seek 10 bytes forward — within readBufferSize, so skipBytesInBuffer() is chosen.
+ // With the buffer empty, BufferedInputStream delegates to the underlying skip(),
+ // which returns 0 on the first call, triggering openStreamAtCurrentPosition().
+ int seekTarget = smallBuffer + 10;
+ in.seek(seekTarget);
+
+ assertThat(in.read()).isEqualTo(DATA[seekTarget] & 0xFF);
+ assertThat(in.getPos()).isEqualTo(seekTarget + 1);
+ assertThat(client.getObjectCalls()).isEqualTo(2);
+ }
+ }
+
+ // --- argument validation ---
+
@Test
void rejectsInvalidArguments() throws Exception {
StubS3Client client = new StubS3Client(DATA);