[FLINK-39603] Optimize NativeS3InputStream via seeks and reduce IOPS#28112

Open
Samrat002 wants to merge 1 commit into apache:master from Samrat002:FLINK-39603

Conversation

@Samrat002
Contributor

What is the purpose of the change

NativeS3InputStream.seek() unconditionally aborts the HTTP connection and opens a new Range GET for every seek, even for small forward seeks that land inside already-buffered data. During state restore, this creates O(N) HTTP round-trips for N state entries, each call costing a TCP handshake, TLS negotiation, and S3 response latency.

This PR makes seek() lazy: all HTTP work is deferred to the next read() call. Forward seeks within max(readBufferSize, bufferedStream.available()) are skipped in-buffer instead of reopening the connection.

Brief change log

  • seek() and skip() now only update nextReadPos without any I/O
  • Added lazySeek() to reconcile nextReadPos vs streamPos on the next read() call
  • Forward seeks within the dynamic threshold max(readBufferSize, bufferedStream.available()) skip in-buffer; backward or large forward seeks reopen via Range GET
  • Multiple seeks between reads coalesce — only the final position matters
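The change log above can be sketched as follows. This is an illustrative model of the lazy-seek scheme, backed by an in-memory "object" rather than S3; the names (nextReadPos, streamPos, lazySeek, reopen, reopenCount) mirror the PR description but are hypothetical, not the actual NativeS3InputStream code.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class LazySeekStream {
    private final byte[] object;          // stand-in for the S3 object
    private final int readBufferSize;
    private BufferedInputStream bufferedStream;
    private long streamPos;               // position of the live stream
    private long nextReadPos;             // position requested by seek()
    int reopenCount;                      // how many "Range GETs" were issued

    public LazySeekStream(byte[] object, int readBufferSize) {
        this.object = object;
        this.readBufferSize = readBufferSize;
        reopen(0);
    }

    public void seek(long pos) {
        nextReadPos = pos;                // no I/O: repeated seeks coalesce
    }

    private void reopen(long pos) {       // models abort + new Range GET
        reopenCount++;
        InputStream range =
            new ByteArrayInputStream(object, (int) pos, object.length - (int) pos);
        bufferedStream = new BufferedInputStream(range, readBufferSize);
        streamPos = pos;
    }

    private void lazySeek() throws IOException {
        long diff = nextReadPos - streamPos;
        if (diff == 0) return;
        long threshold = Math.max(readBufferSize, bufferedStream.available());
        if (diff > 0 && diff <= threshold) {
            bufferedStream.skip(diff);    // forward seek served without a new GET
            streamPos = nextReadPos;
        } else {
            reopen(nextReadPos);          // backward or large forward seek
        }
    }

    public int read() throws IOException {
        lazySeek();                       // reconcile nextReadPos vs streamPos here
        int b = bufferedStream.read();
        if (b >= 0) { streamPos++; nextReadPos++; }
        return b;
    }

    public static void main(String[] args) throws IOException {
        LazySeekStream s = new LazySeekStream(new byte[100], 16);
        s.read();
        s.seek(10);                       // small forward seek: no new connection
        s.read();
        s.seek(0);                        // backward seek: forces a reopen
        s.read();
        System.out.println(s.reopenCount);
    }
}
```

Because seek() only records the target position, any number of intermediate seeks between two reads collapse into a single reconciliation in lazySeek().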

Verifying this change

This change adds tests and can be verified with the unit tests and an E2E run of a Flink application with large state.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: yes

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

Was generative AI tooling used to co-author this PR?

  • Yes (please specify the tool below)

@Samrat002
Contributor Author

@gaborgsomogyi PTAL

@flinkbot
Collaborator

flinkbot commented May 4, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@gaborgsomogyi
Contributor

I think we need an explicit config for buffer size. There are use-cases where we need to set 16MB buffer for proper IOPS usage.

@gaborgsomogyi
Contributor

Hi @Samrat002, great work reducing IOPS via lazy seek and range requests! While reviewing the implementation I found one remaining efficiency issue worth addressing.

What's working well

A seek that lands within the read-ahead buffer is served entirely from the buffer - no extra GetObject call, no bytes pulled from S3. The attached patch includes seekWithinBuffer_afterSmallRead_doesNotTouchUnderlyingStream which demonstrates this and passes.

Remaining issue: bulk reads bypass the BufferedInputStream buffer

BufferedInputStream.read(byte[], off, len) has an internal fast-path: when len >= bufferSize it reads directly from the underlying stream, skipping the local buffer entirely. After such a read the buffer is empty.

This means a forward seek that follows a bulk read cannot be satisfied from the buffer and falls through to bufferedStream.skip(), which consumes bytes from the live HTTP connection. No new GetObject is issued (IOPS are fine), but unnecessary bytes are downloaded and discarded.
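The fast-path and its consequence can be reproduced in isolation. In this sketch, TrackingStream is a hypothetical stand-in for the reviewer's TrackingInputStream: it counts bytes skipped on the underlying stream, which models bytes pulled (and discarded) from the live HTTP connection.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

public class BufferFastPathDemo {
    // Counts skip() calls that reach the underlying stream, i.e. bytes that
    // would be downloaded from S3 and thrown away.
    static class TrackingStream extends ByteArrayInputStream {
        final AtomicLong skippedFromUnderlying = new AtomicLong();
        TrackingStream(byte[] data) { super(data); }
        @Override public synchronized long skip(long n) {
            skippedFromUnderlying.addAndGet(n);
            return super.skip(n);
        }
    }

    public static long skippedAfterRead(int bufferSize, int readLen, int skipLen)
            throws IOException {
        TrackingStream underlying = new TrackingStream(new byte[1024]);
        BufferedInputStream buffered = new BufferedInputStream(underlying, bufferSize);
        // When readLen >= bufferSize, BufferedInputStream reads straight from
        // the underlying stream and leaves its internal buffer empty.
        buffered.read(new byte[readLen], 0, readLen);
        buffered.skip(skipLen);  // empty buffer -> skip falls through to the wire
        return underlying.skippedFromUnderlying.get();
    }

    public static void main(String[] args) throws IOException {
        // Bulk read (len >= bufferSize): the seek's skip hits the underlying stream.
        System.out.println(skippedAfterRead(64, 64, 10));  // prints 10
        // Small read (len < bufferSize): the skip is served from the local buffer.
        System.out.println(skippedAfterRead(64, 8, 10));   // prints 0
    }
}
```

This matches the reviewer's observation: IOPS stay flat (no new GetObject), but after a bulk read the skipped bytes come off the live connection instead of the local buffer.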

The attached patch includes seekWithinBuffer_afterLargeRead_touchesUnderlyingStream which demonstrates the bug - it currently fails because bytesSkippedFromUnderlying is 10 instead of 0.

Suggested fix

Replace BufferedInputStream with a plain byte[] buffer managed directly, tracking bufferStart (file offset of the first byte in the buffer), bufferOffset (current read position within the array), and bufferLength (valid bytes). Then lazySeek() can check [bufferStart, bufferStart + bufferLength) to decide whether the seek is satisfiable in-buffer, independently of whether the preceding read was a byte-at-a-time or a bulk read.
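A minimal sketch of that suggestion, assuming the field names from the comment (bufferStart, bufferOffset, bufferLength); the class and method names are illustrative, not proposed API.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ManagedBufferStream {
    private final InputStream in;   // the live Range GET body (stand-in)
    private final byte[] buffer;
    private long bufferStart;       // file offset of buffer[0]
    private int bufferOffset;       // current read position within the array
    private int bufferLength;       // number of valid bytes in the buffer

    public ManagedBufferStream(InputStream in, int bufferSize, long startOffset) {
        this.in = in;
        this.buffer = new byte[bufferSize];
        this.bufferStart = startOffset;
    }

    // A seek is in-buffer iff the target lands in
    // [bufferStart, bufferStart + bufferLength) -- independent of whether the
    // preceding read was byte-at-a-time or bulk.
    public boolean trySeekInBuffer(long targetPos) {
        if (targetPos >= bufferStart && targetPos < bufferStart + bufferLength) {
            bufferOffset = (int) (targetPos - bufferStart);
            return true;            // no bytes touched on the wire
        }
        return false;               // caller must skip on the stream or reopen
    }

    public int read() throws IOException {
        if (bufferOffset >= bufferLength) {
            bufferStart += bufferLength;           // new window starts where the old ended
            bufferLength = Math.max(in.read(buffer), 0);
            bufferOffset = 0;
            if (bufferLength == 0) return -1;      // EOF
        }
        return buffer[bufferOffset++] & 0xFF;
    }
}
```

With the buffer window tracked explicitly, lazySeek() can consult trySeekInBuffer() first, falling back to skip-or-reopen only when the target lies outside the window.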

The patch file adds the AtomicLong counters to TrackingInputStream and both test methods.

buffer-efficiency-tests.patch

