diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
index e7eff9616..356fe8f60 100644
--- a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
@@ -18,40 +18,41 @@
 package org.apache.xtable.parquet;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.file.Paths;
 import java.time.Instant;
-import java.util.*;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import lombok.Builder;
 import lombok.NonNull;
+import lombok.extern.log4j.Log4j2;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.functional.RemoteIterators;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
 
-import org.apache.xtable.exception.ReadException;
-import org.apache.xtable.hudi.*;
 import org.apache.xtable.hudi.HudiPathUtils;
-import org.apache.xtable.model.*;
+import org.apache.xtable.hudi.PathBasedPartitionSpecExtractor;
 import org.apache.xtable.model.CommitsBacklog;
 import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.InternalSnapshot;
+import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.TableChange;
 import org.apache.xtable.model.schema.InternalPartitionField;
 import org.apache.xtable.model.schema.InternalSchema;
-import org.apache.xtable.model.storage.*;
-import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.spi.extractor.ConversionSource;
 
-@Builder
+@Log4j2
 public class ParquetConversionSource implements ConversionSource<Long> {
 
   private static final ParquetSchemaExtractor schemaExtractor =
@@ -68,10 +69,25 @@ public class ParquetConversionSource implements ConversionSource<Long> {
   private final String tableName;
   private final String basePath;
   @NonNull private final Configuration hadoopConf;
+  private final ParquetDataManager parquetDataManager;
+
+  @Builder
+  ParquetConversionSource(
+      String tableName,
+      String basePath,
+      @NonNull Configuration hadoopConf,
+      ParquetPartitionValueExtractor partitionValueExtractor,
+      PathBasedPartitionSpecExtractor partitionSpecExtractor) {
+    this.tableName = tableName;
+    this.basePath = basePath;
+    this.hadoopConf = hadoopConf;
+    this.partitionValueExtractor = partitionValueExtractor;
+    this.partitionSpecExtractor = partitionSpecExtractor;
+    this.parquetDataManager = new ParquetDataManager(hadoopConf, basePath);
+  }
 
-  private InternalTable createInternalTableFromFile(LocatedFileStatus latestFile) {
-    ParquetMetadata parquetMetadata =
-        parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.getPath());
+  private InternalTable createInternalTableFromFile(ParquetFileInfo latestFile) {
+    ParquetMetadata parquetMetadata = latestFile.getMetadata();
     MessageType parquetSchema = parquetMetadataExtractor.getSchema(parquetMetadata);
     InternalSchema schema = schemaExtractor.toInternalSchema(parquetSchema, "");
     List<InternalPartitionField> partitionFields = partitionSpecExtractor.spec(schema);
@@ -94,49 +110,26 @@ private InternalTable createInternalTableFromFile(LocatedFileStatus latestFile)
 
   @Override
   public InternalTable getTable(Long modificationTime) {
     // get parquetFile at specific time modificationTime
-    Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
-    LocatedFileStatus file = getParquetFileAt(parquetFiles, modificationTime);
+    ParquetFileInfo file = parquetDataManager.getParquetDataFileAt(modificationTime);
     return createInternalTableFromFile(file);
   }
 
-  private Stream<InternalDataFile> getInternalDataFiles(Stream<LocatedFileStatus> parquetFiles) {
-    return parquetFiles.map(
-        file ->
-            InternalDataFile.builder()
-                .physicalPath(file.getPath().toString())
-                .fileFormat(FileFormat.APACHE_PARQUET)
-                .fileSizeBytes(file.getLen())
-                .partitionValues(
-                    partitionValueExtractor.extractPartitionValues(
-                        partitionSpecExtractor.spec(
-                            partitionValueExtractor.extractSchemaForParquetPartitions(
-                                parquetMetadataExtractor.readParquetMetadata(
-                                    hadoopConf, file.getPath()),
-                                file.getPath().toString())),
-                        HudiPathUtils.getPartitionPath(new Path(basePath), file.getPath())))
-                .lastModified(file.getModificationTime())
-                .columnStats(
-                    parquetStatsExtractor.getColumnStatsForaFile(
-                        parquetMetadataExtractor.readParquetMetadata(hadoopConf, file.getPath())))
-                .build());
+  private Stream<InternalDataFile> getInternalDataFiles(
+      Stream<ParquetFileInfo> parquetFiles, InternalSchema schema) {
+    return parquetFiles.map(file -> createInternalDataFileFromParquetFile(file, schema));
   }
 
-  private InternalDataFile createInternalDataFileFromParquetFile(FileStatus parquetFile) {
+  private InternalDataFile createInternalDataFileFromParquetFile(
+      ParquetFileInfo parquetFile, InternalSchema schema) {
     return InternalDataFile.builder()
         .physicalPath(parquetFile.getPath().toString())
         .partitionValues(
             partitionValueExtractor.extractPartitionValues(
-                partitionSpecExtractor.spec(
-                    partitionValueExtractor.extractSchemaForParquetPartitions(
-                        parquetMetadataExtractor.readParquetMetadata(
-                            hadoopConf, parquetFile.getPath()),
-                        parquetFile.getPath().toString())),
-                basePath))
+                partitionSpecExtractor.spec(schema),
+                HudiPathUtils.getPartitionPath(new Path(basePath), parquetFile.getPath())))
         .lastModified(parquetFile.getModificationTime())
-        .fileSizeBytes(parquetFile.getLen())
-        .columnStats(
-            parquetStatsExtractor.getColumnStatsForaFile(
-                parquetMetadataExtractor.readParquetMetadata(hadoopConf, parquetFile.getPath())))
+        .fileSizeBytes(parquetFile.getSize())
+        .columnStats(parquetStatsExtractor.getColumnStatsForaFile(parquetFile.getMetadata()))
+        .build();
   }
@@ -149,87 +142,84 @@ public CommitsBacklog<Long> getCommitsBacklog(InstantsForIncrementalSync syncIns
 
   @Override
   public TableChange getTableChangeForCommit(Long modificationTime) {
-    Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
-    Set<InternalDataFile> addedInternalDataFiles = new HashSet<>();
-    List<LocatedFileStatus> tableChangesAfter =
-        parquetFiles
-            .filter(fileStatus -> fileStatus.getModificationTime() > modificationTime)
-            .collect(Collectors.toList());
-    InternalTable internalTable = getMostRecentTable(parquetFiles);
-    for (FileStatus tableStatus : tableChangesAfter) {
-      InternalDataFile currentDataFile = createInternalDataFileFromParquetFile(tableStatus);
-      addedInternalDataFiles.add(currentDataFile);
-    }
+    List<ParquetFileInfo> tableChangesAfterModificationTime =
+        parquetDataManager.getParquetFilesMetadataAfterTime(modificationTime);
+    InternalTable internalTable =
+        getMostRecentTableConfig(tableChangesAfterModificationTime.stream());
+    Set<InternalDataFile> addedInternalDataFiles =
+        tableChangesAfterModificationTime.stream()
+            .map(file -> createInternalDataFileFromParquetFile(file, internalTable.getReadSchema()))
+            .collect(Collectors.toSet());
     return TableChange.builder()
+        .sourceIdentifier(
+            getCommitIdentifier(
+                parquetDataManager.getMostRecentParquetFile().getModificationTime()))
         .tableAsOfChange(internalTable)
         .filesDiff(InternalFilesDiff.builder().filesAdded(addedInternalDataFiles).build())
         .build();
   }
 
-  private InternalTable getMostRecentTable(Stream<LocatedFileStatus> parquetFiles) {
-    LocatedFileStatus latestFile = getMostRecentParquetFile(parquetFiles);
+  private InternalTable getMostRecentTable() {
+    ParquetFileInfo latestFile = parquetDataManager.getMostRecentParquetFile();
+    return createInternalTableFromFile(latestFile);
+  }
+
+  private InternalTable getMostRecentTableConfig(Stream<ParquetFileInfo> parquetFiles) {
+    ParquetFileInfo latestFile =
+        parquetFiles
+            .max(Comparator.comparing(ParquetFileInfo::getModificationTime))
+            .orElseThrow(() -> new IllegalStateException("No files found"));
     return createInternalTableFromFile(latestFile);
   }
 
   @Override
   public InternalTable getCurrentTable() {
-    Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
-    return getMostRecentTable(parquetFiles);
+    return getMostRecentTable();
   }
 
-  /**
-   * get current snapshot
-   *
-   * @return
-   */
   @Override
   public InternalSnapshot getCurrentSnapshot() {
-    // to avoid consume the stream call the method twice to return the same stream of parquet files
+    InternalTable table = getMostRecentTable();
     Stream<InternalDataFile> internalDataFiles =
-        getInternalDataFiles(getParquetFiles(hadoopConf, basePath));
-    InternalTable table = getMostRecentTable(getParquetFiles(hadoopConf, basePath));
+        getInternalDataFiles(parquetDataManager.getCurrentFileInfo(), table.getReadSchema());
     return InternalSnapshot.builder()
         .table(table)
         .sourceIdentifier(
            getCommitIdentifier(
-                getMostRecentParquetFile(getParquetFiles(hadoopConf, basePath))
-                    .getModificationTime()))
+                parquetDataManager.getMostRecentParquetFile().getModificationTime()))
        .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
        .build();
   }
 
-  private LocatedFileStatus getMostRecentParquetFile(Stream<LocatedFileStatus> parquetFiles) {
-    return parquetFiles
-        .max(Comparator.comparing(FileStatus::getModificationTime))
-        .orElseThrow(() -> new IllegalStateException("No files found"));
-  }
+  @Override
+  public boolean isIncrementalSyncSafeFrom(Instant timeInMillis) {
+    Stream<ParquetFileInfo> parquetFilesMetadata = parquetDataManager.getCurrentFileInfo();
+    OptionalLong earliestModTimeOpt =
+        parquetFilesMetadata.mapToLong(ParquetFileInfo::getModificationTime).min();
+
+    if (!earliestModTimeOpt.isPresent()) {
+      log.warn("No parquet files found in table {}. Incremental sync is not possible.", tableName);
+      return false;
+    }
 
-  private LocatedFileStatus getParquetFileAt(
-      Stream<LocatedFileStatus> parquetFiles, long modificationTime) {
-    return parquetFiles
-        .filter(fileStatus -> fileStatus.getModificationTime() == modificationTime)
-        .findFirst()
-        .orElseThrow(() -> new IllegalStateException("No file found at " + modificationTime));
-  }
+    long earliestModTime = earliestModTimeOpt.getAsLong();
 
-  private Stream<LocatedFileStatus> getParquetFiles(Configuration hadoopConf, String basePath) {
-    try {
-      FileSystem fs = FileSystem.get(hadoopConf);
-      URI uriBasePath = new URI(basePath);
-      String parentPath = Paths.get(uriBasePath).toString();
-      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(parentPath), true);
-      return RemoteIterators.toList(iterator).stream()
-          .filter(file -> file.getPath().getName().endsWith("parquet"));
-    } catch (IOException | URISyntaxException e) {
-      throw new ReadException("Unable to read files from file system", e);
+    if (earliestModTime > timeInMillis.toEpochMilli()) {
+      log.warn(
+          "Incremental sync is not safe. Earliest available metadata (time={}) is newer "
+              + "than requested instant {}.",
+          Instant.ofEpochMilli(earliestModTime),
+          timeInMillis.toEpochMilli());
+      return false;
     }
-  }
 
-  @Override
-  public boolean isIncrementalSyncSafeFrom(Instant instant) {
-    return false;
+    log.debug(
+        "Incremental sync is safe from instant {} for table {}",
+        timeInMillis.toEpochMilli(),
+        tableName);
+    return true;
   }
 
   @Override
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetDataManager.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetDataManager.java
new file mode 100644
index 000000000..c1db5ee48
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetDataManager.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.functional.RemoteIterators;
+
+import org.apache.xtable.exception.ReadException;
+
+/**
+ * Manages Parquet File's Metadata
+ *
+ * <p>This class provides functions to handle Parquet metadata, creating metadata objects from
+ * parquet files and filtering the files based on the modification times.
+ */
+@Log4j2
+@RequiredArgsConstructor
+public class ParquetDataManager {
+  private final Configuration hadoopConf;
+  private final String basePath;
+  private final FileSystem fileSystem;
+
+  public ParquetDataManager(Configuration hadoopConf, String basePath) {
+    this.hadoopConf = hadoopConf;
+    this.basePath = basePath;
+    try {
+      URI uri = Paths.get(basePath).toUri();
+      this.fileSystem = FileSystem.get(uri, hadoopConf);
+    } catch (IOException e) {
+      throw new ReadException("Unable to initialize file system for base path: " + basePath, e);
+    }
+  }
+
+  @Getter(value = AccessLevel.PRIVATE, lazy = true)
+  private final List<LocatedFileStatus> parquetFiles = loadParquetFiles();
+
+  ParquetFileInfo getMostRecentParquetFile() {
+    LocatedFileStatus file =
+        getParquetFiles().stream()
+            .max(Comparator.comparing(LocatedFileStatus::getModificationTime))
+            .orElseThrow(() -> new IllegalStateException("No files found"));
+    return new ParquetFileInfo(hadoopConf, file);
+  }
+
+  ParquetFileInfo getParquetDataFileAt(long targetTime) {
+    return getParquetFiles().stream()
+        .filter(file -> file.getModificationTime() >= targetTime)
+        .min(Comparator.comparing(LocatedFileStatus::getModificationTime))
+        .map(file -> new ParquetFileInfo(hadoopConf, file))
+        .orElseThrow(() -> new IllegalStateException("No file found at or after " + targetTime));
+  }
+
+  private List<LocatedFileStatus> loadParquetFiles() {
+    try {
+      RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(new Path(basePath), true);
+      return RemoteIterators.toList(iterator).stream()
+          .filter(file -> file.getPath().getName().endsWith("parquet"))
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new ReadException("Unable to read files from file system", e);
+    }
+  }
+
+  Stream<ParquetFileInfo> getCurrentFileInfo() {
+    return getParquetFiles().stream()
+        .map(fileStatus -> new ParquetFileInfo(hadoopConf, fileStatus));
+  }
+
+  List<ParquetFileInfo> getParquetFilesMetadataAfterTime(long syncTime) {
+    return getParquetFiles().stream()
+        .filter(file -> file.getModificationTime() >= syncTime)
+        .map(file -> new ParquetFileInfo(hadoopConf, file))
+        .collect(Collectors.toList());
+  }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetFileInfo.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetFileInfo.java
new file mode 100644
index 000000000..539349edb
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetFileInfo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import lombok.Getter;
+import lombok.Value;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+@Log4j2
+@Value
+class ParquetFileInfo {
+  @Getter(lazy = true)
+  ParquetMetadata metadata = getMetadataInternal();
+
+  long modificationTime;
+  long size;
+  Path path;
+  Configuration conf;
+
+  public ParquetFileInfo(Configuration conf, FileStatus file) {
+    this.modificationTime = file.getModificationTime();
+    this.path = file.getPath();
+    this.size = file.getLen();
+    this.conf = conf;
+  }
+
+  private ParquetMetadata getMetadataInternal() {
+    return ParquetMetadataExtractor.readParquetMetadata(conf, path);
+  }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
index ab4ceaf48..26ac67ff0 100644
--- a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
@@ -131,8 +131,8 @@ public static ParquetPartitionValueExtractor getInstance() {
     return INSTANCE;
   }
 
-  public InternalSchema extractSchemaForParquetPartitions(ParquetMetadata footer, String path) {
+  public InternalSchema extractSchemaForParquetPartitions(ParquetMetadata footer) {
     MessageType parquetSchema = parquetMetadataExtractor.getSchema(footer);
-    return schemaExtractor.toInternalSchema(parquetSchema, path);
+    return schemaExtractor.toInternalSchema(parquetSchema, "");
   }
 }
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
index a1393b980..547d86ff6 100644
--- a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
@@ -140,8 +140,7 @@ public static InternalDataFile toInternalDataFile(Configuration hadoopConf, Path
         partitionValueExtractor.extractPartitionValues(
             partitionSpecExtractor.spec(
                 partitionValueExtractor.extractSchemaForParquetPartitions(
-                    parquetMetadataExtractor.readParquetMetadata(hadoopConf, file.getPath()),
-                    file.getPath().toString())),
+                    parquetMetadataExtractor.readParquetMetadata(hadoopConf, file.getPath()))),
             parentPath.toString());
     return InternalDataFile.builder()
         .physicalPath(parentPath.toString())
diff --git a/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java
index 6dda9db11..218ba4d75 100644
--- a/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java
@@ -18,16 +18,20 @@
 package org.apache.xtable.parquet;
 
+import static org.apache.spark.sql.functions.expr;
 import static org.apache.xtable.GenericTable.getTableName;
 import static org.apache.xtable.model.storage.TableFormat.*;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.net.URISyntaxException;
+import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Paths; import java.sql.Timestamp; import java.time.Duration; +import java.time.Instant; import java.util.*; import java.util.Arrays; import java.util.Collections; @@ -43,6 +47,10 @@ import lombok.Builder; import lombok.Value; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.*; @@ -52,6 +60,7 @@ import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -67,6 +76,10 @@ import org.apache.xtable.conversion.SourceTable; import org.apache.xtable.conversion.TargetTable; import org.apache.xtable.hudi.HudiTestUtil; +import org.apache.xtable.model.InternalSnapshot; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.TableChange; +import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.model.sync.SyncMode; public class ITParquetConversionSource { @@ -84,7 +97,7 @@ public static void setupOnce() { sparkConf.set("parquet.avro.write-old-list-structure", "false"); sparkConf.set("spark.sql.parquet.writeLegacyFormat", "false"); sparkConf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS"); - + sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer"); sparkSession = SparkSession.builder().config(sparkConf).getOrCreate(); jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext()); } @@ -100,27 +113,19 @@ public static void teardown() { sparkSession = null; } } - // delimiter must be / and not - or any other one - private static Stream provideArgsForFilePartitionTesting() { - String partitionConfig = // "timestamp:YEAR:year=yyyy"; - "timestamp:MONTH:year=yyyy/month=MM"; // or "timestamp:YEAR:year=yyyy", or // - // timestamp:DAY:year=yyyy/month=MM/day=dd - return Stream.of( - Arguments.of( - buildArgsForPartition( - PARQUET, Arrays.asList(ICEBERG, DELTA, HUDI), partitionConfig, partitionConfig))); - } private static TableFormatPartitionDataHolder buildArgsForPartition( String sourceFormat, List targetFormats, String hudiPartitionConfig, - String xTablePartitionConfig) { + String xTablePartitionConfig, + SyncMode syncMode) { return TableFormatPartitionDataHolder.builder() .sourceTableFormat(sourceFormat) .targetTableFormats(targetFormats) .hudiSourceConfig(Optional.ofNullable(hudiPartitionConfig)) .xTablePartitionConfig(xTablePartitionConfig) + .syncMode(syncMode) .build(); } @@ -136,11 +141,12 @@ private static ConversionConfig getTableSyncConfig( if (partitionConfig != null) { sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig); } + String absolutePath = new java.io.File(java.net.URI.create(table.getDataPath())).getPath(); SourceTable sourceTable = SourceTable.builder() .name(tableName) .formatName(sourceTableFormat) - .basePath(table.getBasePath()) + .basePath(absolutePath) .dataPath(table.getDataPath()) .additionalProperties(sourceProperties) .build(); @@ -153,7 +159,7 @@ private static ConversionConfig getTableSyncConfig( .name(tableName) .formatName(formatName) // set the metadata path to the data path as the default (required by Hudi) - 
.basePath(table.getDataPath()) + .basePath(absolutePath) .metadataRetention(metadataRetention) .build()) .collect(Collectors.toList()); @@ -165,12 +171,21 @@ private static ConversionConfig getTableSyncConfig( .build(); } - private static Stream provideArgsForFileNonPartitionTesting() { - String partitionConfig = null; - return Stream.of( - Arguments.of( - buildArgsForPartition( - PARQUET, Arrays.asList(ICEBERG, DELTA, HUDI), partitionConfig, partitionConfig))); + private static Stream provideArgsForSyncTesting() { + List partitionConfigs = Arrays.asList(null, "timestamp:MONTH:year=yyyy/month=MM"); + return partitionConfigs.stream() + .flatMap( + partitionConfig -> + Arrays.stream(SyncMode.values()) + .map( + syncMode -> + Arguments.of( + buildArgsForPartition( + PARQUET, + Arrays.asList(ICEBERG, DELTA, HUDI), + partitionConfig, + partitionConfig, + syncMode)))); } private ConversionSourceProvider getConversionSourceProvider(String sourceTableFormat) { @@ -185,9 +200,8 @@ private ConversionSourceProvider getConversionSourceProvider(String sourceTab } @ParameterizedTest - @MethodSource("provideArgsForFileNonPartitionTesting") - public void testFileNonPartitionedData( - TableFormatPartitionDataHolder tableFormatPartitionDataHolder) throws URISyntaxException { + @MethodSource("provideArgsForSyncTesting") + void testSync(TableFormatPartitionDataHolder tableFormatPartitionDataHolder) { String tableName = getTableName(); String sourceTableFormat = tableFormatPartitionDataHolder.getSourceTableFormat(); List targetTableFormats = tableFormatPartitionDataHolder.getTargetTableFormats(); @@ -221,17 +235,24 @@ public void testFileNonPartitionedData( new MetadataBuilder().putString("precision", "millis").build()) }); Dataset df = sparkSession.createDataFrame(data, schema); - String dataPath = tempDir.toAbsolutePath().toString() + "/non_partitioned_data"; - df.write().mode(SaveMode.Overwrite).parquet(dataPath); - GenericTable table; - table = + String dataPath = + tempDir + .resolve( + (xTablePartitionConfig == null ? 
"non_partitioned_data_" : "partitioned_data_") + + tableFormatPartitionDataHolder.getSyncMode()) + .toString(); + + writeData(df, dataPath, xTablePartitionConfig); + boolean isPartitioned = xTablePartitionConfig != null; + + Path pathForXTable = Paths.get(dataPath); + try (GenericTable table = GenericTable.getInstance( - tableName, Paths.get(dataPath), sparkSession, jsc, sourceTableFormat, false); - try (GenericTable tableToClose = table) { + tableName, pathForXTable, sparkSession, jsc, sourceTableFormat, isPartitioned)) { ConversionConfig conversionConfig = getTableSyncConfig( sourceTableFormat, - SyncMode.FULL, + tableFormatPartitionDataHolder.getSyncMode(), tableName, table, targetTableFormats, @@ -240,77 +261,12 @@ public void testFileNonPartitionedData( ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration()); conversionController.sync(conversionConfig, conversionSourceProvider); - checkDatasetEquivalenceWithFilter(sourceTableFormat, tableToClose, targetTableFormats, false); - } - } - - @ParameterizedTest - @MethodSource("provideArgsForFilePartitionTesting") - public void testFilePartitionedData(TableFormatPartitionDataHolder tableFormatPartitionDataHolder) - throws URISyntaxException { - String tableName = getTableName(); - String sourceTableFormat = tableFormatPartitionDataHolder.getSourceTableFormat(); - List targetTableFormats = tableFormatPartitionDataHolder.getTargetTableFormats(); - String xTablePartitionConfig = tableFormatPartitionDataHolder.getXTablePartitionConfig(); - ConversionSourceProvider conversionSourceProvider = - getConversionSourceProvider(sourceTableFormat); - // create the data - List data = - Arrays.asList( - RowFactory.create(1, "Alice", true, 30.1, new Timestamp(System.currentTimeMillis())), - RowFactory.create( - 2, "Bob", false, 24.6, new Timestamp(System.currentTimeMillis() + 1000)), - RowFactory.create( - 3, "Charlie", true, 35.2, new Timestamp(System.currentTimeMillis() + 2000)), - RowFactory.create( - 4, "David", false, 29.5, new Timestamp(System.currentTimeMillis() + 3000)), - RowFactory.create( - 5, "Eve", true, 22.2, new Timestamp(System.currentTimeMillis() + 4000))); + checkDatasetEquivalenceWithFilter( + sourceTableFormat, table, targetTableFormats, isPartitioned); - schema = - DataTypes.createStructType( - new StructField[] { - DataTypes.createStructField("id", DataTypes.IntegerType, false), - DataTypes.createStructField("name", DataTypes.StringType, false), - DataTypes.createStructField("hasSiblings", DataTypes.BooleanType, false), - DataTypes.createStructField("age", DataTypes.DoubleType, false), - DataTypes.createStructField( - "timestamp", - DataTypes.TimestampType, - false, - new MetadataBuilder().putString("precision", "millis").build()) - }); - Dataset df = sparkSession.createDataFrame(data, schema); - String dataPathPart = tempDir.toAbsolutePath() + "/partitioned_data"; - df.withColumn("year", functions.year(functions.col("timestamp").cast(DataTypes.TimestampType))) - .withColumn( - "month", - functions.date_format(functions.col("timestamp").cast(DataTypes.TimestampType), "MM")) - .write() - .mode(SaveMode.Overwrite) - .partitionBy("year", "month") - .parquet(dataPathPart); - GenericTable table; - table = - GenericTable.getInstance( - tableName, Paths.get(dataPathPart), sparkSession, jsc, sourceTableFormat, true); - try (GenericTable tableToClose = table) { - ConversionConfig conversionConfig = - getTableSyncConfig( - sourceTableFormat, - SyncMode.FULL, - tableName, - table, - targetTableFormats, - 
xTablePartitionConfig, - null); - ConversionController conversionController = - new ConversionController(jsc.hadoopConfiguration()); - conversionController.sync(conversionConfig, conversionSourceProvider); - checkDatasetEquivalenceWithFilter(sourceTableFormat, tableToClose, targetTableFormats, true); // update the current parquet file data with another attribute the sync again List dataToAppend = - Arrays.asList( + Collections.singletonList( RowFactory.create( 10, "BobAppended", @@ -319,33 +275,171 @@ public void testFilePartitionedData(TableFormatPartitionDataHolder tableFormatPa new Timestamp(System.currentTimeMillis() + 1500))); Dataset dfAppend = sparkSession.createDataFrame(dataToAppend, schema); - dfAppend - .withColumn( - "year", functions.year(functions.col("timestamp").cast(DataTypes.TimestampType))) - .withColumn( - "month", - functions.date_format(functions.col("timestamp").cast(DataTypes.TimestampType), "MM")) - .write() - .mode(SaveMode.Append) - .partitionBy("year", "month") - .parquet(dataPathPart); - GenericTable tableAppend; - tableAppend = - GenericTable.getInstance( - tableName, Paths.get(dataPathPart), sparkSession, jsc, sourceTableFormat, true); - try (GenericTable tableToCloseAppended = tableAppend) { - ConversionConfig conversionConfigAppended = - getTableSyncConfig( - sourceTableFormat, - SyncMode.FULL, - tableName, - tableAppend, - targetTableFormats, - xTablePartitionConfig, - null); - ConversionController conversionControllerAppended = - new ConversionController(jsc.hadoopConfiguration()); - conversionControllerAppended.sync(conversionConfigAppended, conversionSourceProvider); + writeData(dfAppend, dataPath, xTablePartitionConfig); + ConversionConfig conversionConfigAppended = + getTableSyncConfig( + sourceTableFormat, + tableFormatPartitionDataHolder.getSyncMode(), + tableName, + table, + targetTableFormats, + xTablePartitionConfig, + null); + ConversionController conversionControllerAppended = + new ConversionController(jsc.hadoopConfiguration()); + conversionControllerAppended.sync(conversionConfigAppended, conversionSourceProvider); + checkDatasetEquivalenceWithFilter( + sourceTableFormat, table, targetTableFormats, isPartitioned); + } + } + + private void writeData(Dataset df, String dataPath, String partitionConfig) { + if (partitionConfig != null) { + // extract partition columns from config + String[] partitionCols = + Arrays.stream(partitionConfig.split(":")[2].split("/")) + .map(s -> s.split("=")[0]) + .toArray(String[]::new); + // add partition columns to dataframe + for (String partitionCol : partitionCols) { + if (partitionCol.equals("year")) { + df = + df.withColumn( + "year", functions.year(functions.col("timestamp").cast(DataTypes.TimestampType))); + } else if (partitionCol.equals("month")) { + df = + df.withColumn( + "month", + functions.date_format( + functions.col("timestamp").cast(DataTypes.TimestampType), "MM")); + } else if (partitionCol.equals("day")) { + df = + df.withColumn( + "day", + functions.date_format( + functions.col("timestamp").cast(DataTypes.TimestampType), "dd")); + } + } + df.write().mode(SaveMode.Append).partitionBy(partitionCols).parquet(dataPath); + } else { + df.write().mode(SaveMode.Append).parquet(dataPath); + } + } + + @Test + void testIncrementalSyncWithMultiplePartitions() throws IOException { + + Configuration conf = sparkSession.sparkContext().hadoopConfiguration(); + + StructType schema = + DataTypes.createStructType( + new StructField[] { + DataTypes.createStructField("id", DataTypes.IntegerType, false), + 
DataTypes.createStructField("value", DataTypes.StringType, false), + DataTypes.createStructField("year", DataTypes.IntegerType, false), + DataTypes.createStructField("month", DataTypes.IntegerType, false) + }); + List data = + Arrays.asList( + RowFactory.create(100, "A", 2026, 12), + RowFactory.create(101, "AA", 2026, 12), + RowFactory.create(102, "CB", 2027, 11), + RowFactory.create(103, "BA", 2027, 11)); + + Dataset dfInit = sparkSession.createDataFrame(data, schema); + Path fixedPath = Paths.get("target", "fixed-parquet-data", "parquet_table_test_2"); + // String outputPath = fixedPath.toString(); + Dataset df = dfInit.withColumn("full_date", expr("make_date(year, month, 1)")); + String outputPath = + new java.io.File("target/fixed-parquet-data/parquet_table_test_2").getAbsolutePath(); + df.coalesce(1).write().partitionBy("year", "month").mode("overwrite").parquet(outputPath); + + // test find files to sync + FileSystem fs = FileSystem.get(fixedPath.toUri(), conf); + // set the modification time to the table file + // update modificationTime for file to append + // many partitions case + List newPartitions = Arrays.asList("year=2026/month=12", "year=2027/month=11"); + long targetModificationTime = System.currentTimeMillis() - 360000; + long newModificationTime = System.currentTimeMillis() - 50000; + long testTime = System.currentTimeMillis() - 90000; // between two prev times + for (String partition : newPartitions) { + org.apache.hadoop.fs.Path partitionPath = + new org.apache.hadoop.fs.Path(outputPath, partition); + if (fs.exists(partitionPath)) { + updateModificationTimeRecursive(fs, partitionPath, targetModificationTime); + } + } + // create new file to append using Spark + List futureDataToSync = + Arrays.asList( + RowFactory.create(101, "A", 2026, 12), + RowFactory.create(301, "D", 2027, 11), + RowFactory.create(302, "DA", 2027, 11)); + Dataset dfToSyncInit = sparkSession.createDataFrame(futureDataToSync, schema); + Dataset dfToSync = dfToSyncInit.withColumn("full_date", expr("make_date(year, month, 1)")); + dfToSync.coalesce(1).write().partitionBy("year", "month").mode("append").parquet(outputPath); + + // conversionSource operations + Properties sourceProperties = new Properties(); + String partitionConfig = "full_date:MONTH:year=yyyy/month=MM"; + sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig); + + SourceTable tableConfig = + SourceTable.builder() + .name("parquet_table_test_2") + .basePath(fixedPath.toAbsolutePath().toString()) + .additionalProperties(sourceProperties) + .formatName(TableFormat.PARQUET) + .build(); + + ParquetConversionSourceProvider conversionSourceProvider = + new ParquetConversionSourceProvider(); + conversionSourceProvider.init(conf); + ParquetConversionSource conversionSource = + conversionSourceProvider.getConversionSourceInstance(tableConfig); + + for (String partition : newPartitions) { + org.apache.hadoop.fs.Path partitionPath = + new org.apache.hadoop.fs.Path(outputPath, partition); + + RemoteIterator it = fs.listFiles(partitionPath, false); + while (it.hasNext()) { + LocatedFileStatus fileStatus = it.next(); + + if (fileStatus.getModificationTime() > newModificationTime) { + fs.setTimes(fileStatus.getPath(), newModificationTime, -1); + } else { + fs.setTimes(fileStatus.getPath(), targetModificationTime, -1); + } + } + fs.setTimes(partitionPath, newModificationTime, -1); + } + + InternalTable result = conversionSource.getTable(newModificationTime); + assertEquals( + Instant.ofEpochMilli(newModificationTime).toString(), + 
result.getLatestCommitTime().toString()); + assertEquals("parquet_table_test_2", result.getName()); + assertEquals(TableFormat.PARQUET, result.getTableFormat()); + assertNotNull(result.getReadSchema()); + InternalSnapshot snapshot = conversionSource.getCurrentSnapshot(); + assertNotNull(snapshot); + TableChange changes = conversionSource.getTableChangeForCommit(newModificationTime); + assertNotNull(changes); + Instant instantBeforeFirstSnapshot = + Instant.ofEpochMilli(snapshot.getTable().getLatestCommitTime().toEpochMilli()); + assertEquals(instantBeforeFirstSnapshot.toEpochMilli(), newModificationTime); + assertTrue(conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(testTime))); + } + + private void updateModificationTimeRecursive( + FileSystem fs, org.apache.hadoop.fs.Path path, long time) throws IOException { + RemoteIterator it = fs.listFiles(path, true); + while (it.hasNext()) { + LocatedFileStatus status = it.next(); + if (status.getPath().getName().endsWith(".parquet")) { + fs.setTimes(status.getPath(), time, -1); } } } @@ -354,8 +448,7 @@ private void checkDatasetEquivalenceWithFilter( String sourceFormat, GenericTable sourceTable, List targetFormats, - boolean isPartitioned) - throws URISyntaxException { + boolean isPartitioned) { checkDatasetEquivalence( sourceFormat, sourceTable, @@ -373,8 +466,7 @@ private void checkDatasetEquivalence( List targetFormats, Map> targetOptions, Integer expectedCount, - boolean isPartitioned) - throws URISyntaxException { + boolean isPartitioned) { Dataset sourceRows = sparkSession .read() @@ -382,7 +474,8 @@ private void checkDatasetEquivalence( .options(sourceOptions) .option("recursiveFileLookup", "true") .option("pathGlobFilter", "*.parquet") - .parquet(sourceTable.getDataPath()); + .parquet(sourceTable.getDataPath()) + .orderBy("id"); // order by id to ensure deterministic order for comparison Map> targetRowsByFormat = targetFormats.stream() .collect( @@ -401,7 +494,8 @@ private void checkDatasetEquivalence( .read() .options(finalTargetOptions) .format(targetFormat.toLowerCase()) - .load(sourceTable.getDataPath()); + .load(sourceTable.getDataPath()) + .orderBy("id"); })); String[] selectColumnsArr = schema.fieldNames(); @@ -414,7 +508,6 @@ private void checkDatasetEquivalence( String format = entry.getKey(); Dataset targetRows = entry.getValue(); - targetRows.show(); List dataset2Rows = targetRows.selectExpr(selectColumnsArr).toJSON().collectAsList(); @@ -458,5 +551,6 @@ private static class TableFormatPartitionDataHolder { String xTablePartitionConfig; Optional hudiSourceConfig; String filter; + SyncMode syncMode; } } diff --git a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetDataManager.java b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetDataManager.java new file mode 100644 index 000000000..7a0b0ce30 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetDataManager.java @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.FileTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.RemoteIterator; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +class TestParquetDataManager { + + private static final Configuration CONF = new Configuration(); + + // Helper method to create a mock LocatedFileStatus + private static LocatedFileStatus createMockFileStatus(String name, long modTime) { + LocatedFileStatus mockStatus = mock(LocatedFileStatus.class); + org.apache.hadoop.fs.Path mockPath = mock(org.apache.hadoop.fs.Path.class); + when(mockPath.getName()).thenReturn(name); + when(mockStatus.getPath()).thenReturn(mockPath); + when(mockStatus.getModificationTime()).thenReturn(modTime); + when(mockStatus.getLen()).thenReturn(1024L); + return mockStatus; + } + + private static class StubbedRemoteIterator implements RemoteIterator { + private final Iterator files; + + public StubbedRemoteIterator(List files) { + this.files = files.iterator(); + } + + @Override + public boolean hasNext() { + return files.hasNext(); + } + + @Override + public LocatedFileStatus next() { + return files.next(); + } + } + + // Helper method to create a mock FileSystem + private static FileSystem createMockFileSystem(RemoteIterator iterator) + throws IOException { + FileSystem mockFs = mock(FileSystem.class); + when(mockFs.listFiles(any(org.apache.hadoop.fs.Path.class), anyBoolean())).thenReturn(iterator); + return mockFs; + } + + @Test + void testGetMostRecentParquetFile_withMultipleFiles() throws IOException { + LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L); + LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 3000L); + LocatedFileStatus file3 = createMockFileStatus("file3.parquet", 2000L); + + RemoteIterator iterator = + new StubbedRemoteIterator(Arrays.asList(file1, file2, file3)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + ParquetFileInfo result = manager.getMostRecentParquetFile(); + + assertNotNull(result); + assertEquals(3000L, 
result.getModificationTime()); + } + + @Test + void testGetMostRecentParquetFile_noFiles() throws IOException { + RemoteIterator iterator = new StubbedRemoteIterator(new ArrayList<>()); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + IllegalStateException exception = + assertThrows(IllegalStateException.class, manager::getMostRecentParquetFile); + assertEquals("No files found", exception.getMessage()); + } + + @Test + void testGetParquetDataFileAt_exactMatch() throws IOException { + LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L); + LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 2000L); + LocatedFileStatus file3 = createMockFileStatus("file3.parquet", 3000L); + + RemoteIterator iterator = + new StubbedRemoteIterator(Arrays.asList(file1, file2, file3)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + ParquetFileInfo result = manager.getParquetDataFileAt(2000L); + + assertNotNull(result); + assertEquals(2000L, result.getModificationTime()); + } + + @Test + void testGetParquetDataFileAt_firstAfterTime() throws IOException { + LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L); + LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 2500L); + LocatedFileStatus file3 = createMockFileStatus("file3.parquet", 3000L); + + RemoteIterator iterator = + new StubbedRemoteIterator(Arrays.asList(file1, file2, file3)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + ParquetFileInfo result = manager.getParquetDataFileAt(2000L); + + assertNotNull(result); + assertEquals(2500L, result.getModificationTime()); + } + + @Test + void testGetParquetDataFileAt_multipleAfterTime() throws IOException { + LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L); + LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 2500L); + LocatedFileStatus file3 = createMockFileStatus("file3.parquet", 3000L); + + RemoteIterator iterator = + new StubbedRemoteIterator(Arrays.asList(file1, file2, file3)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + ParquetFileInfo result = manager.getParquetDataFileAt(2000L); + + assertNotNull(result); + assertEquals(2500L, result.getModificationTime()); + } + + @Test + void testGetParquetDataFileAt_noMatch() throws IOException { + RemoteIterator iterator = new StubbedRemoteIterator(Collections.emptyList()); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + IllegalStateException exception = + assertThrows(IllegalStateException.class, () -> manager.getParquetDataFileAt(5000L)); + assertTrue(exception.getMessage().contains("No file found at or after 5000")); + } + + @Test + void testGetParquetDataFileAt_allBefore() throws IOException { + LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L); + LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 2000L); + + RemoteIterator iterator = + new StubbedRemoteIterator(Arrays.asList(file1, file2)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + IllegalStateException exception = + 
assertThrows(IllegalStateException.class, () -> manager.getParquetDataFileAt(3000L)); + assertTrue(exception.getMessage().contains("No file found at or after 3000")); + } + + @Test + void testGetCurrentFileInfo_multipleFiles() throws IOException { + LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L); + LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 2000L); + LocatedFileStatus file3 = createMockFileStatus("file3.parquet", 3000L); + + RemoteIterator iterator = + new StubbedRemoteIterator(Arrays.asList(file1, file2, file3)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + Stream result = manager.getCurrentFileInfo(); + + assertNotNull(result); + List fileList = result.collect(Collectors.toList()); + assertEquals(3, fileList.size()); + assertEquals(1000L, fileList.get(0).getModificationTime()); + assertEquals(2000L, fileList.get(1).getModificationTime()); + assertEquals(3000L, fileList.get(2).getModificationTime()); + } + + @Test + void testGetCurrentFileInfo_emptyList() throws IOException { + RemoteIterator iterator = new StubbedRemoteIterator(Collections.emptyList()); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + Stream result = manager.getCurrentFileInfo(); + + assertNotNull(result); + assertEquals(0, result.count()); + } + + @Test + void testGetCurrentFileInfo_streamCharacteristics() throws IOException { + LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L); + LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 2000L); + + RemoteIterator iterator = + new StubbedRemoteIterator(Arrays.asList(file1, file2)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + Stream result = manager.getCurrentFileInfo(); + + assertNotNull(result); + assertTrue(result.allMatch(info -> info.getModificationTime() > 0)); + } + + @Test + void testGetParquetFilesMetadataAfterTime_someMatch() throws IOException { + LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L); + LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 2000L); + LocatedFileStatus file3 = createMockFileStatus("file3.parquet", 3000L); + + RemoteIterator iterator = + new StubbedRemoteIterator(Arrays.asList(file1, file2, file3)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + List result = manager.getParquetFilesMetadataAfterTime(2000L); + + assertNotNull(result); + assertEquals(2, result.size()); + assertEquals(2000L, result.get(0).getModificationTime()); + assertEquals(3000L, result.get(1).getModificationTime()); + } + + @Test + void testGetParquetFilesMetadataAfterTime_allMatch() throws IOException { + LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 2000L); + LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 3000L); + + RemoteIterator iterator = + new StubbedRemoteIterator(Arrays.asList(file1, file2)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + List result = manager.getParquetFilesMetadataAfterTime(1000L); + + assertNotNull(result); + assertEquals(2, result.size()); + } + + @Test + void testGetParquetFilesMetadataAfterTime_noneMatch() throws IOException { + 
LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L); + LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 2000L); + + RemoteIterator iterator = + new StubbedRemoteIterator(Arrays.asList(file1, file2)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + List result = manager.getParquetFilesMetadataAfterTime(5000L); + + assertNotNull(result); + assertEquals(0, result.size()); + } + + @Test + void testGetParquetFilesMetadataAfterTime_exactTimeMatch() throws IOException { + LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L); + LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 2000L); + LocatedFileStatus file3 = createMockFileStatus("file3.parquet", 3000L); + + RemoteIterator iterator = + new StubbedRemoteIterator(Arrays.asList(file1, file2, file3)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + List result = manager.getParquetFilesMetadataAfterTime(2000L); + + assertNotNull(result); + assertEquals(2, result.size()); + assertEquals(2000L, result.get(0).getModificationTime()); + assertEquals(3000L, result.get(1).getModificationTime()); + } + + @Test + void testGetParquetFiles_caching() throws IOException { + LocatedFileStatus file = createMockFileStatus("file.parquet", 1000L); + RemoteIterator iterator = + new StubbedRemoteIterator(Collections.singletonList(file)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + // No filesystem access should happen yet + verify(mockFs, never()).listFiles(any(org.apache.hadoop.fs.Path.class), anyBoolean()); + + // Access multiple times + manager.getCurrentFileInfo(); + manager.getMostRecentParquetFile(); + manager.getParquetFilesMetadataAfterTime(0L); + + // Verify filesystem was accessed only once + verify(mockFs, times(1)).listFiles(any(org.apache.hadoop.fs.Path.class), anyBoolean()); + } + + @Test + void testOnlyParquetFilesIncluded() throws IOException { + LocatedFileStatus parquetFile = createMockFileStatus("data.parquet", 1000L); + LocatedFileStatus txtFile = createMockFileStatus("readme.txt", 2000L); + LocatedFileStatus jsonFile = createMockFileStatus("config.json", 3000L); + + RemoteIterator iterator = + new StubbedRemoteIterator(Arrays.asList(parquetFile, txtFile, jsonFile)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + List result = manager.getParquetFilesMetadataAfterTime(0L); + + assertEquals(1, result.size()); + assertEquals(1000L, result.get(0).getModificationTime()); + } + + @Test + void testWithRealFileSystem_multipleFiles(@TempDir Path tempDir) throws IOException { + // Create multiple parquet files with different modification times + Path file1 = tempDir.resolve("data1.parquet"); + Path file2 = tempDir.resolve("data2.parquet"); + Path file3 = tempDir.resolve("data3.parquet"); + + Files.createFile(file1); + Files.createFile(file2); + Files.createFile(file3); + + // Set different modification times + Files.setLastModifiedTime(file1, FileTime.fromMillis(1000L)); + Files.setLastModifiedTime(file2, FileTime.fromMillis(3000L)); + Files.setLastModifiedTime(file3, FileTime.fromMillis(2000L)); + + ParquetDataManager manager = new ParquetDataManager(CONF, tempDir.toString()); + + ParquetFileInfo mostRecent = 
manager.getMostRecentParquetFile(); + assertNotNull(mostRecent); + assertEquals(3000L, mostRecent.getModificationTime()); + + List afterTime = manager.getParquetFilesMetadataAfterTime(2000L); + assertEquals(2, afterTime.size()); + } + + @Test + void testWithRealFileSystem_nestedDirectories(@TempDir Path tempDir) throws IOException { + // Create nested directory structure + Path subDir1 = tempDir.resolve("subdir1"); + Path subDir2 = tempDir.resolve("subdir2"); + Files.createDirectories(subDir1); + Files.createDirectories(subDir2); + + // Create parquet files in different directories + Path file1 = tempDir.resolve("root.parquet"); + Path file2 = subDir1.resolve("nested1.parquet"); + Path file3 = subDir2.resolve("nested2.parquet"); + + Files.createFile(file1); + Files.createFile(file2); + Files.createFile(file3); + + Files.setLastModifiedTime(file1, FileTime.fromMillis(1000L)); + Files.setLastModifiedTime(file2, FileTime.fromMillis(2000L)); + Files.setLastModifiedTime(file3, FileTime.fromMillis(3000L)); + ParquetDataManager manager = new ParquetDataManager(CONF, tempDir.toString()); + + List allFiles = manager.getParquetFilesMetadataAfterTime(0L); + assertEquals(3, allFiles.size()); + + ParquetFileInfo mostRecent = manager.getMostRecentParquetFile(); + assertEquals(3000L, mostRecent.getModificationTime()); + } + + @Test + void testWithRealFileSystem_mixedFileTypes(@TempDir Path tempDir) throws IOException { + // Create mix of parquet and non-parquet files + Path parquetFile1 = tempDir.resolve("data1.parquet"); + Path parquetFile2 = tempDir.resolve("data2.parquet"); + Path txtFile = tempDir.resolve("readme.txt"); + Path jsonFile = tempDir.resolve("config.json"); + + Files.createFile(parquetFile1); + Files.createFile(parquetFile2); + Files.createFile(txtFile); + Files.createFile(jsonFile); + + Files.setLastModifiedTime(parquetFile1, FileTime.fromMillis(1000L)); + Files.setLastModifiedTime(parquetFile2, FileTime.fromMillis(2000L)); + Files.setLastModifiedTime(txtFile, FileTime.fromMillis(3000L)); + Files.setLastModifiedTime(jsonFile, FileTime.fromMillis(4000L)); + + ParquetDataManager manager = new ParquetDataManager(CONF, tempDir.toString()); + + List allFiles = manager.getParquetFilesMetadataAfterTime(0L); + // Only parquet files should be included + assertEquals(2, allFiles.size()); + + ParquetFileInfo mostRecent = manager.getMostRecentParquetFile(); + // Most recent parquet file, not the txt or json + assertEquals(2000L, mostRecent.getModificationTime()); + } +}
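
For context on how the pieces introduced above fit together, here is a minimal usage sketch (not part of the patch) that lists a table's Parquet files through ParquetDataManager and reads the lazily cached footer of the newest one via ParquetFileInfo. Since both types are package-private in the diff, the sketch assumes it lives in org.apache.xtable.parquet; the class name and the base path are placeholders.

package org.apache.xtable.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

// Illustrative sketch only: exercises the ParquetDataManager/ParquetFileInfo API added by this patch.
public class ParquetDataManagerSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Placeholder base path; the listing is loaded lazily and cached, so repeated
    // calls below hit the file system only once.
    ParquetDataManager manager = new ParquetDataManager(conf, "/tmp/parquet_table");

    ParquetFileInfo latest = manager.getMostRecentParquetFile();
    System.out.println("Most recent file: " + latest.getPath());
    System.out.println("Modified at: " + latest.getModificationTime());

    // The Parquet footer is read only when getMetadata() is first called.
    ParquetMetadata footer = latest.getMetadata();
    System.out.println("Row groups: " + footer.getBlocks().size());

    // Files modified at or after a given instant, as consumed by incremental sync.
    manager
        .getParquetFilesMetadataAfterTime(0L)
        .forEach(f -> System.out.println(f.getPath() + " @ " + f.getModificationTime()));
  }
}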