Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,21 @@
import java.util.concurrent.locks.ReentrantLock;

/**
* S3 input stream with configurable read-ahead buffer, range-based requests for seek operations,
* automatic stream reopening on errors, and lazy initialization to minimize memory footprint.
* S3 input stream with configurable read-ahead buffer, lazy seek, and automatic stream reopening.
*
* <p><b>Thread Safety:</b> Internal state is guarded by a lock to ensure safe concurrent access and
* resource cleanup.
* <p>{@link #seek(long)} only records the desired position without performing any I/O. All HTTP
* work is deferred to the next {@link #read()} call via {@link #lazySeek()}, so multiple seeks
* between reads coalesce. When the seek is forward and within {@code readBufferSize}, bytes are
* skipped in-buffer instead of reopening the HTTP connection.
*/
class NativeS3InputStream extends FSDataInputStream {

private static final Logger LOG = LoggerFactory.getLogger(NativeS3InputStream.class);

/** Default read-ahead buffer size: 256KB. */
/** Default read-ahead buffer size. */
private static final int DEFAULT_READ_BUFFER_SIZE = 256 * 1024;

/** Maximum buffer size for very large sequential reads. */
private static final int MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024; // 4MB
private static final int MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024;

private final ReentrantLock lock = new ReentrantLock();

Expand All @@ -65,8 +65,19 @@ class NativeS3InputStream extends FSDataInputStream {
@GuardedBy("lock")
private BufferedInputStream bufferedStream;

/**
* The position the caller expects to read from next. Updated by {@link #seek(long)}, {@link
* #skip(long)}, and after every successful {@link #read()}.
*/
@GuardedBy("lock")
private long nextReadPos;

/**
* The actual byte offset of the underlying stream cursor, reconciled lazily via {@link
* #lazySeek()}.
*/
@GuardedBy("lock")
private long position;
private long streamPos;

@GuardedBy("lock")
private volatile boolean closed;
Expand All @@ -87,7 +98,8 @@ public NativeS3InputStream(
this.key = key;
this.contentLength = contentLength;
this.readBufferSize = Math.min(readBufferSize, MAX_READ_BUFFER_SIZE);
this.position = 0;
this.nextReadPos = 0;
this.streamPos = 0;
this.closed = false;

LOG.debug(
Expand All @@ -98,39 +110,60 @@ public NativeS3InputStream(
this.readBufferSize / 1024);
}

/** Reconciles {@link #nextReadPos} and {@link #streamPos} before reading bytes. */
@GuardedBy("lock")
private void lazyInitialize() throws IOException {
assert lock.isHeldByCurrentThread() : "lazyInitialize() requires lock to be held";
if (currentStream == null && !closed) {
openStreamAtCurrentPosition();
}
}
private void lazySeek() throws IOException {
assert lock.isHeldByCurrentThread() : "lazySeek() requires lock to be held";
long targetPos = nextReadPos;

/** At EOF, release instead of reopening: {@code bytes=contentLength-} returns S3 416. */
@GuardedBy("lock")
private void repositionOpenStream() throws IOException {
assert lock.isHeldByCurrentThread() : "repositionOpenStream() requires lock to be held";
if (currentStream == null) {
streamPos = targetPos;
return;
}
if (position >= contentLength) {

if (targetPos == streamPos) {
return;
}

long diff = targetPos - streamPos;
streamPos = targetPos;

if (targetPos >= contentLength) {
releaseStreams();
} else {
return;
}

if (diff > 0 && diff <= (long) readBufferSize) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would put a comment here explaining that we can't really determine how much data is sitting inside the buffer without heavy or fragile development effort (it would require extending a JDK class), so the tradeoff is to accept up to a buffer-size worth of skipped bytes during seeks.

As a side note (which we don't need to mention in a code comment): if this doesn't perform well, we can add a new config option that controls the seek behavior, such as s3.max-forward-seek-bytes or something similar.

skipBytesInBuffer(diff);
return;
}

openStreamAtCurrentPosition();
}

/** Opens a fresh S3 stream at the current position when none is active and we are not closed. */
@GuardedBy("lock")
private void ensureStreamOpen() throws IOException {
    assert lock.isHeldByCurrentThread() : "ensureStreamOpen() requires lock to be held";
    // Guard clauses: nothing to do if the stream is already open or has been closed for good.
    if (closed || currentStream != null) {
        return;
    }
    openStreamAtCurrentPosition();
}

/**
* Opens (or reopens) the S3 stream at the current position.
*
* <p>This method:
*
* <ul>
* <li>Closes any existing stream
* <li>Opens a new stream starting at {@link #position}
* <li>Uses HTTP range requests for non-zero positions
* </ul>
*/
@GuardedBy("lock")
private void skipBytesInBuffer(long n) throws IOException {
assert lock.isHeldByCurrentThread() : "skipBytesInBuffer() requires lock to be held";
long remaining = n;
while (remaining > 0) {
long skipped = bufferedStream.skip(remaining);
if (skipped <= 0) {
openStreamAtCurrentPosition();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

skipBytesInBuffer has an untested recovery path: if bufferedStream.skip() returns <= 0 mid-skip (underlying HTTP connection stalled or closed), it falls back to openStreamAtCurrentPosition(). This is correct - streamPos is already set to targetPos before skipBytesInBuffer is called, so the fresh range request lands at the right position - but it is never exercised by the current test suite.

A targeted test would extend TrackingInputStream to return 0 from skip() on the first call, then delegate normally. The setup: do a small read (fills buffer), seek forward within readBufferSize, trigger the skip failure, then assert the correct byte is returned and getObjectCalls() == 2 (new range request was issued at the seek target).

return;
}
remaining -= skipped;
}
}

/** Opens (or reopens) the S3 stream at {@link #streamPos}. */
private void openStreamAtCurrentPosition() throws IOException {
lock.lock();
try {
Expand All @@ -140,11 +173,11 @@ private void openStreamAtCurrentPosition() throws IOException {
GetObjectRequest.Builder requestBuilder =
GetObjectRequest.builder().bucket(bucketName).key(key);

if (position > 0) {
requestBuilder.range(String.format("bytes=%d-", position));
if (streamPos > 0) {
requestBuilder.range(String.format("bytes=%d-", streamPos));
LOG.debug(
"Opening S3 stream with range: bytes={}-{}",
position,
streamPos,
contentLength - 1);
} else {
LOG.debug("Opening S3 stream for full object: {} bytes", contentLength);
Expand All @@ -160,12 +193,7 @@ private void openStreamAtCurrentPosition() throws IOException {
}
}

/**
* Aborts the in-flight HTTP connection so that subsequent {@code close()} calls on the stream
* do not drain remaining bytes over the network.
*
* @see ResponseInputStream#abort()
*/
/** Aborts the in-flight HTTP connection to avoid draining remaining bytes on close. */
@GuardedBy("lock")
private void abortCurrentStream() {
assert lock.isHeldByCurrentThread() : "abortCurrentStream() requires lock to be held";
Expand All @@ -179,11 +207,10 @@ private void abortCurrentStream() {
}

/**
* Aborts and closes both streams, nulling the references. The abort is called first to prevent
* {@link ResponseInputStream#close()} from draining remaining bytes over the network.
* Aborts and closes both streams, nulling the references.
*
* @return the first {@link IOException} encountered (with subsequent ones added as suppressed),
* or {@code null} if cleanup succeeded without errors
* or {@code null} if cleanup succeeded
*/
@GuardedBy("lock")
private IOException releaseStreams() {
Expand Down Expand Up @@ -236,10 +263,7 @@ public void seek(long desired) throws IOException {
+ contentLength);
}

if (desired != position) {
position = desired;
repositionOpenStream();
}
nextReadPos = desired;
} finally {
lock.unlock();
}
Expand All @@ -249,7 +273,7 @@ public void seek(long desired) throws IOException {
public long getPos() throws IOException {
lock();
try {
return position;
return nextReadPos;
} finally {
lock.unlock();
}
Expand All @@ -262,13 +286,15 @@ public int read() throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
if (position >= contentLength) {
if (nextReadPos >= contentLength) {
return -1;
}
lazyInitialize();
lazySeek();
ensureStreamOpen();
int data = bufferedStream.read();
if (data != -1) {
position++;
nextReadPos++;
streamPos++;
}
return data;
} finally {
Expand All @@ -295,15 +321,17 @@ public int read(byte[] b, int off, int len) throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
if (position >= contentLength) {
if (nextReadPos >= contentLength) {
return -1;
}
lazyInitialize();
long remaining = contentLength - position;
lazySeek();
ensureStreamOpen();
long remaining = contentLength - nextReadPos;
int toRead = (int) Math.min(len, remaining);
int bytesRead = bufferedStream.read(b, off, toRead);
if (bytesRead > 0) {
position += bytesRead;
nextReadPos += bytesRead;
streamPos += bytesRead;
}
return bytesRead;
} finally {
Expand Down Expand Up @@ -336,7 +364,7 @@ public void close() throws IOException {
"Closed S3 input stream - bucket: {}, key: {}, final position: {}/{}",
bucketName,
key,
position,
nextReadPos,
contentLength);
if (exception != null) {
throw exception;
Expand All @@ -346,24 +374,14 @@ public void close() throws IOException {
}
}

/**
* Returns an estimate of the number of bytes that can be read without blocking.
*
* <p>This implementation returns the remaining bytes in the object based on content length and
* current position. Note that actual reads may still block due to network I/O, but this
* indicates how much data is logically available.
*
* @return the number of remaining bytes (capped at Integer.MAX_VALUE)
* @throws IOException if the stream has been closed
*/
@Override
public int available() throws IOException {
lock();
try {
if (closed) {
throw new IOException("Stream is closed");
}
long remaining = contentLength - position;
long remaining = contentLength - nextReadPos;
return (int) Math.min(remaining, Integer.MAX_VALUE);
} finally {
lock.unlock();
Expand All @@ -380,12 +398,9 @@ public long skip(long n) throws IOException {
if (n <= 0) {
return 0;
}
long newPos = Math.min(position + n, contentLength);
long skipped = newPos - position;
if (newPos != position) {
position = newPos;
repositionOpenStream();
}
long newPos = Math.min(nextReadPos + n, contentLength);
long skipped = newPos - nextReadPos;
nextReadPos = newPos;
return skipped;
} finally {
lock.unlock();
Expand Down
Loading