3636import org .apache .iotdb .commons .pipe .datastructure .pattern .TablePattern ;
3737import org .apache .iotdb .commons .pipe .datastructure .pattern .TreePattern ;
3838import org .apache .iotdb .commons .pipe .datastructure .resource .PersistentResource ;
39+ import org .apache .iotdb .commons .pipe .event .EnrichedEvent ;
3940import org .apache .iotdb .commons .pipe .event .ProgressReportEvent ;
4041import org .apache .iotdb .commons .queryengine .utils .DateTimeUtils ;
4142import org .apache .iotdb .commons .utils .PathUtils ;
@@ -164,6 +165,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
164165 private Queue <PersistentResource > pendingQueue ;
165166 private final Map <TsFileResource , Set <String >> filteredTsFileResources2TableNames =
166167 new HashMap <>();
168+ private final Map <PersistentResource , Long > pendingResource2ReplicateIndexForIoTV2 =
169+ new HashMap <>();
167170
168171 @ Override
169172 public void validate (final PipeParameterValidator validator ) {
@@ -810,18 +813,27 @@ public synchronized Event supply() {
810813 return null ;
811814 }
812815
813- final PersistentResource resource = pendingQueue .poll ();
816+ final PersistentResource resource = pendingQueue .peek ();
814817 if (resource == null ) {
815818 return supplyTerminateEvent ();
816819 }
817820
818821 if (resource instanceof TsFileResource ) {
819822 final TsFileResource tsFileResource = (TsFileResource ) resource ;
820- return consumeSkippedHistoricalTsFileEventIfNecessary (tsFileResource )
821- ? supplyProgressReportEvent (tsFileResource .getMaxProgressIndex ())
822- : supplyTsFileEvent (tsFileResource );
823+ if (consumeSkippedHistoricalTsFileEventIfNecessary (tsFileResource )) {
824+ clearReplicateIndexForResource (tsFileResource );
825+ pendingQueue .poll ();
826+ return supplyProgressReportEvent (tsFileResource .getMaxProgressIndex ());
827+ }
828+
829+ final Event event = supplyTsFileEvent (tsFileResource );
830+ pendingQueue .poll ();
831+ return event ;
823832 }
824- return supplyDeletionEvent ((DeletionResource ) resource );
833+
834+ final Event event = supplyDeletionEvent ((DeletionResource ) resource );
835+ pendingQueue .poll ();
836+ return event ;
825837 }
826838
827839 private Event supplyTerminateEvent () {
@@ -891,50 +903,52 @@ protected Event supplyProgressReportEvent(final ProgressIndex progressIndex) {
891903
892904 protected Event supplyTsFileEvent (final TsFileResource resource ) {
893905 if (!filteredTsFileResources2TableNames .containsKey (resource )) {
906+ clearReplicateIndexForResource (resource );
894907 return supplyProgressReportEvent (resource .getMaxProgressIndex ());
895908 }
896909
897- final PipeTsFileInsertionEvent event =
898- new PipeTsFileInsertionEvent (
899- isModelDetected ? isTableModel : null ,
900- resource .getDatabaseName (),
901- resource ,
902- null ,
903- shouldTransferModFile ,
904- false ,
905- true ,
906- filteredTsFileResources2TableNames .get (resource ),
910+ boolean shouldUnpinResource = false ;
911+ boolean shouldClearReplicateIndex = false ;
912+ try {
913+ final PipeTsFileInsertionEvent event =
914+ new PipeTsFileInsertionEvent (
915+ isModelDetected ? isTableModel : null ,
916+ resource .getDatabaseName (),
917+ resource ,
918+ null ,
919+ shouldTransferModFile ,
920+ false ,
921+ true ,
922+ filteredTsFileResources2TableNames .get (resource ),
923+ pipeName ,
924+ creationTime ,
925+ pipeTaskMeta ,
926+ treePattern ,
927+ tablePattern ,
928+ userId ,
929+ userName ,
930+ cliHostname ,
931+ skipIfNoPrivileges ,
932+ historicalDataExtractionStartTime ,
933+ historicalDataExtractionEndTime );
934+
935+ // if using IoTV2, assign a replicateIndex for this event
936+ if (shouldAssignReplicateIndexForIoTV2 (event )) {
937+ event .setReplicateIndexForIoTV2 (assignReplicateIndexForResource (resource ));
938+ LOGGER .debug (
939+ "[{}]Set {} for historical event {}" ,
907940 pipeName ,
908- creationTime ,
909- pipeTaskMeta ,
910- treePattern ,
911- tablePattern ,
912- userId ,
913- userName ,
914- cliHostname ,
915- skipIfNoPrivileges ,
916- historicalDataExtractionStartTime ,
917- historicalDataExtractionEndTime );
918-
919- filteredTsFileResources2TableNames .remove (resource );
920-
921- // if using IoTV2, assign a replicateIndex for this event
922- if (DataRegionConsensusImpl .getInstance () instanceof IoTConsensusV2
923- && IoTConsensusV2Processor .isShouldReplicate (event )) {
924- event .setReplicateIndexForIoTV2 (
925- ReplicateProgressDataNodeManager .assignReplicateIndexForIoTV2 (pipeName ));
926- LOGGER .debug (
927- "[{}]Set {} for historical event {}" , pipeName , event .getReplicateIndexForIoTV2 (), event );
928- }
941+ event .getReplicateIndexForIoTV2 (),
942+ event );
943+ }
929944
930- if (sloppyPattern || isDbNameCoveredByPattern ) {
931- event .skipParsingPattern ();
932- }
933- if (sloppyTimeRange || isTsFileResourceCoveredByTimeRange (resource )) {
934- event .skipParsingTime ();
935- }
945+ if (sloppyPattern || isDbNameCoveredByPattern ) {
946+ event .skipParsingPattern ();
947+ }
948+ if (sloppyTimeRange || isTsFileResourceCoveredByTimeRange (resource )) {
949+ event .skipParsingTime ();
950+ }
936951
937- try {
938952 final boolean isReferenceCountIncreased =
939953 event .increaseReferenceCount (
940954 PipeHistoricalDataRegionTsFileAndDeletionSource .class .getName ());
@@ -945,17 +959,25 @@ protected Event supplyTsFileEvent(final TsFileResource resource) {
945959 dataRegionId ,
946960 event );
947961 }
962+ filteredTsFileResources2TableNames .remove (resource );
963+ shouldUnpinResource = true ;
964+ shouldClearReplicateIndex = true ;
948965 return isReferenceCountIncreased ? event : null ;
949966 } finally {
950- try {
951- PipeDataNodeResourceManager .tsfile ()
952- .unpinTsFileResource (resource , shouldTransferModFile , pipeName );
953- } catch (final IOException e ) {
954- LOGGER .warn (
955- "Pipe {}@{}: failed to unpin TsFileResource after creating event, original path: {}" ,
956- pipeName ,
957- dataRegionId ,
958- resource .getTsFilePath ());
967+ if (shouldClearReplicateIndex ) {
968+ clearReplicateIndexForResource (resource );
969+ }
970+ if (shouldUnpinResource ) {
971+ try {
972+ PipeDataNodeResourceManager .tsfile ()
973+ .unpinTsFileResource (resource , shouldTransferModFile , pipeName );
974+ } catch (final IOException e ) {
975+ LOGGER .warn (
976+ "Pipe {}@{}: failed to unpin TsFileResource after creating event, original path: {}" ,
977+ pipeName ,
978+ dataRegionId ,
979+ resource .getTsFilePath ());
980+ }
959981 }
960982 }
961983 }
@@ -983,10 +1005,8 @@ private Event supplyDeletionEvent(final DeletionResource deletionResource) {
9831005 false );
9841006
9851007 // if using IoTV2, assign a replicateIndex for this historical deletion event
986- if (DataRegionConsensusImpl .getInstance () instanceof IoTConsensusV2
987- && IoTConsensusV2Processor .isShouldReplicate (event )) {
988- event .setReplicateIndexForIoTV2 (
989- ReplicateProgressDataNodeManager .assignReplicateIndexForIoTV2 (pipeName ));
1008+ if (shouldAssignReplicateIndexForIoTV2 (event )) {
1009+ event .setReplicateIndexForIoTV2 (assignReplicateIndexForResource (deletionResource ));
9901010 LOGGER .debug (
9911011 "[{}]Set {} for historical deletion event {}" ,
9921012 pipeName ,
@@ -1017,9 +1037,25 @@ private Event supplyDeletionEvent(final DeletionResource deletionResource) {
10171037 event .setDeletionResource (
10181038 manager .getDeletionResource (event .getDeleteDataNode ())));
10191039 }
1040+ clearReplicateIndexForResource (deletionResource );
10201041 return isReferenceCountIncreased ? event : null ;
10211042 }
10221043
1044+ protected boolean shouldAssignReplicateIndexForIoTV2 (final EnrichedEvent event ) {
1045+ return DataRegionConsensusImpl .getInstance () instanceof IoTConsensusV2
1046+ && IoTConsensusV2Processor .isShouldReplicate (event );
1047+ }
1048+
1049+ protected long assignReplicateIndexForResource (final PersistentResource resource ) {
1050+ return pendingResource2ReplicateIndexForIoTV2 .computeIfAbsent (
1051+ resource ,
1052+ ignored -> ReplicateProgressDataNodeManager .assignReplicateIndexForIoTV2 (pipeName ));
1053+ }
1054+
1055+ protected void clearReplicateIndexForResource (final PersistentResource resource ) {
1056+ pendingResource2ReplicateIndexForIoTV2 .remove (resource );
1057+ }
1058+
10231059 @ Override
10241060 public synchronized boolean hasConsumedAll () {
10251061 // If the pendingQueue is null when the function is called, it implies that the extractor only
@@ -1055,5 +1091,6 @@ public synchronized void close() {
10551091 pendingQueue .clear ();
10561092 pendingQueue = null ;
10571093 }
1094+ pendingResource2ReplicateIndexForIoTV2 .clear ();
10581095 }
10591096}
0 commit comments