From 3ace2966384f5b43b5ea578ff6b0703646161b92 Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Fri, 30 Jan 2026 15:57:43 +0100 Subject: [PATCH 1/2] feat: cli to fail on sync error --- .../xtable/utilities/RunCatalogSync.java | 39 +++++++++++++++++-- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java index e04d69850..0cd6ce4bc 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java @@ -67,6 +67,8 @@ import org.apache.xtable.model.storage.CatalogType; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.model.sync.SyncMode; +import org.apache.xtable.model.sync.SyncResult; +import org.apache.xtable.model.sync.SyncStatusCode; import org.apache.xtable.reflection.ReflectionUtils; import org.apache.xtable.spi.extractor.CatalogConversionSource; import org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.StorageIdentifier; @@ -85,6 +87,7 @@ public class RunCatalogSync { private static final String CATALOG_SOURCE_AND_TARGET_CONFIG_PATH = "catalogConfig"; private static final String HADOOP_CONFIG_PATH = "hadoopConfig"; private static final String CONVERTERS_CONFIG_PATH = "convertersConfig"; + private static final String FAIL_ON_ERROR_OPTION = "failOnError"; private static final String HELP_OPTION = "h"; private static final Map CONVERSION_SOURCE_PROVIDERS = new HashMap<>(); @@ -108,6 +111,11 @@ public class RunCatalogSync { true, "The path to a yaml file containing InternalTable converter configurations. " + "These configs will override the default") + .addOption( + FAIL_ON_ERROR_OPTION, + "failOnError", + false, + "Fail the process if any table or catalog sync fails") .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility"); public static void main(String[] args) throws Exception { @@ -126,6 +134,8 @@ public static void main(String[] args) throws Exception { return; } + boolean failOnError = cmd.hasOption(FAIL_ON_ERROR_OPTION); + DatasetConfig datasetConfig; try (InputStream inputStream = Files.newInputStream( @@ -190,15 +200,38 @@ public static void main(String[] args) throws Exception { .distinct() .collect(Collectors.toList()); try { - conversionController.syncTableAcrossCatalogs( - conversionConfig, - getConversionSourceProviders(tableFormats, tableFormatConverters, hadoopConf)); + Map syncResults = + conversionController.syncTableAcrossCatalogs( + conversionConfig, + getConversionSourceProviders(tableFormats, tableFormatConverters, hadoopConf)); + if (failOnError && hasSyncFailures(syncResults)) { + throw new RuntimeException("Sync completed with failures. See logs for details."); + } } catch (Exception e) { log.error("Error running sync for {}", sourceTable.getBasePath(), e); + if (failOnError) { + throw e; + } } } } + private static boolean hasSyncFailures(Map syncResults) { + return syncResults.values().stream().anyMatch(RunCatalogSync::syncResultHasFailure); + } + + private static boolean syncResultHasFailure(SyncResult syncResult) { + if (syncResult == null) { + return false; + } + SyncResult.SyncStatus tableStatus = syncResult.getTableFormatSyncStatus(); + if (tableStatus != null && tableStatus.getStatusCode() == SyncStatusCode.ERROR) { + return true; + } + return syncResult.getCatalogSyncStatusList().stream() + .anyMatch(status -> status.getStatusCode() == SyncStatusCode.ERROR); + } + static Optional getCatalogConversionSource( ExternalCatalogConfig sourceCatalog, Configuration hadoopConf) { if (CatalogType.STORAGE.equals(sourceCatalog.getCatalogType())) { From d05abcf5d8f89b20ffae6c363ffdb0f84b80f4b5 Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Wed, 11 Feb 2026 16:16:19 +0100 Subject: [PATCH 2/2] add similar functionality to RunSync --- .../org/apache/xtable/utilities/RunSync.java | 56 ++++++++++++++++--- 1 file changed, 49 insertions(+), 7 deletions(-) diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java index facbcf3a6..ddc3acb95 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java @@ -62,6 +62,8 @@ import org.apache.xtable.iceberg.IcebergCatalogConfig; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.model.sync.SyncMode; +import org.apache.xtable.model.sync.SyncResult; +import org.apache.xtable.model.sync.SyncStatusCode; import org.apache.xtable.reflection.ReflectionUtils; /** @@ -78,6 +80,7 @@ public class RunSync { private static final String ICEBERG_CATALOG_CONFIG_PATH = "i"; private static final String CONTINUOUS_MODE = "m"; private static final String CONTINUOUS_MODE_INTERVAL = "t"; + private static final String FAIL_ON_ERROR_OPTION = "failOnError"; private static final String HELP_OPTION = "h"; private static final Options OPTIONS = @@ -115,6 +118,11 @@ public class RunSync { "continuousModeInterval", true, "The interval in seconds to schedule the loop. Requires --continuousMode to be set. Defaults to 5 seconds.") + .addOption( + FAIL_ON_ERROR_OPTION, + "failOnError", + false, + "Fail the process if any table sync fails") .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility"); static SourceTable sourceTableBuilder( @@ -160,7 +168,8 @@ static void syncTableMetdata( List tableFormatList, CatalogConfig catalogConfig, Configuration hadoopConf, - ConversionSourceProvider conversionSourceProvider) { + ConversionSourceProvider conversionSourceProvider, + boolean failOnError) { ConversionController conversionController = new ConversionController(hadoopConf); for (DatasetConfig.Table table : datasetConfig.getDatasets()) { log.info( @@ -183,9 +192,16 @@ static void syncTableMetdata( .syncMode(SyncMode.INCREMENTAL) .build(); try { - conversionController.sync(conversionConfig, conversionSourceProvider); + Map syncResults = + conversionController.sync(conversionConfig, conversionSourceProvider); + if (failOnError && hasSyncFailures(syncResults)) { + throw new RuntimeException("Sync completed with failures. See logs for details."); + } } catch (Exception e) { log.error("Error running sync for {}", table.getTableBasePath(), e); + if (failOnError) { + throw e; + } } } } @@ -254,15 +270,20 @@ public static void main(String[] args) throws IOException { return; } + boolean failOnError = cmd.hasOption(FAIL_ON_ERROR_OPTION); + if (cmd.hasOption(CONTINUOUS_MODE)) { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); long intervalInSeconds = Long.parseLong(cmd.getOptionValue(CONTINUOUS_MODE_INTERVAL, "5")); executorService.scheduleAtFixedRate( () -> { try { - runSync(cmd); - } catch (IOException ex) { + runSync(cmd, failOnError); + } catch (IOException | RuntimeException ex) { log.error("Sync operation failed", ex); + if (failOnError) { + throw new RuntimeException(ex); + } } }, 0, @@ -279,11 +300,11 @@ public static void main(String[] args) throws IOException { } executorService.shutdownNow(); } else { - runSync(cmd); + runSync(cmd, failOnError); } } - private static void runSync(CommandLine cmd) throws IOException { + private static void runSync(CommandLine cmd, boolean failOnError) throws IOException { String datasetConfigpath = getValueFromConfig(cmd, DATASET_CONFIG_OPTION); String icebergCatalogConfigpath = getValueFromConfig(cmd, ICEBERG_CATALOG_CONFIG_PATH); String hadoopConfigpath = getValueFromConfig(cmd, HADOOP_CONFIG_PATH); @@ -295,7 +316,28 @@ private static void runSync(CommandLine cmd) throws IOException { getConversionSourceProvider(conversionProviderConfigpath, datasetConfig, hadoopConf); List tableFormatList = datasetConfig.getTargetFormats(); syncTableMetdata( - datasetConfig, tableFormatList, catalogConfig, hadoopConf, conversionSourceProvider); + datasetConfig, + tableFormatList, + catalogConfig, + hadoopConf, + conversionSourceProvider, + failOnError); + } + + private static boolean hasSyncFailures(Map syncResults) { + return syncResults.values().stream().anyMatch(RunSync::syncResultHasFailure); + } + + private static boolean syncResultHasFailure(SyncResult syncResult) { + if (syncResult == null) { + return false; + } + SyncResult.SyncStatus tableStatus = syncResult.getTableFormatSyncStatus(); + if (tableStatus != null && tableStatus.getStatusCode() == SyncStatusCode.ERROR) { + return true; + } + return syncResult.getCatalogSyncStatusList().stream() + .anyMatch(status -> status.getStatusCode() == SyncStatusCode.ERROR); } static byte[] getCustomConfigurations(String Configpath) throws IOException {