Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
import org.apache.xtable.model.storage.CatalogType;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.model.sync.SyncMode;
import org.apache.xtable.model.sync.SyncResult;
import org.apache.xtable.model.sync.SyncStatusCode;
import org.apache.xtable.reflection.ReflectionUtils;
import org.apache.xtable.spi.extractor.CatalogConversionSource;
import org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.StorageIdentifier;
Expand All @@ -85,6 +87,7 @@ public class RunCatalogSync {
private static final String CATALOG_SOURCE_AND_TARGET_CONFIG_PATH = "catalogConfig";
private static final String HADOOP_CONFIG_PATH = "hadoopConfig";
private static final String CONVERTERS_CONFIG_PATH = "convertersConfig";
private static final String FAIL_ON_ERROR_OPTION = "failOnError";
private static final String HELP_OPTION = "h";
private static final Map<String, ConversionSourceProvider> CONVERSION_SOURCE_PROVIDERS =
new HashMap<>();
Expand All @@ -108,6 +111,11 @@ public class RunCatalogSync {
true,
"The path to a yaml file containing InternalTable converter configurations. "
+ "These configs will override the default")
.addOption(
FAIL_ON_ERROR_OPTION,
"failOnError",
false,
"Fail the process if any table or catalog sync fails")
.addOption(HELP_OPTION, "help", false, "Displays help information to run this utility");

public static void main(String[] args) throws Exception {
Expand All @@ -126,6 +134,8 @@ public static void main(String[] args) throws Exception {
return;
}

boolean failOnError = cmd.hasOption(FAIL_ON_ERROR_OPTION);

DatasetConfig datasetConfig;
try (InputStream inputStream =
Files.newInputStream(
Expand Down Expand Up @@ -190,15 +200,38 @@ public static void main(String[] args) throws Exception {
.distinct()
.collect(Collectors.toList());
try {
conversionController.syncTableAcrossCatalogs(
conversionConfig,
getConversionSourceProviders(tableFormats, tableFormatConverters, hadoopConf));
Map<String, SyncResult> syncResults =
conversionController.syncTableAcrossCatalogs(
conversionConfig,
getConversionSourceProviders(tableFormats, tableFormatConverters, hadoopConf));
if (failOnError && hasSyncFailures(syncResults)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add similar functionality to RunSync?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@the-other-tim-brown good point. done

throw new RuntimeException("Sync completed with failures. See logs for details.");
}
} catch (Exception e) {
log.error("Error running sync for {}", sourceTable.getBasePath(), e);
if (failOnError) {
throw e;
}
}
}
}

private static boolean hasSyncFailures(Map<String, SyncResult> syncResults) {
return syncResults.values().stream().anyMatch(RunCatalogSync::syncResultHasFailure);
}

private static boolean syncResultHasFailure(SyncResult syncResult) {
if (syncResult == null) {
return false;
}
SyncResult.SyncStatus tableStatus = syncResult.getTableFormatSyncStatus();
if (tableStatus != null && tableStatus.getStatusCode() == SyncStatusCode.ERROR) {
return true;
}
return syncResult.getCatalogSyncStatusList().stream()
.anyMatch(status -> status.getStatusCode() == SyncStatusCode.ERROR);
}

static Optional<CatalogConversionSource> getCatalogConversionSource(
ExternalCatalogConfig sourceCatalog, Configuration hadoopConf) {
if (CatalogType.STORAGE.equals(sourceCatalog.getCatalogType())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import org.apache.xtable.iceberg.IcebergCatalogConfig;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.model.sync.SyncMode;
import org.apache.xtable.model.sync.SyncResult;
import org.apache.xtable.model.sync.SyncStatusCode;
import org.apache.xtable.reflection.ReflectionUtils;

/**
Expand All @@ -78,6 +80,7 @@ public class RunSync {
private static final String ICEBERG_CATALOG_CONFIG_PATH = "i";
private static final String CONTINUOUS_MODE = "m";
private static final String CONTINUOUS_MODE_INTERVAL = "t";
private static final String FAIL_ON_ERROR_OPTION = "failOnError";
private static final String HELP_OPTION = "h";

private static final Options OPTIONS =
Expand Down Expand Up @@ -115,6 +118,11 @@ public class RunSync {
"continuousModeInterval",
true,
"The interval in seconds to schedule the loop. Requires --continuousMode to be set. Defaults to 5 seconds.")
.addOption(
FAIL_ON_ERROR_OPTION,
"failOnError",
false,
"Fail the process if any table sync fails")
.addOption(HELP_OPTION, "help", false, "Displays help information to run this utility");

static SourceTable sourceTableBuilder(
Expand Down Expand Up @@ -160,7 +168,8 @@ static void syncTableMetdata(
List<String> tableFormatList,
CatalogConfig catalogConfig,
Configuration hadoopConf,
ConversionSourceProvider conversionSourceProvider) {
ConversionSourceProvider conversionSourceProvider,
boolean failOnError) {
ConversionController conversionController = new ConversionController(hadoopConf);
for (DatasetConfig.Table table : datasetConfig.getDatasets()) {
log.info(
Expand All @@ -183,9 +192,16 @@ static void syncTableMetdata(
.syncMode(SyncMode.INCREMENTAL)
.build();
try {
conversionController.sync(conversionConfig, conversionSourceProvider);
Map<String, SyncResult> syncResults =
conversionController.sync(conversionConfig, conversionSourceProvider);
if (failOnError && hasSyncFailures(syncResults)) {
throw new RuntimeException("Sync completed with failures. See logs for details.");
}
} catch (Exception e) {
log.error("Error running sync for {}", table.getTableBasePath(), e);
if (failOnError) {
throw e;
}
}
}
}
Expand Down Expand Up @@ -254,15 +270,20 @@ public static void main(String[] args) throws IOException {
return;
}

boolean failOnError = cmd.hasOption(FAIL_ON_ERROR_OPTION);

if (cmd.hasOption(CONTINUOUS_MODE)) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
long intervalInSeconds = Long.parseLong(cmd.getOptionValue(CONTINUOUS_MODE_INTERVAL, "5"));
executorService.scheduleAtFixedRate(
() -> {
try {
runSync(cmd);
} catch (IOException ex) {
runSync(cmd, failOnError);
} catch (IOException | RuntimeException ex) {
log.error("Sync operation failed", ex);
if (failOnError) {
throw new RuntimeException(ex);
}
}
},
0,
Expand All @@ -279,11 +300,11 @@ public static void main(String[] args) throws IOException {
}
executorService.shutdownNow();
} else {
runSync(cmd);
runSync(cmd, failOnError);
}
}

private static void runSync(CommandLine cmd) throws IOException {
private static void runSync(CommandLine cmd, boolean failOnError) throws IOException {
String datasetConfigpath = getValueFromConfig(cmd, DATASET_CONFIG_OPTION);
String icebergCatalogConfigpath = getValueFromConfig(cmd, ICEBERG_CATALOG_CONFIG_PATH);
String hadoopConfigpath = getValueFromConfig(cmd, HADOOP_CONFIG_PATH);
Expand All @@ -295,7 +316,28 @@ private static void runSync(CommandLine cmd) throws IOException {
getConversionSourceProvider(conversionProviderConfigpath, datasetConfig, hadoopConf);
List<String> tableFormatList = datasetConfig.getTargetFormats();
syncTableMetdata(
datasetConfig, tableFormatList, catalogConfig, hadoopConf, conversionSourceProvider);
datasetConfig,
tableFormatList,
catalogConfig,
hadoopConf,
conversionSourceProvider,
failOnError);
}

private static boolean hasSyncFailures(Map<String, SyncResult> syncResults) {
return syncResults.values().stream().anyMatch(RunSync::syncResultHasFailure);
}

private static boolean syncResultHasFailure(SyncResult syncResult) {
if (syncResult == null) {
return false;
}
SyncResult.SyncStatus tableStatus = syncResult.getTableFormatSyncStatus();
if (tableStatus != null && tableStatus.getStatusCode() == SyncStatusCode.ERROR) {
return true;
}
return syncResult.getCatalogSyncStatusList().stream()
.anyMatch(status -> status.getStatusCode() == SyncStatusCode.ERROR);
}

static byte[] getCustomConfigurations(String Configpath) throws IOException {
Expand Down