Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
dd7ca7f
[FLINK-39524][checkpoint] Add non-blocking requestBuffer() and remove…
1996fanrui Apr 22, 2026
0f990e3
[FLINK-39520][checkpoint] Add spill file I/O and RecoveredBufferStore
1996fanrui Apr 28, 2026
6205d10
[FLINK-39521][checkpoint] Add OutputWriter to dispatch filtered recor…
1996fanrui Apr 28, 2026
5ec0cec
[FLINK-39522][network] Consume recovered buffers from RecoveredBuffer…
1996fanrui Apr 28, 2026
1a51a47
[FLINK-39523][checkpoint] Support streaming InputStream-based addInpu…
1996fanrui Apr 28, 2026
0b040ee
[FLINK-39524][checkpoint] Wire OutputWriter into the filterAndRewrite…
1996fanrui Apr 28, 2026
1b4d9db
[FLINK-39519][checkpoint] Simplify comments in the unaligned-recovery…
1996fanrui Apr 28, 2026
3fcf185
[FLINK-39519][checkpoint] Apply pending spotless formatting
1996fanrui Apr 28, 2026
3e7de29
[FLINK-39519][checkpoint] Rename Reader sealed/seal to frozen/freeze
1996fanrui Apr 28, 2026
99a9016
[FLINK-39519][checkpoint] Release recovered store on abort path
1996fanrui Apr 28, 2026
f6a17f7
[FLINK-39519][checkpoint] Signal markDrainDone when no dispatcher is …
1996fanrui Apr 28, 2026
d1e1ffc
[FLINK-39519][network] Remove dispatcher monitor and restore wide cha…
1996fanrui Apr 29, 2026
6d16324
[FLINK-39519][network] Rewrite dispatcher with explicit lock ordering
1996fanrui Apr 29, 2026
81ba1a2
[FLINK-39519][network] Close stale-enqueue race via gate -> store loc…
1996fanrui May 3, 2026
1f29b56
[FLINK-39519][network] Update tests for new gate -> store store API
1996fanrui May 3, 2026
cac93af
fix checkstyle
1996fanrui May 4, 2026
34ef500
Fix test failures
1996fanrui May 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint.channel;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.network.buffer.Buffer;

import javax.annotation.Nullable;

import java.io.IOException;

/** Supplies per-channel network buffers to the recovery pipeline. */
@Internal
interface BufferRequester {

/** Non-blocking; returns {@code null} when no buffer is currently available. */
@Nullable
Buffer requestBuffer(InputChannelInfo channelInfo) throws IOException;

Buffer requestBufferBlocking(InputChannelInfo channelInfo)
throws InterruptedException, IOException;

/**
* Releases exclusive buffers for every channel served by this requester. Idempotent. Must run
* after the dispatcher's drain has finished so the underlying pools are no longer being read
* from.
*/
void releaseExclusiveBuffers() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;

Expand Down Expand Up @@ -180,6 +182,42 @@ void writeOutput(
}
}

/**
* Writes spilled input-channel state chunks as [4-byte length prefix][data bytes], matching the
* buffer-based path. The iterator is closed when done.
*/
void writeInputFromSpill(
JobVertexID jobVertexID,
int subtaskIndex,
CloseableIterator<FilteredSpillFile.Chunk> chunks) {
if (isDone()) {
IOUtils.closeQuietly(chunks);
return;
}
ChannelStatePendingResult pendingResult =
getChannelStatePendingResult(jobVertexID, subtaskIndex);
runWithChecks(
() -> {
checkState(!pendingResult.isAllInputsReceived());
try {
while (chunks.hasNext()) {
FilteredSpillFile.Chunk chunk = chunks.next();
InputChannelInfo info = chunk.getChannelInfo();
long offset = checkpointStream.getPos();
dataStream.writeInt(chunk.getLength());
dataStream.write(chunk.getData(), 0, chunk.getLength());
long size = checkpointStream.getPos() - offset;
pendingResult
.getInputChannelOffsets()
.computeIfAbsent(info, unused -> new StateContentMetaInfo())
.withDataAdded(offset, size);
}
} finally {
chunks.close();
}
});
}

private <K> void write(
Map<K, StateContentMetaInfo> offsets,
K key,
Expand Down
Loading