-
Notifications
You must be signed in to change notification settings - Fork 13.9k
[FLINK-39603] Optimize NativeS3InputStream via seeks and reduce IOPS #28112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,21 +35,21 @@ | |
| import java.util.concurrent.locks.ReentrantLock; | ||
|
|
||
| /** | ||
| * S3 input stream with configurable read-ahead buffer, range-based requests for seek operations, | ||
| * automatic stream reopening on errors, and lazy initialization to minimize memory footprint. | ||
| * S3 input stream with configurable read-ahead buffer, lazy seek, and automatic stream reopening. | ||
| * | ||
| * <p><b>Thread Safety:</b> Internal state is guarded by a lock to ensure safe concurrent access and | ||
| * resource cleanup. | ||
| * <p>{@link #seek(long)} only records the desired position without performing any I/O. All HTTP | ||
| * work is deferred to the next {@link #read()} call via {@link #lazySeek()}, so multiple seeks | ||
| * between reads coalesce. When the seek is forward and within {@code readBufferSize}, bytes are | ||
| * skipped in-buffer instead of reopening the HTTP connection. | ||
| */ | ||
| class NativeS3InputStream extends FSDataInputStream { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(NativeS3InputStream.class); | ||
|
|
||
| /** Default read-ahead buffer size: 256KB. */ | ||
| /** Default read-ahead buffer size. */ | ||
| private static final int DEFAULT_READ_BUFFER_SIZE = 256 * 1024; | ||
|
|
||
| /** Maximum buffer size for very large sequential reads. */ | ||
| private static final int MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024; // 4MB | ||
| private static final int MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024; | ||
|
|
||
| private final ReentrantLock lock = new ReentrantLock(); | ||
|
|
||
|
|
@@ -65,8 +65,19 @@ class NativeS3InputStream extends FSDataInputStream { | |
| @GuardedBy("lock") | ||
| private BufferedInputStream bufferedStream; | ||
|
|
||
| /** | ||
| * The position the caller expects to read from next. Updated by {@link #seek(long)}, {@link | ||
| * #skip(long)}, and after every successful {@link #read()}. | ||
| */ | ||
| @GuardedBy("lock") | ||
| private long nextReadPos; | ||
|
|
||
| /** | ||
| * The actual byte offset of the underlying stream cursor, reconciled lazily via {@link | ||
| * #lazySeek()}. | ||
| */ | ||
| @GuardedBy("lock") | ||
| private long position; | ||
| private long streamPos; | ||
|
|
||
| @GuardedBy("lock") | ||
| private volatile boolean closed; | ||
|
|
@@ -87,7 +98,8 @@ public NativeS3InputStream( | |
| this.key = key; | ||
| this.contentLength = contentLength; | ||
| this.readBufferSize = Math.min(readBufferSize, MAX_READ_BUFFER_SIZE); | ||
| this.position = 0; | ||
| this.nextReadPos = 0; | ||
| this.streamPos = 0; | ||
| this.closed = false; | ||
|
|
||
| LOG.debug( | ||
|
|
@@ -98,39 +110,60 @@ public NativeS3InputStream( | |
| this.readBufferSize / 1024); | ||
| } | ||
|
|
||
| /** Reconciles {@link #nextReadPos} and {@link #streamPos} before reading bytes. */ | ||
| @GuardedBy("lock") | ||
| private void lazyInitialize() throws IOException { | ||
| assert lock.isHeldByCurrentThread() : "lazyInitialize() requires lock to be held"; | ||
| if (currentStream == null && !closed) { | ||
| openStreamAtCurrentPosition(); | ||
| } | ||
| } | ||
| private void lazySeek() throws IOException { | ||
| assert lock.isHeldByCurrentThread() : "lazySeek() requires lock to be held"; | ||
| long targetPos = nextReadPos; | ||
|
|
||
| /** At EOF, release instead of reopening: {@code bytes=contentLength-} returns S3 416. */ | ||
| @GuardedBy("lock") | ||
| private void repositionOpenStream() throws IOException { | ||
| assert lock.isHeldByCurrentThread() : "repositionOpenStream() requires lock to be held"; | ||
| if (currentStream == null) { | ||
| streamPos = targetPos; | ||
| return; | ||
| } | ||
| if (position >= contentLength) { | ||
|
|
||
| if (targetPos == streamPos) { | ||
| return; | ||
| } | ||
|
|
||
| long diff = targetPos - streamPos; | ||
| streamPos = targetPos; | ||
|
|
||
| if (targetPos >= contentLength) { | ||
| releaseStreams(); | ||
| } else { | ||
| return; | ||
| } | ||
|
|
||
| if (diff > 0 && diff <= (long) readBufferSize) { | ||
| skipBytesInBuffer(diff); | ||
| return; | ||
| } | ||
|
|
||
| openStreamAtCurrentPosition(); | ||
| } | ||
|
|
||
| @GuardedBy("lock") | ||
| private void ensureStreamOpen() throws IOException { | ||
| assert lock.isHeldByCurrentThread() : "ensureStreamOpen() requires lock to be held"; | ||
| if (currentStream == null && !closed) { | ||
| openStreamAtCurrentPosition(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Opens (or reopens) the S3 stream at the current position. | ||
| * | ||
| * <p>This method: | ||
| * | ||
| * <ul> | ||
| * <li>Closes any existing stream | ||
| * <li>Opens a new stream starting at {@link #position} | ||
| * <li>Uses HTTP range requests for non-zero positions | ||
| * </ul> | ||
| */ | ||
| @GuardedBy("lock") | ||
| private void skipBytesInBuffer(long n) throws IOException { | ||
| assert lock.isHeldByCurrentThread() : "skipBytesInBuffer() requires lock to be held"; | ||
| long remaining = n; | ||
| while (remaining > 0) { | ||
| long skipped = bufferedStream.skip(remaining); | ||
| if (skipped <= 0) { | ||
| openStreamAtCurrentPosition(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
A targeted test would extend the existing test coverage for this path. |
||
| return; | ||
| } | ||
| remaining -= skipped; | ||
| } | ||
| } | ||
|
|
||
| /** Opens (or reopens) the S3 stream at {@link #streamPos}. */ | ||
| private void openStreamAtCurrentPosition() throws IOException { | ||
| lock.lock(); | ||
| try { | ||
|
|
@@ -140,11 +173,11 @@ private void openStreamAtCurrentPosition() throws IOException { | |
| GetObjectRequest.Builder requestBuilder = | ||
| GetObjectRequest.builder().bucket(bucketName).key(key); | ||
|
|
||
| if (position > 0) { | ||
| requestBuilder.range(String.format("bytes=%d-", position)); | ||
| if (streamPos > 0) { | ||
| requestBuilder.range(String.format("bytes=%d-", streamPos)); | ||
| LOG.debug( | ||
| "Opening S3 stream with range: bytes={}-{}", | ||
| position, | ||
| streamPos, | ||
| contentLength - 1); | ||
| } else { | ||
| LOG.debug("Opening S3 stream for full object: {} bytes", contentLength); | ||
|
|
@@ -160,12 +193,7 @@ private void openStreamAtCurrentPosition() throws IOException { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Aborts the in-flight HTTP connection so that subsequent {@code close()} calls on the stream | ||
| * do not drain remaining bytes over the network. | ||
| * | ||
| * @see ResponseInputStream#abort() | ||
| */ | ||
| /** Aborts the in-flight HTTP connection to avoid draining remaining bytes on close. */ | ||
| @GuardedBy("lock") | ||
| private void abortCurrentStream() { | ||
| assert lock.isHeldByCurrentThread() : "abortCurrentStream() requires lock to be held"; | ||
|
|
@@ -179,11 +207,10 @@ private void abortCurrentStream() { | |
| } | ||
|
|
||
| /** | ||
| * Aborts and closes both streams, nulling the references. The abort is called first to prevent | ||
| * {@link ResponseInputStream#close()} from draining remaining bytes over the network. | ||
| * Aborts and closes both streams, nulling the references. | ||
| * | ||
| * @return the first {@link IOException} encountered (with subsequent ones added as suppressed), | ||
| * or {@code null} if cleanup succeeded without errors | ||
| * or {@code null} if cleanup succeeded | ||
| */ | ||
| @GuardedBy("lock") | ||
| private IOException releaseStreams() { | ||
|
|
@@ -236,10 +263,7 @@ public void seek(long desired) throws IOException { | |
| + contentLength); | ||
| } | ||
|
|
||
| if (desired != position) { | ||
| position = desired; | ||
| repositionOpenStream(); | ||
| } | ||
| nextReadPos = desired; | ||
| } finally { | ||
| lock.unlock(); | ||
| } | ||
|
|
@@ -249,7 +273,7 @@ public void seek(long desired) throws IOException { | |
| public long getPos() throws IOException { | ||
| lock(); | ||
| try { | ||
| return position; | ||
| return nextReadPos; | ||
| } finally { | ||
| lock.unlock(); | ||
| } | ||
|
|
@@ -262,13 +286,15 @@ public int read() throws IOException { | |
| if (closed) { | ||
| throw new IOException("Stream is closed"); | ||
| } | ||
| if (position >= contentLength) { | ||
| if (nextReadPos >= contentLength) { | ||
| return -1; | ||
| } | ||
| lazyInitialize(); | ||
| lazySeek(); | ||
| ensureStreamOpen(); | ||
| int data = bufferedStream.read(); | ||
| if (data != -1) { | ||
| position++; | ||
| nextReadPos++; | ||
| streamPos++; | ||
| } | ||
| return data; | ||
| } finally { | ||
|
|
@@ -295,15 +321,17 @@ public int read(byte[] b, int off, int len) throws IOException { | |
| if (closed) { | ||
| throw new IOException("Stream is closed"); | ||
| } | ||
| if (position >= contentLength) { | ||
| if (nextReadPos >= contentLength) { | ||
| return -1; | ||
| } | ||
| lazyInitialize(); | ||
| long remaining = contentLength - position; | ||
| lazySeek(); | ||
| ensureStreamOpen(); | ||
| long remaining = contentLength - nextReadPos; | ||
| int toRead = (int) Math.min(len, remaining); | ||
| int bytesRead = bufferedStream.read(b, off, toRead); | ||
| if (bytesRead > 0) { | ||
| position += bytesRead; | ||
| nextReadPos += bytesRead; | ||
| streamPos += bytesRead; | ||
| } | ||
| return bytesRead; | ||
| } finally { | ||
|
|
@@ -336,7 +364,7 @@ public void close() throws IOException { | |
| "Closed S3 input stream - bucket: {}, key: {}, final position: {}/{}", | ||
| bucketName, | ||
| key, | ||
| position, | ||
| nextReadPos, | ||
| contentLength); | ||
| if (exception != null) { | ||
| throw exception; | ||
|
|
@@ -346,24 +374,14 @@ public void close() throws IOException { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Returns an estimate of the number of bytes that can be read without blocking. | ||
| * | ||
| * <p>This implementation returns the remaining bytes in the object based on content length and | ||
| * current position. Note that actual reads may still block due to network I/O, but this | ||
| * indicates how much data is logically available. | ||
| * | ||
| * @return the number of remaining bytes (capped at Integer.MAX_VALUE) | ||
| * @throws IOException if the stream has been closed | ||
| */ | ||
| @Override | ||
| public int available() throws IOException { | ||
| lock(); | ||
| try { | ||
| if (closed) { | ||
| throw new IOException("Stream is closed"); | ||
| } | ||
| long remaining = contentLength - position; | ||
| long remaining = contentLength - nextReadPos; | ||
| return (int) Math.min(remaining, Integer.MAX_VALUE); | ||
| } finally { | ||
| lock.unlock(); | ||
|
|
@@ -380,12 +398,9 @@ public long skip(long n) throws IOException { | |
| if (n <= 0) { | ||
| return 0; | ||
| } | ||
| long newPos = Math.min(position + n, contentLength); | ||
| long skipped = newPos - position; | ||
| if (newPos != position) { | ||
| position = newPos; | ||
| repositionOpenStream(); | ||
| } | ||
| long newPos = Math.min(nextReadPos + n, contentLength); | ||
| long skipped = newPos - nextReadPos; | ||
| nextReadPos = newPos; | ||
| return skipped; | ||
| } finally { | ||
| lock.unlock(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
I would put a comment here explaining that we can't really find out how much data is sitting inside the buffer without heavy or fragile development effort (extending a JDK class), so the tradeoff is to accept up to a buffer-size worth of skipped bytes during seeks.
As a side note, which we don't need to mention in the comment: if that doesn't perform well, we can add a new config option that controls the seek behavior, like
`s3.max-forward-seek-bytes` or something similar.