102 commits
e541a71
given a parquet file return data from a certain modification time
Dec 10, 2025
15e282a
create the path based on the partition then inject the file to append…
Dec 13, 2025
2ee71c9
Handle case of path construction with file partitioned over many fiel…
Dec 14, 2025
6032e5f
test append Parquet file into table init
Dec 14, 2025
f6fdc72
add function to test schema equivalence before appending
Dec 15, 2025
a94c3f3
construct path to inject to based on partitions
Dec 16, 2025
f8bdbfe
fix imports
Dec 16, 2025
c04a983
refactoring (lombok, logs, javadocs and function and approach comment…
Dec 17, 2025
5f2541e
use appendFile to append a file into a table while tracking the appen…
Jan 1, 2026
47e7076
find the files that satisfy the time condition
Jan 1, 2026
fbb09ec
treat appends as separate files to add in the target partition folder
Jan 1, 2026
fe19a60
update approach: selective block compaction
Jan 2, 2026
da7f300
update approach: added a basic test to check data selection using mod…
Jan 2, 2026
a8730b7
fix append based on partition value
Jan 2, 2026
d19ccbf
fix test with basic example where partitions are not considered
Jan 2, 2026
aecb204
fix test with basic example where partitions are not considered2
Jan 2, 2026
0ec8cbb
fix test with basic example where partitions are not considered3
Jan 2, 2026
9cb75df
test with time of last append is now
Jan 3, 2026
9e125f2
test appendFile with Parquet: TODO test with multiple partitions 1) a…
Jan 3, 2026
233ca77
recursively merge one partition's files
Jan 3, 2026
b4cba5a
fix paths for files to append
Jan 3, 2026
a564b29
fix bug of appending file path
Jan 3, 2026
d1ceafb
fix bug of schema
Jan 3, 2026
649250f
make sure returned files are non empty when iterating in the partitions
Jan 3, 2026
24ff828
make sure returned files are non empty when iterating in the partitio…
Jan 3, 2026
463d2ee
discard empty files when appending
Jan 3, 2026
71d1c34
discard empty files when appending
Jan 3, 2026
3319a91
one spark session for both tests
Jan 3, 2026
013ffe4
bug fix
Jan 3, 2026
b939a0a
bug fix
Jan 3, 2026
b6d8ddc
cleanups + TODO: partitions match and merge
Jan 3, 2026
c18ab1c
added test for many partitions, TODO integrate functions into Parquet…
Jan 4, 2026
b7c613e
selecting data bug fix, TODO integrate functions into ParquetConversi…
Jan 4, 2026
2a75f49
run all tests, TODO integrate functions into ParquetConversionSource
Jan 4, 2026
f1538b0
run all tests, TODO integrate functions into ParquetConversionSource
Jan 4, 2026
cdedeae
spotless:apply, TODO integrate functions into ParquetConversionSource
Jan 4, 2026
e873120
bug fixes, TODO integrate functions into ParquetConversionSource
Jan 4, 2026
4d0f245
bug fixes, TODO integrate functions into ParquetConversionSource
Jan 5, 2026
219656e
handle lightweight metadata in the conversionSource
Jan 12, 2026
ff809d7
fix CI
Jan 12, 2026
e06368f
fix CI, refactoring and isIncrementalSyncSafeFrom implementation
Jan 12, 2026
7cdccd0
added test
Jan 12, 2026
b919146
added test: init conversionSource instance
Jan 12, 2026
8ff5aa6
added test: init conversionSource instance: fix error
Jan 12, 2026
db9d7ab
added test: fix error
Jan 13, 2026
324d703
more tests + bug fixes and reformatting
Jan 13, 2026
3c453e6
CI fix
Jan 13, 2026
e4c0b4c
CI fix
Jan 13, 2026
4315282
CI fix
Jan 13, 2026
1417929
CI fix
Jan 13, 2026
7620c01
CI fix
Jan 13, 2026
e458d72
Merge branch 'main' of https://github.com/sapienza88/incubator-xtable…
Jan 14, 2026
9947f1b
Merge branch 'main' of https://github.com/sapienza88/incubator-xtable…
Jan 16, 2026
cd66151
CI fix
Jan 16, 2026
4e4c5cb
merge ready fixes & tests + refactoring & reformatting
Jan 18, 2026
212e50a
optimization: read metadata once for target file, read parquet files …
Jan 19, 2026
5ea0b5a
Lazily fetch info, move state into ParquetDataManager
the-other-tim-brown Feb 2, 2026
64a5a2d
add TestParquetDataManager, simplify data manager methods
the-other-tim-brown Feb 12, 2026
2135ccf
make IT parameterized, add incremental sync
the-other-tim-brown Feb 12, 2026
e76ac21
fix schema extraction to fix snapshot sync
the-other-tim-brown Feb 12, 2026
9901c1b
add optimizations
the-other-tim-brown Feb 12, 2026
a2bb6c3
fix for ParquetDataManager Test (mock())
Feb 13, 2026
2d23f31
before resyncing, delete the first synced files' metadata
Feb 13, 2026
ceb924c
in order to append parquet files, perform union then spark overwrite …
Feb 14, 2026
5b0a9fd
solving directory issue when overwriting data using spark
Feb 14, 2026
80e1927
solving directory issue when overwriting data using spark2
Feb 14, 2026
0aedd70
solving directory issue when overwriting data using spark3
Feb 14, 2026
5cbeeeb
solving directory issue when overwriting data using spark4
Feb 14, 2026
13e5a65
solving directory issue when overwriting data using spark5
Feb 14, 2026
fa77646
solving directory issue when overwriting data using spark6
Feb 14, 2026
e53fa4b
solving directory issue when overwriting data using spark7
Feb 14, 2026
a6b75d5
revert writeData changes
Feb 14, 2026
bdc9f40
solving directory issue when overwriting data using spark8
Feb 15, 2026
43d986d
solving directory issue when overwriting data using spark9
Feb 15, 2026
5f67a0f
solving directory issue when overwriting data using spark9
Feb 15, 2026
dae07b7
revert changes
Feb 15, 2026
9e0de0a
testing without cleaning metadata for the second sync
Feb 15, 2026
8c1b8f6
testing without cleaning metadata for the second sync
Feb 15, 2026
84bae49
testing without cleaning metadata for the second sync
Feb 15, 2026
6ce28a3
testing without cleaning metadata for the second sync
Feb 15, 2026
cf2564c
testing without cleaning metadata for the second sync
Feb 15, 2026
6cb87ee
revert changes
Feb 15, 2026
5c25093
revert changes
Feb 15, 2026
17a3134
revert changes
Feb 15, 2026
5b0d8dc
use no tempDir
Feb 15, 2026
8db84a0
revert changes
Feb 15, 2026
b8e0d16
use union() to append data for the second sync()
Feb 21, 2026
fdaeb4d
use union() to append data for the second sync()
Feb 21, 2026
b2f214d
use union() to append data for the second sync()
Feb 21, 2026
913ac29
use union() to append data for the second sync()
Feb 21, 2026
9e243a1
use union() to append data for the second sync() reverted
Feb 21, 2026
a7412c3
fix second sync error
Feb 22, 2026
c028112
fix second sync error
Feb 22, 2026
089e869
fix second sync error
Feb 22, 2026
f1f8be1
fix second sync error
Feb 22, 2026
6588d1d
fix second sync error
Feb 22, 2026
da9e401
fix second sync error
Feb 22, 2026
15567ab
fix second sync error
Feb 22, 2026
119b68c
fix second sync error
Feb 22, 2026
ca49c63
fix second sync error
Feb 22, 2026
9711135
fix second sync error
Feb 22, 2026
a62a24c
fix second sync error
Feb 22, 2026
@@ -18,40 +18,41 @@

package org.apache.xtable.parquet;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.*;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import lombok.Builder;
import lombok.NonNull;
import lombok.extern.log4j.Log4j2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

import org.apache.xtable.exception.ReadException;
import org.apache.xtable.hudi.*;
import org.apache.xtable.hudi.HudiPathUtils;
import org.apache.xtable.model.*;
import org.apache.xtable.hudi.PathBasedPartitionSpecExtractor;
import org.apache.xtable.model.CommitsBacklog;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.*;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.DataLayoutStrategy;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.spi.extractor.ConversionSource;

@Builder
@Log4j2
public class ParquetConversionSource implements ConversionSource<Long> {

private static final ParquetSchemaExtractor schemaExtractor =
@@ -68,10 +69,25 @@ public class ParquetConversionSource implements ConversionSource<Long> {
private final String tableName;
private final String basePath;
@NonNull private final Configuration hadoopConf;
private final ParquetDataManager parquetDataManager;

@Builder
ParquetConversionSource(
String tableName,
String basePath,
@NonNull Configuration hadoopConf,
ParquetPartitionValueExtractor partitionValueExtractor,
PathBasedPartitionSpecExtractor partitionSpecExtractor) {
this.tableName = tableName;
this.basePath = basePath;
this.hadoopConf = hadoopConf;
this.partitionValueExtractor = partitionValueExtractor;
this.partitionSpecExtractor = partitionSpecExtractor;
this.parquetDataManager = new ParquetDataManager(hadoopConf, basePath);
}

private InternalTable createInternalTableFromFile(LocatedFileStatus latestFile) {
ParquetMetadata parquetMetadata =
parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.getPath());
private InternalTable createInternalTableFromFile(ParquetFileInfo latestFile) {
ParquetMetadata parquetMetadata = latestFile.getMetadata();
MessageType parquetSchema = parquetMetadataExtractor.getSchema(parquetMetadata);
InternalSchema schema = schemaExtractor.toInternalSchema(parquetSchema, "");
List<InternalPartitionField> partitionFields = partitionSpecExtractor.spec(schema);
@@ -94,49 +110,26 @@ private InternalTable createInternalTableFromFile(LocatedFileStatus latestFile)
@Override
public InternalTable getTable(Long modificationTime) {
// get the parquet file at the given modification time
Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
LocatedFileStatus file = getParquetFileAt(parquetFiles, modificationTime);
ParquetFileInfo file = parquetDataManager.getParquetDataFileAt(modificationTime);
return createInternalTableFromFile(file);
}

private Stream<InternalDataFile> getInternalDataFiles(Stream<LocatedFileStatus> parquetFiles) {
return parquetFiles.map(
file ->
InternalDataFile.builder()
.physicalPath(file.getPath().toString())
.fileFormat(FileFormat.APACHE_PARQUET)
.fileSizeBytes(file.getLen())
.partitionValues(
partitionValueExtractor.extractPartitionValues(
partitionSpecExtractor.spec(
partitionValueExtractor.extractSchemaForParquetPartitions(
parquetMetadataExtractor.readParquetMetadata(
hadoopConf, file.getPath()),
file.getPath().toString())),
HudiPathUtils.getPartitionPath(new Path(basePath), file.getPath())))
.lastModified(file.getModificationTime())
.columnStats(
parquetStatsExtractor.getColumnStatsForaFile(
parquetMetadataExtractor.readParquetMetadata(hadoopConf, file.getPath())))
.build());
private Stream<InternalDataFile> getInternalDataFiles(
Stream<ParquetFileInfo> parquetFiles, InternalSchema schema) {
return parquetFiles.map(file -> createInternalDataFileFromParquetFile(file, schema));
}

private InternalDataFile createInternalDataFileFromParquetFile(FileStatus parquetFile) {
private InternalDataFile createInternalDataFileFromParquetFile(
ParquetFileInfo parquetFile, InternalSchema schema) {
return InternalDataFile.builder()
.physicalPath(parquetFile.getPath().toString())
.partitionValues(
partitionValueExtractor.extractPartitionValues(
partitionSpecExtractor.spec(
partitionValueExtractor.extractSchemaForParquetPartitions(
parquetMetadataExtractor.readParquetMetadata(
hadoopConf, parquetFile.getPath()),
parquetFile.getPath().toString())),
basePath))
partitionSpecExtractor.spec(schema),
HudiPathUtils.getPartitionPath(new Path(basePath), parquetFile.getPath())))
.lastModified(parquetFile.getModificationTime())
.fileSizeBytes(parquetFile.getLen())
.columnStats(
parquetStatsExtractor.getColumnStatsForaFile(
parquetMetadataExtractor.readParquetMetadata(hadoopConf, parquetFile.getPath())))
.fileSizeBytes(parquetFile.getSize())
.columnStats(parquetStatsExtractor.getColumnStatsForaFile(parquetFile.getMetadata()))
.build();
}

@@ -149,87 +142,84 @@ public CommitsBacklog<Long> getCommitsBacklog(InstantsForIncrementalSync syncIns

@Override
public TableChange getTableChangeForCommit(Long modificationTime) {
Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
Set<InternalDataFile> addedInternalDataFiles = new HashSet<>();

List<FileStatus> tableChangesAfter =
parquetFiles
.filter(fileStatus -> fileStatus.getModificationTime() > modificationTime)
.collect(Collectors.toList());
InternalTable internalTable = getMostRecentTable(parquetFiles);
for (FileStatus tableStatus : tableChangesAfter) {
InternalDataFile currentDataFile = createInternalDataFileFromParquetFile(tableStatus);
addedInternalDataFiles.add(currentDataFile);
}
List<ParquetFileInfo> tableChangesAfterModificationTime =
parquetDataManager.getParquetFilesMetadataAfterTime(modificationTime);
InternalTable internalTable =
getMostRecentTableConfig(tableChangesAfterModificationTime.stream());
Set<InternalDataFile> addedInternalDataFiles =
tableChangesAfterModificationTime.stream()
.map(file -> createInternalDataFileFromParquetFile(file, internalTable.getReadSchema()))
.collect(Collectors.toSet());

return TableChange.builder()
.sourceIdentifier(
getCommitIdentifier(
parquetDataManager.getMostRecentParquetFile().getModificationTime()))
.tableAsOfChange(internalTable)
.filesDiff(InternalFilesDiff.builder().filesAdded(addedInternalDataFiles).build())
.build();
}

private InternalTable getMostRecentTable(Stream<LocatedFileStatus> parquetFiles) {
LocatedFileStatus latestFile = getMostRecentParquetFile(parquetFiles);
private InternalTable getMostRecentTable() {
ParquetFileInfo latestFile = parquetDataManager.getMostRecentParquetFile();
return createInternalTableFromFile(latestFile);
}

private InternalTable getMostRecentTableConfig(Stream<ParquetFileInfo> parquetFiles) {
ParquetFileInfo latestFile =
parquetFiles
.max(Comparator.comparing(ParquetFileInfo::getModificationTime))
.orElseThrow(() -> new IllegalStateException("No files found"));
return createInternalTableFromFile(latestFile);
}

@Override
public InternalTable getCurrentTable() {
Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
return getMostRecentTable(parquetFiles);
return getMostRecentTable();
}

/**
* Gets the current snapshot of the table.
*
* @return the current {@link InternalSnapshot}
*/
@Override
public InternalSnapshot getCurrentSnapshot() {
// to avoid consuming the stream, call the method twice to return the same stream of parquet files
InternalTable table = getMostRecentTable();
Stream<InternalDataFile> internalDataFiles =
getInternalDataFiles(getParquetFiles(hadoopConf, basePath));
InternalTable table = getMostRecentTable(getParquetFiles(hadoopConf, basePath));
getInternalDataFiles(parquetDataManager.getCurrentFileInfo(), table.getReadSchema());
return InternalSnapshot.builder()
.table(table)
.sourceIdentifier(
getCommitIdentifier(
getMostRecentParquetFile(getParquetFiles(hadoopConf, basePath))
.getModificationTime()))
parquetDataManager.getMostRecentParquetFile().getModificationTime()))
.partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
.build();
}

private LocatedFileStatus getMostRecentParquetFile(Stream<LocatedFileStatus> parquetFiles) {
return parquetFiles
.max(Comparator.comparing(FileStatus::getModificationTime))
.orElseThrow(() -> new IllegalStateException("No files found"));
}
@Override
public boolean isIncrementalSyncSafeFrom(Instant timeInMillis) {
Stream<ParquetFileInfo> parquetFilesMetadata = parquetDataManager.getCurrentFileInfo();
OptionalLong earliestModTimeOpt =
parquetFilesMetadata.mapToLong(ParquetFileInfo::getModificationTime).min();

if (!earliestModTimeOpt.isPresent()) {
log.warn("No parquet files found in table {}. Incremental sync is not possible.", tableName);
return false;
}

private LocatedFileStatus getParquetFileAt(
Stream<LocatedFileStatus> parquetFiles, long modificationTime) {
return parquetFiles
.filter(fileStatus -> fileStatus.getModificationTime() == modificationTime)
.findFirst()
.orElseThrow(() -> new IllegalStateException("No file found at " + modificationTime));
}
long earliestModTime = earliestModTimeOpt.getAsLong();

private Stream<LocatedFileStatus> getParquetFiles(Configuration hadoopConf, String basePath) {
try {
FileSystem fs = FileSystem.get(hadoopConf);
URI uriBasePath = new URI(basePath);
String parentPath = Paths.get(uriBasePath).toString();
RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(parentPath), true);
return RemoteIterators.toList(iterator).stream()
.filter(file -> file.getPath().getName().endsWith("parquet"));
} catch (IOException | URISyntaxException e) {
throw new ReadException("Unable to read files from file system", e);
if (earliestModTime > timeInMillis.toEpochMilli()) {
log.warn(
"Incremental sync is not safe. Earliest available metadata (time={}) is newer "
+ "than requested instant {}.",
Instant.ofEpochMilli(earliestModTime),
timeInMillis.toEpochMilli());
return false;
}
}

@Override
public boolean isIncrementalSyncSafeFrom(Instant instant) {
return false;
log.debug(
"Incremental sync is safe from instant {} for table {}",
timeInMillis.toEpochMilli(),
tableName);
return true;
}

@Override
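The new `isIncrementalSyncSafeFrom` implementation in the diff above reduces to one comparison: syncing incrementally from an instant is safe only if the earliest file modification time still on disk is not newer than that instant (otherwise older changes have been lost). A minimal standalone sketch of that rule, assuming an in-memory list of modification times in place of the real file listing (class and method names here are illustrative, not part of the PR):

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical sketch of the safety check: incremental sync from an instant
// is only safe when the earliest retained file modification time is not
// newer than that instant.
public class IncrementalSyncCheck {
  static boolean isSafeFrom(List<Long> fileModTimes, Instant instant) {
    OptionalLong earliest = fileModTimes.stream().mapToLong(Long::longValue).min();
    if (!earliest.isPresent()) {
      return false; // no files at all: incremental sync is not possible
    }
    return earliest.getAsLong() <= instant.toEpochMilli();
  }

  public static void main(String[] args) {
    List<Long> modTimes = Arrays.asList(1_000L, 2_000L, 3_000L);
    System.out.println(isSafeFrom(modTimes, Instant.ofEpochMilli(1_500L))); // true
    System.out.println(isSafeFrom(modTimes, Instant.ofEpochMilli(500L)));   // false
  }
}
```

This mirrors the `earliestModTime > timeInMillis.toEpochMilli()` branch in the method above, just inverted into a single boolean expression.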
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.parquet;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.util.functional.RemoteIterators;

import org.apache.xtable.exception.ReadException;

/**
* Manages Parquet file metadata.
*
* <p>This class provides helpers for handling Parquet metadata: creating metadata objects from
* Parquet files and filtering files by modification time.
*/
@Log4j2
@RequiredArgsConstructor
public class ParquetDataManager {
private final Configuration hadoopConf;
private final String basePath;
private final FileSystem fileSystem;

public ParquetDataManager(Configuration hadoopConf, String basePath) {
this.hadoopConf = hadoopConf;
this.basePath = basePath;
try {
URI uri = Paths.get(basePath).toUri();
this.fileSystem = FileSystem.get(uri, hadoopConf);
} catch (IOException e) {
throw new ReadException("Unable to initialize file system for base path: " + basePath, e);
}
}

@Getter(value = AccessLevel.PRIVATE, lazy = true)
private final List<LocatedFileStatus> parquetFiles = loadParquetFiles();

ParquetFileInfo getMostRecentParquetFile() {
LocatedFileStatus file =
getParquetFiles().stream()
.max(Comparator.comparing(LocatedFileStatus::getModificationTime))
.orElseThrow(() -> new IllegalStateException("No files found"));
return new ParquetFileInfo(hadoopConf, file);
}

ParquetFileInfo getParquetDataFileAt(long targetTime) {
return getParquetFiles().stream()
.filter(file -> file.getModificationTime() >= targetTime)
.min(Comparator.comparing(LocatedFileStatus::getModificationTime))
.map(file -> new ParquetFileInfo(hadoopConf, file))
.orElseThrow(() -> new IllegalStateException("No file found at or after " + targetTime));
}

private List<LocatedFileStatus> loadParquetFiles() {
try {
RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(new Path(basePath), true);
return RemoteIterators.toList(iterator).stream()
.filter(file -> file.getPath().getName().endsWith("parquet"))
.collect(Collectors.toList());
} catch (IOException e) {
throw new ReadException("Unable to read files from file system", e);
}
}

Stream<ParquetFileInfo> getCurrentFileInfo() {
// review comment (Contributor Author): @the-other-tim-brown You would want to
// rename this to the plural (getCurrentFilesInfo())?

return getParquetFiles().stream()
.map(fileStatus -> new ParquetFileInfo(hadoopConf, fileStatus));
}

List<ParquetFileInfo> getParquetFilesMetadataAfterTime(long syncTime) {
return getParquetFiles().stream()
.filter(file -> file.getModificationTime() >= syncTime)
.map(file -> new ParquetFileInfo(hadoopConf, file))
.collect(Collectors.toList());
}
}
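`ParquetDataManager` above relies on Lombok's `@Getter(value = AccessLevel.PRIVATE, lazy = true)` so the recursive `loadParquetFiles()` listing runs at most once per instance, on first access. A rough plain-Java sketch of the memoization that annotation generates, with a load counter added purely for demonstration (the class name and `Supplier`-based wiring are illustrative, not the Lombok-generated code):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of a lazily cached file listing: the expensive listing
// runs once, on first access; every later call returns the cached result.
public class LazyListingSketch {
  private final Supplier<List<String>> listing;
  private List<String> cached;  // null until first access
  private int loadCount = 0;    // instrumentation for the demo only

  LazyListingSketch(Supplier<List<String>> listing) {
    this.listing = listing;
  }

  synchronized List<String> getParquetFiles() {
    if (cached == null) {
      loadCount++;
      cached = Collections.unmodifiableList(listing.get());
    }
    return cached;
  }

  int getLoadCount() {
    return loadCount;
  }

  public static void main(String[] args) {
    LazyListingSketch sketch =
        new LazyListingSketch(() -> Arrays.asList("a.parquet", "b.parquet"));
    sketch.getParquetFiles();
    sketch.getParquetFiles();
    System.out.println(sketch.getLoadCount()); // prints 1: listing ran once
  }
}
```

Caching the listing this way is what lets `getMostRecentParquetFile`, `getParquetDataFileAt`, and `getParquetFilesMetadataAfterTime` all share one filesystem scan instead of re-listing the table for every call.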