Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ private void setExternalTableAutoAnalyzePolicy(ExternalTable table, List<AlterOp

private void processAlterTableForExternalTable(
ExternalTable table, List<AlterOp> alterOps) throws UserException {
long updateTime = System.currentTimeMillis();
for (AlterOp alterOp : alterOps) {
if (alterOp instanceof ModifyTablePropertiesOp) {
setExternalTableAutoAnalyzePolicy(table, alterOps);
Expand Down Expand Up @@ -430,23 +431,23 @@ private void processAlterTableForExternalTable(
AddPartitionFieldOp addPartitionField = (AddPartitionFieldOp) alterOp;
if (table instanceof IcebergExternalTable) {
((IcebergExternalCatalog) table.getCatalog()).addPartitionField(
(IcebergExternalTable) table, addPartitionField);
(IcebergExternalTable) table, addPartitionField, updateTime);
} else {
throw new UserException("ADD PARTITION KEY is only supported for Iceberg tables");
}
} else if (alterOp instanceof DropPartitionFieldOp) {
DropPartitionFieldOp dropPartitionField = (DropPartitionFieldOp) alterOp;
if (table instanceof IcebergExternalTable) {
((IcebergExternalCatalog) table.getCatalog()).dropPartitionField(
(IcebergExternalTable) table, dropPartitionField);
(IcebergExternalTable) table, dropPartitionField, updateTime);
} else {
throw new UserException("DROP PARTITION KEY is only supported for Iceberg tables");
}
} else if (alterOp instanceof ReplacePartitionFieldOp) {
ReplacePartitionFieldOp replacePartitionField = (ReplacePartitionFieldOp) alterOp;
if (table instanceof IcebergExternalTable) {
((IcebergExternalCatalog) table.getCatalog()).replacePartitionField(
(IcebergExternalTable) table, replacePartitionField);
(IcebergExternalTable) table, replacePartitionField, updateTime);
} else {
throw new UserException("REPLACE PARTITION KEY is only supported for Iceberg tables");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ public void handleRefreshTable(String catalogName, String dbName, String tableNa
}
return;
}
refreshTableInternal((ExternalDatabase) db, (ExternalTable) table, 0);

long updateTime = System.currentTimeMillis();
refreshTableInternal((ExternalDatabase) db, (ExternalTable) table, updateTime);
ExternalObjectLog log = ExternalObjectLog.createForRefreshTable(catalog.getId(), db.getFullName(),
table.getName());
table.getName(), updateTime);
Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
}

Expand Down Expand Up @@ -194,6 +194,9 @@ public void replayRefreshTable(ExternalObjectLog log) {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) catalog);
cache.refreshAffectedPartitionsCache((HMSExternalTable) table.get(), modifiedPartNames, newPartNames);
if (table.get() instanceof HMSExternalTable && log.getLastUpdateTime() > 0) {
((HMSExternalTable) table.get()).setUpdateTime(log.getLastUpdateTime());
}
LOG.info("replay refresh partitions for table {}, "
+ "modified partitions count: {}, "
+ "new partitions count: {}",
Expand Down Expand Up @@ -229,12 +232,12 @@ public void refreshExternalTableFromEvent(String catalogName, String dbName, Str

public void refreshTableInternal(ExternalDatabase db, ExternalTable table, long updateTime) {
table.unsetObjectCreated();
if (table instanceof HMSExternalTable && updateTime > 0) {
((HMSExternalTable) table).setEventUpdateTime(updateTime);
if (updateTime > 0) {
table.setUpdateTime(updateTime);
}
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(table);
LOG.info("refresh table {}, id {} from db {} in catalog {}",
table.getName(), table.getId(), db.getFullName(), db.getCatalog().getName());
LOG.info("refresh table {}, id {} from db {} in catalog {}, update time: {}",
table.getName(), table.getId(), db.getFullName(), db.getCatalog().getName(), updateTime);
}

// Refresh partition
Expand Down Expand Up @@ -268,7 +271,7 @@ public void refreshPartitions(String catalogName, String dbName, String tableNam
}

Env.getCurrentEnv().getExtMetaCacheMgr().invalidatePartitionsCache((ExternalTable) table, partitionNames);
((HMSExternalTable) table).setEventUpdateTime(updateTime);
((HMSExternalTable) table).setUpdateTime(updateTime);
}

public void addToRefreshMap(long catalogId, Integer[] sec) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ public void addExternalPartitions(String catalogName, String dbName, String tabl

HMSExternalTable hmsTable = (HMSExternalTable) table;
Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(), hmsTable, partitionNames);
hmsTable.setEventUpdateTime(updateTime);
hmsTable.setUpdateTime(updateTime);
}

public void dropExternalPartitions(String catalogName, String dbName, String tableName,
Expand Down Expand Up @@ -757,7 +757,7 @@ public void dropExternalPartitions(String catalogName, String dbName, String tab

HMSExternalTable hmsTable = (HMSExternalTable) table;
Env.getCurrentEnv().getExtMetaCacheMgr().dropPartitionsCache(catalog.getId(), hmsTable, partitionNames);
hmsTable.setEventUpdateTime(updateTime);
hmsTable.setUpdateTime(updateTime);
}

public void registerCatalogRefreshListener(Env env) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1137,8 +1137,9 @@ public void truncateTable(String dbName, String tableName, PartitionNamesInfo pa
partitions = partitionNamesInfo.getPartitionNames();
}
ExternalTable dorisTable = getDbOrDdlException(dbName).getTableOrDdlException(tableName);
metadataOps.truncateTable(dorisTable, partitions);
TruncateTableInfo info = new TruncateTableInfo(getName(), dbName, tableName, partitions);
long updateTime = System.currentTimeMillis();
metadataOps.truncateTable(dorisTable, partitions, updateTime);
TruncateTableInfo info = new TruncateTableInfo(getName(), dbName, tableName, partitions, updateTime);
Env.getCurrentEnv().getEditLog().logTruncateTable(info);
} catch (Exception e) {
LOG.warn("Failed to truncate table {}.{} in catalog {}", dbName, tableName, getName(), e);
Expand All @@ -1148,7 +1149,7 @@ public void truncateTable(String dbName, String tableName, PartitionNamesInfo pa

public void replayTruncateTable(TruncateTableInfo info) {
if (metadataOps != null) {
metadataOps.afterTruncateTable(info.getDb(), info.getTable());
metadataOps.afterTruncateTable(info.getDb(), info.getTable(), info.getUpdateTime());
}
}

Expand Down Expand Up @@ -1320,11 +1321,11 @@ public void resetMetaCacheNames() {
}

// log the refresh external table operation
private void logRefreshExternalTable(ExternalTable dorisTable) {
private void logRefreshExternalTable(ExternalTable dorisTable, long updateTime) {
Env.getCurrentEnv().getEditLog()
.logRefreshExternalTable(
ExternalObjectLog.createForRefreshTable(dorisTable.getCatalog().getId(),
dorisTable.getDbName(), dorisTable.getName()));
dorisTable.getDbName(), dorisTable.getName(), updateTime));
}

@Override
Expand All @@ -1336,8 +1337,9 @@ public void addColumn(TableIf dorisTable, Column column, ColumnPosition position
throw new DdlException("Add column operation is not supported for catalog: " + getName());
}
try {
metadataOps.addColumn(externalTable, column, position);
logRefreshExternalTable(externalTable);
long updateTime = System.currentTimeMillis();
metadataOps.addColumn(externalTable, column, position, updateTime);
logRefreshExternalTable(externalTable, updateTime);
} catch (Exception e) {
LOG.warn("Failed to add column {} to table {}.{} in catalog {}",
column.getName(), externalTable.getDbName(), externalTable.getName(), getName(), e);
Expand All @@ -1354,8 +1356,9 @@ public void addColumns(TableIf dorisTable, List<Column> columns) throws UserExce
throw new DdlException("Add columns operation is not supported for catalog: " + getName());
}
try {
metadataOps.addColumns(externalTable, columns);
logRefreshExternalTable(externalTable);
long updateTime = System.currentTimeMillis();
metadataOps.addColumns(externalTable, columns, updateTime);
logRefreshExternalTable(externalTable, updateTime);
} catch (Exception e) {
LOG.warn("Failed to add columns to table {}.{} in catalog {}",
externalTable.getDbName(), externalTable.getName(), getName(), e);
Expand All @@ -1372,8 +1375,9 @@ public void dropColumn(TableIf dorisTable, String columnName) throws UserExcepti
throw new DdlException("Drop column operation is not supported for catalog: " + getName());
}
try {
metadataOps.dropColumn(externalTable, columnName);
logRefreshExternalTable(externalTable);
long updateTime = System.currentTimeMillis();
metadataOps.dropColumn(externalTable, columnName, updateTime);
logRefreshExternalTable(externalTable, updateTime);
} catch (Exception e) {
LOG.warn("Failed to drop column {} from table {}.{} in catalog {}",
columnName, externalTable.getDbName(), externalTable.getName(), getName(), e);
Expand All @@ -1390,8 +1394,9 @@ public void renameColumn(TableIf dorisTable, String oldName, String newName) thr
throw new DdlException("Rename column operation is not supported for catalog: " + getName());
}
try {
metadataOps.renameColumn(externalTable, oldName, newName);
logRefreshExternalTable(externalTable);
long updateTime = System.currentTimeMillis();
metadataOps.renameColumn(externalTable, oldName, newName, updateTime);
logRefreshExternalTable(externalTable, updateTime);
} catch (Exception e) {
LOG.warn("Failed to rename column {} to {} in table {}.{} in catalog {}",
oldName, newName, externalTable.getDbName(), externalTable.getName(), getName(), e);
Expand All @@ -1408,8 +1413,9 @@ public void modifyColumn(TableIf dorisTable, Column column, ColumnPosition colum
throw new DdlException("Modify column operation is not supported for catalog: " + getName());
}
try {
metadataOps.modifyColumn(externalTable, column, columnPosition);
logRefreshExternalTable(externalTable);
long updateTime = System.currentTimeMillis();
metadataOps.modifyColumn(externalTable, column, columnPosition, updateTime);
logRefreshExternalTable(externalTable, updateTime);
} catch (Exception e) {
LOG.warn("Failed to modify column {} in table {}.{} in catalog {}",
column.getName(), externalTable.getDbName(), externalTable.getName(), getName(), e);
Expand All @@ -1426,8 +1432,9 @@ public void reorderColumns(TableIf dorisTable, List<String> newOrder) throws Use
throw new DdlException("Reorder columns operation is not supported for catalog: " + getName());
}
try {
metadataOps.reorderColumns(externalTable, newOrder);
logRefreshExternalTable(externalTable);
long updateTime = System.currentTimeMillis();
metadataOps.reorderColumns(externalTable, newOrder, updateTime);
logRefreshExternalTable(externalTable, updateTime);
} catch (Exception e) {
LOG.warn("Failed to reorder columns in table {}.{} in catalog {}",
externalTable.getDbName(), externalTable.getName(), getName(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,25 @@ public static ExternalObjectLog createForRefreshDb(long catalogId, String dbName
return externalObjectLog;
}

public static ExternalObjectLog createForRefreshTable(long catalogId, String dbName, String tblName) {
public static ExternalObjectLog createForRefreshTable(long catalogId, String dbName, String tblName,
long updateTime) {
ExternalObjectLog externalObjectLog = new ExternalObjectLog();
externalObjectLog.setCatalogId(catalogId);
externalObjectLog.setDbName(dbName);
externalObjectLog.setTableName(tblName);
externalObjectLog.setLastUpdateTime(updateTime);
return externalObjectLog;
}

public static ExternalObjectLog createForRefreshPartitions(long catalogId, String dbName, String tblName,
List<String> modifiedPartNames, List<String> newPartNames) {
List<String> modifiedPartNames, List<String> newPartNames, long updateTime) {
ExternalObjectLog externalObjectLog = new ExternalObjectLog();
externalObjectLog.setCatalogId(catalogId);
externalObjectLog.setDbName(dbName);
externalObjectLog.setTableName(tblName);
externalObjectLog.setPartitionNames(modifiedPartNames);
externalObjectLog.setNewPartitionNames(newPartNames);
externalObjectLog.setLastUpdateTime(updateTime);
return externalObjectLog;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
@SerializedName(value = "ta")
private final TableAttributes tableAttributes = new TableAttributes();

// this field will be refreshed after reloading schema
protected volatile long schemaUpdateTime;
// record the table update time, like insert/alter/delete
protected volatile long updateTime = 0;

protected long dbId;
protected boolean objectCreated;
Expand Down Expand Up @@ -276,16 +276,15 @@ public long getCreateTime() {
return 0;
}

// return schema update time as default
// override this method if there is some other kinds of update time
// use getSchemaUpdateTime if just need the schema update time
@Override
// Returns the table update time, tracking when the table was last modified
// (for example, by insert, alter, or refresh operations).
public long getUpdateTime() {
return this.schemaUpdateTime;
return updateTime;
}

public void setUpdateTime(long schemaUpdateTime) {
this.schemaUpdateTime = schemaUpdateTime;
public void setUpdateTime(long updateTime) {
this.updateTime = updateTime;
}

@Override
Expand Down Expand Up @@ -345,7 +344,7 @@ public Optional<ColumnStatistic> getColumnStatistic(String colName) {
* @return
*/
public Optional<SchemaCacheValue> initSchemaAndUpdateTime(SchemaCacheKey key) {
schemaUpdateTime = System.currentTimeMillis();
setUpdateTime(System.currentTimeMillis());
return initSchema(key);
}

Expand Down Expand Up @@ -484,10 +483,6 @@ public int hashCode() {
return Objects.hashCode(name, db);
}

public long getSchemaUpdateTime() {
return schemaUpdateTime;
}

public long getDbId() {
return dbId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public boolean registerTable(TableIf tableIf) {
super.registerTable(tableIf);
HMSExternalTable table = getTableNullable(tableIf.getName());
if (table != null) {
table.setEventUpdateTime(tableIf.getUpdateTime());
table.setUpdateTime(tableIf.getUpdateTime());
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI

private HMSDlaTable dlaTable;

// record the event update time when enable hms event listener
protected volatile long eventUpdateTime;

public enum DLAType {
UNKNOWN, HIVE, HUDI, ICEBERG
}
Expand Down Expand Up @@ -648,11 +645,11 @@ public Set<String> getPartitionNames() {
public Optional<SchemaCacheValue> initSchemaAndUpdateTime(SchemaCacheKey key) {
Table table = loadHiveTable();
// try to use transient_lastDdlTime from hms client
schemaUpdateTime = MapUtils.isNotEmpty(table.getParameters())
setUpdateTime(MapUtils.isNotEmpty(table.getParameters())
&& table.getParameters().containsKey(TBL_PROP_TRANSIENT_LAST_DDL_TIME)
? Long.parseLong(table.getParameters().get(TBL_PROP_TRANSIENT_LAST_DDL_TIME)) * 1000
// use current timestamp if lastDdlTime does not exist (hive views don't have this prop)
: System.currentTimeMillis();
: System.currentTimeMillis());
return initSchema(key);
}

Expand Down Expand Up @@ -902,17 +899,6 @@ private void setStatData(Column col, ColumnStatisticsData data, ColumnStatisticB
builder.setMaxValue(Double.POSITIVE_INFINITY);
}

public void setEventUpdateTime(long updateTime) {
this.eventUpdateTime = updateTime;
}

@Override
// get the max value of `schemaUpdateTime` and `eventUpdateTime`
// eventUpdateTime will be refreshed after processing events with hms event listener enabled
public long getUpdateTime() {
return Math.max(this.schemaUpdateTime, this.eventUpdateTime);
}

@Override
public void gsonPostProcess() throws IOException {
super.gsonPostProcess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,15 +337,15 @@ public void truncateTableImpl(ExternalTable dorisTable, List<String> partitions)
}

@Override
public void afterTruncateTable(String dbName, String tblName) {
public void afterTruncateTable(String dbName, String tblName, long updateTime) {
try {
// Invalidate cache.
Optional<ExternalDatabase<?>> db = catalog.getDbForReplay(dbName);
if (db.isPresent()) {
Optional tbl = db.get().getTableForReplay(tblName);
if (tbl.isPresent()) {
Env.getCurrentEnv().getRefreshManager()
.refreshTableInternal(db.get(), (ExternalTable) tbl.get(), 0);
.refreshTableInternal(db.get(), (ExternalTable) tbl.get(), updateTime);
}
}
} catch (Exception e) {
Expand Down
Loading