[FLINK-39603] Optimize NativeS3InputStream via seeks and reduce IOPS #28112
Samrat002 wants to merge 1 commit into apache:master
Conversation
@gaborgsomogyi PTAL
I think we need an explicit config for the buffer size. There are use cases where we need a 16 MB buffer for proper IOPS usage.
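A hedged sketch of what such an option could look like using Flink's `ConfigOptions` builder; the key name `s3.read.buffer.size`, the holder class, and the 16 MB default are hypothetical illustrations, not part of this PR:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;

public class S3ReadOptions {
    // Hypothetical option: lets users raise the read-ahead buffer
    // (e.g. to 16 MB) when larger range requests reduce IOPS.
    public static final ConfigOption<MemorySize> READ_BUFFER_SIZE =
            ConfigOptions.key("s3.read.buffer.size")
                    .memoryType()
                    .defaultValue(MemorySize.ofMebiBytes(16))
                    .withDescription(
                            "Size of the read-ahead buffer used by NativeS3InputStream. "
                                    + "Larger values trade memory for fewer S3 GET requests.");
}
```

Exposing this as a `MemorySize` option would let deployments tune the IOPS/memory trade-off per workload instead of relying on a hardcoded buffer.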
Hi @Samrat002, great work reducing IOPS via lazy seek and range requests! While reviewing the implementation I found one remaining efficiency issue worth addressing.

What's working well: a seek that lands within the read-ahead buffer is served entirely from the buffer - no extra HTTP request.

Remaining issue: bulk reads bypass the buffer. This means a forward seek that follows a bulk read cannot be satisfied from the buffer and falls through to a fresh Range GET. The attached patch includes a suggested fix.
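The effect of bypassing the buffer can be shown with a minimal, self-contained model (plain `java.io` streams standing in for the S3 HTTP stream; this is an illustration of the review finding, not the actual NativeS3InputStream code):

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;

// Model of the review finding: single-byte reads go through a buffered
// wrapper, but if a bulk read goes to the raw stream instead, the wrapper's
// buffered data becomes stale and position bookkeeping diverges.
class BulkReadDemo {
    static int firstByteAfterBulkRead(boolean throughBuffer) throws IOException {
        byte[] object = new byte[256];
        for (int i = 0; i < object.length; i++) {
            object[i] = (byte) i; // object[k] == k, so bytes reveal positions
        }
        ByteArrayInputStream raw = new ByteArrayInputStream(object);
        BufferedInputStream buffered = new BufferedInputStream(raw, 64);

        buffered.read(); // returns byte 0 and read-aheads bytes 0..63

        byte[] dst = new byte[16];
        if (throughBuffer) {
            buffered.read(dst, 0, dst.length); // served from buffer: bytes 1..16
        } else {
            raw.read(dst, 0, dst.length); // bypasses buffer: bytes 64..79!
        }
        // Through the buffer the stream continues consistently at byte 17;
        // bypassing it, the wrapper replays stale byte 1 even though the
        // underlying stream has already advanced past byte 79.
        return buffered.read() & 0xff;
    }
}
```

Routing bulk reads through the same buffered stream as single-byte reads keeps the buffered view and the underlying stream position consistent, which is what allows a following forward seek to be served in-buffer.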
What is the purpose of the change
NativeS3InputStream.seek() unconditionally aborts the HTTP connection and opens a new Range GET for every seek, even for small forward seeks that land inside already-buffered data. During state restore, this creates O(N) HTTP round-trips for N state entries, each call costing TCP handshake + TLS + S3 response latency. This PR aims to make seek() lazy: all HTTP work is deferred to the next read() call. Forward seeks within max(readBufferSize, bufferedStream.available()) skip in-buffer instead of reopening the connection.

Brief change log
- seek() and skip() now only update nextReadPos without any I/O
- lazySeek() reconciles nextReadPos vs streamPos on the next read() call
- Forward seeks within max(readBufferSize, bufferedStream.available()) skip in-buffer; backward or large forward seeks reopen via Range GET

Verifying this change
This change added tests and can be verified with unit tests; E2E verified with a high-state Flink application.
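The lazy-seek flow described in the change log can be sketched as a small self-contained model. Field and method names (nextReadPos, streamPos, lazySeek) follow the change log; everything else (the byte-array "object" standing in for S3, the reopen counter) is illustrative, and the in-buffer skip bound is simplified to the wrapped stream's available() rather than the PR's max(readBufferSize, bufferedStream.available()):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Simplified model of lazy seek: seek() only records the target position;
// lazySeek() reconciles nextReadPos vs streamPos on the next read().
class LazySeekStream {
    private final byte[] object;   // stands in for the S3 object
    private InputStream in;        // stands in for the HTTP body stream
    private long streamPos;        // position the underlying stream is at
    private long nextReadPos;      // position the caller asked for
    int reopenCount = 0;           // counts simulated Range GETs

    LazySeekStream(byte[] object) {
        this.object = object;
        reopenAt(0);
    }

    void seek(long pos) {          // no I/O here
        nextReadPos = pos;
    }

    int read() throws IOException {
        lazySeek();
        int b = in.read();
        if (b >= 0) {
            streamPos++;
            nextReadPos = streamPos;
        }
        return b;
    }

    private void lazySeek() throws IOException {
        if (nextReadPos == streamPos) {
            return;
        }
        long diff = nextReadPos - streamPos;
        // Real code would bound this by max(readBufferSize, available()).
        if (diff > 0 && diff <= in.available()) {
            in.skip(diff);         // small forward seek: skip in-buffer
            streamPos = nextReadPos;
        } else {
            reopenAt(nextReadPos); // backward/large seek: new Range GET
        }
    }

    private void reopenAt(long pos) {
        reopenCount++;             // each reopen models one HTTP round-trip
        in = new ByteArrayInputStream(object, (int) pos, object.length - (int) pos);
        streamPos = pos;
        nextReadPos = pos;
    }
}
```

In this model, N small forward seeks cost zero reopens instead of N, which is the O(N)-to-O(1) round-trip reduction the PR targets for state restore.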
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no

Documentation
Was generative AI tooling used to co-author this PR?