Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,15 +225,8 @@ private void setCurrentReader(int index) {
} catch (Exception e) {
throw new RuntimeException("Failed tp create reader", e);
}
reader.start();
currentSourceIndex = index;
currentReader = reader;
availabilityFuture.complete(null);
LOG.debug(
"Reader started: subtask={} sourceIndex={} {}",
readerContext.getIndexOfSubtask(),
currentSourceIndex,
reader);
// add restored splits
if (!restoredSplits.isEmpty()) {
List<HybridSourceSplit> splits = new ArrayList<>(restoredSplits.size());
Expand All @@ -247,6 +240,14 @@ private void setCurrentReader(int index) {
}
addSplits(splits);
}

reader.start();
availabilityFuture.complete(null);
LOG.debug(
"Reader started: subtask={} sourceIndex={} {}",
readerContext.getIndexOfSubtask(),
currentSourceIndex,
reader);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.mock.Whitebox;

import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;

import java.util.Collections;
Expand Down Expand Up @@ -281,6 +282,55 @@ void testReaderRecovery() throws Exception {
reader.close();
}

@Test
void testReaderRecoveryInitializationOrder() throws Exception {
TestingReaderContext readerContext = new TestingReaderContext();
MockBaseSource source = new MockBaseSource(1, 1, Boundedness.BOUNDED);

// First pass: create a snapshot with an in-progress split
HybridSourceReader<Integer> reader = new HybridSourceReader<>(readerContext);
reader.start();
assertAndClearSourceReaderFinishedEvent(readerContext, -1);
reader.handleSourceEvents(new SwitchSourceEvent(0, source, false));

MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 2147483647);
SwitchedSources switchedSources = new SwitchedSources();
switchedSources.put(0, source);
HybridSourceSplit hybridSplit = HybridSourceSplit.wrapSplit(mockSplit, 0, switchedSources);
reader.addSplits(Collections.singletonList(hybridSplit));
List<HybridSourceSplit> snapshot = reader.snapshotState(0);
reader.close();

// Recovery: capture the underlying reader as a spy to verify call order
readerContext.clearSentEvents();
SourceReader<Integer, MockSourceSplit>[] spyHolder = new SourceReader[1];
Source spySource =
new MockSource(null, 0) {
@Override
public SourceReader<Integer, MockSourceSplit> createReader(
SourceReaderContext ctx) {
SourceReader<Integer, MockSourceSplit> spy =
Mockito.spy(source.createReader(ctx));
spyHolder[0] = spy;
return spy;
}
};

HybridSourceReader<Integer> recoveredReader = new HybridSourceReader<>(readerContext);
recoveredReader.addSplits(snapshot);
recoveredReader.start();
assertAndClearSourceReaderFinishedEvent(readerContext, -1);

recoveredReader.handleSourceEvents(new SwitchSourceEvent(0, spySource, false));

// Verify addSplits was called before start on the underlying reader
InOrder inOrder = Mockito.inOrder(spyHolder[0]);
inOrder.verify(spyHolder[0]).addSplits(Mockito.anyList());
inOrder.verify(spyHolder[0]).start();

recoveredReader.close();
}

@Test
void testDefaultMethodDelegation() throws Exception {
TestingReaderContext readerContext = new TestingReaderContext();
Expand Down