From e34a4bb26ba0489e4db491f4bec9f0b1d36e74f5 Mon Sep 17 00:00:00 2001 From: Forus0322 Date: Fri, 26 Dec 2025 18:49:25 +0800 Subject: [PATCH 1/2] [core] Add compaction metrics system table. #6880 --- .../java/org/apache/paimon/CoreOptions.java | 22 ++ .../apache/paimon/metrics/CompactMetric.java | 233 ++++++++++++++ .../org/apache/paimon/AbstractFileStore.java | 9 + .../java/org/apache/paimon/FileStore.java | 3 + .../paimon/append/AppendCompactTask.java | 9 +- .../paimon/append/AppendOnlyWriter.java | 8 +- .../append/BucketedAppendCompactManager.java | 4 +- .../paimon/compact/CompactMetricMeta.java | 64 ++++ .../CompactMetricMetaV1Deserializer.java | 74 +++++ .../compact/CompactMetricSerializer.java | 49 +++ .../apache/paimon/compact/CompactResult.java | 20 ++ .../apache/paimon/compact/CompactTask.java | 13 +- .../paimon/io/CompactMetricIncrement.java | 78 +++++ .../paimon/mergetree/MergeTreeWriter.java | 9 +- .../compact/FileRewriteCompactTask.java | 5 +- .../compact/MergeTreeCompactManager.java | 16 +- .../compact/MergeTreeCompactTask.java | 5 +- .../operation/AbstractFileStoreWrite.java | 5 +- .../paimon/operation/FileStoreCommitImpl.java | 70 ++++- .../commit/ManifestEntryChanges.java | 19 ++ .../paimon/privilege/PrivilegedFileStore.java | 7 + .../paimon/table/AbstractFileStoreTable.java | 8 + .../paimon/table/ExpireCompactMetrics.java | 26 ++ .../table/ExpireCompactMetricsImpl.java | 59 ++++ .../paimon/table/sink/CommitMessageImpl.java | 32 +- .../table/sink/CommitMessageSerializer.java | 38 ++- .../table/system/CompactionMetricsTable.java | 290 ++++++++++++++++++ .../table/system/SystemTableLoader.java | 2 + .../apache/paimon/utils/CommitIncrement.java | 18 +- .../paimon/utils/CompactMetricsManager.java | 271 ++++++++++++++++ 30 files changed, 1426 insertions(+), 40 deletions(-) create mode 100644 paimon-api/src/main/java/org/apache/paimon/metrics/CompactMetric.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricMeta.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricMetaV1Deserializer.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricSerializer.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/io/CompactMetricIncrement.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/ExpireCompactMetrics.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/ExpireCompactMetricsImpl.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/system/CompactionMetricsTable.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/utils/CompactMetricsManager.java diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 20aa5182d6e4..5dc3b6734cd6 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -805,6 +805,20 @@ public InlineElement getDescription() { "Ratio of the deleted rows in a data file to be forced compacted for " + "append-only table."); + public static final ConfigOption COMPACTION_METRICS_ENABLED = + key("compaction.metrics.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "If enabled, compaction metric data will be recorded for each instance."); + + public static final ConfigOption COMPACTION_METRIC_RETAINED_NUM = + key("compaction.metrics.retained_num") + .intType() + .defaultValue(10) + .withDescription( + "Set the maximum number of times a record metric can be retained."); + public static final ConfigOption CHANGELOG_PRODUCER = key("changelog-producer") .enumType(ChangelogProducer.class) @@ -2719,6 +2733,14 @@ public double compactionDeleteRatioThreshold() { return options.get(COMPACTION_DELETE_RATIO_THRESHOLD); } + public boolean compactMetricsEnabled() { + return options.get(COMPACTION_METRICS_ENABLED); + } + + public int compactMetricsRetainedNum() { + return options.get(COMPACTION_METRIC_RETAINED_NUM); + } + public long dynamicBucketTargetRowNum() { return options.get(DYNAMIC_BUCKET_TARGET_ROW_NUM); } diff --git a/paimon-api/src/main/java/org/apache/paimon/metrics/CompactMetric.java b/paimon-api/src/main/java/org/apache/paimon/metrics/CompactMetric.java new file mode 100644 index 000000000000..903c7a4bde15 --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/metrics/CompactMetric.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.metrics; + +import org.apache.paimon.annotation.Public; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.paimon.utils.JsonSerdeUtil; + +import javax.annotation.Nullable; +import java.io.Serializable; +import java.util.Objects; + +/** + * @since 0.9.0 + */ +@Public +@JsonIgnoreProperties(ignoreUnknown = true) +public class CompactMetric implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final long FIRST_SNAPSHOT_ID = 1; + + public static final int TABLE_STORE_02_VERSION = 1; + protected static final int CURRENT_VERSION = 3; + + protected static final String FIELD_VERSION = "version"; + protected static final String FIELD_SNAPSHOT_ID = "snapshotId"; + protected static final String FIELD_COMMIT_TIME = "commitTime"; + protected static final String FIELD_COMPACT_AVG_DURATION = "compactAvgDuration"; + protected static final String FIELD_COMPACTION_MAX_DURATION = "compactMaxDuration"; + protected static final String FIELD_COMPACTION_MIN_DURATION = "compactMinDuration"; + protected static final String FIELD_BUCKETS = "compactBuckets"; + protected static final String FIELD_COMPACTION_TYPE = "compactType"; + protected static final String FIELD_IDENTIFIER = "identifier"; + protected static final String FIELD_COMMIT_USER = "commitUser"; + + @JsonProperty(FIELD_VERSION) + @Nullable + protected final Integer version; + + @JsonProperty(FIELD_SNAPSHOT_ID) + protected final long snapshotId; + + @JsonProperty(FIELD_COMMIT_TIME) + protected final long commitTime; + + @JsonProperty(FIELD_COMPACT_AVG_DURATION) + protected final long compactDuration; + + // a manifest list recording all new changes occurred in this snapshot + // for faster expire and streaming reads + @JsonProperty(FIELD_COMPACTION_MAX_DURATION) + protected final long compactMaxDuration; + + @JsonProperty(FIELD_COMPACTION_MIN_DURATION) + protected final long compactMinDuration; + + // a manifest list recording all changelog produced in this snapshot + // null if no changelog is produced, or for paimon <= 0.2 + @JsonProperty(FIELD_BUCKETS) + protected final String buckets; + + @JsonProperty(FIELD_IDENTIFIER) + protected final long identifier; + + @JsonProperty(FIELD_COMMIT_USER) + protected final String commitUser; + + @JsonProperty(FIELD_COMPACTION_TYPE) + protected final String compactType; + + public CompactMetric( + long snapshotId, + long commitTime, + long compactDuration, + long compactMaxDuration, + long compactMinDuration, + String buckets, + String compactType, + long identifier, + String commitUser) { + this( + CURRENT_VERSION, + snapshotId, + commitTime, + compactDuration, + compactMaxDuration, + compactMinDuration, + buckets, + compactType, + identifier, + commitUser); + } + + @JsonCreator + public CompactMetric( + @JsonProperty(FIELD_VERSION) @Nullable Integer version, + @JsonProperty(FIELD_SNAPSHOT_ID) long snapshotId, + @JsonProperty(FIELD_COMMIT_TIME) long commitTime, + @JsonProperty(FIELD_COMPACT_AVG_DURATION) long compactDuration, + @JsonProperty(FIELD_COMPACTION_MAX_DURATION) long compactMaxDuration, + @JsonProperty(FIELD_COMPACTION_MIN_DURATION) long compactMinDuration, + @JsonProperty(FIELD_BUCKETS) @Nullable String buckets, + @JsonProperty(FIELD_COMPACTION_TYPE) @Nullable String compactType, + @JsonProperty(FIELD_IDENTIFIER) long identifier, + @JsonProperty(FIELD_COMMIT_USER) String commitUser) { + this.version = version; + this.snapshotId = snapshotId; + this.commitTime = commitTime; + this.compactDuration = compactDuration; + this.compactMaxDuration = compactMaxDuration; + this.compactMinDuration = compactMinDuration; + this.buckets = buckets; + this.compactType = compactType; + this.identifier = identifier; + this.commitUser = commitUser; + } + + @JsonGetter(FIELD_VERSION) + public int version() { + // there is no version field for paimon <= 0.2 + return version == null ? TABLE_STORE_02_VERSION : version; + } + + @JsonGetter(FIELD_SNAPSHOT_ID) + public long snapshotId() { + return snapshotId; + } + + @JsonGetter(FIELD_COMMIT_TIME) + public long commitTime() { + return commitTime; + } + + @JsonGetter(FIELD_COMPACT_AVG_DURATION) + public long compactDuration() { + return compactDuration; + } + + @JsonGetter(FIELD_COMPACTION_MAX_DURATION) + public long compactMaxDuration() { + return compactMaxDuration; + } + + @JsonGetter(FIELD_COMPACTION_MIN_DURATION) + public long compactMinDuration() { + return compactMinDuration; + } + + @JsonGetter(FIELD_BUCKETS) + public String buckets() { + return buckets == null ? "{}" : buckets; + } + + @JsonGetter(FIELD_COMPACTION_TYPE) + public String compactType() { + return compactType; + } + + @JsonGetter(FIELD_IDENTIFIER) + public long identifier() { + return identifier; + } + + @JsonGetter(FIELD_COMMIT_USER) + public String commitUser() { + return commitUser; + } + + public String toJson() { + return JsonSerdeUtil.toJson(this); + } + + @Override + public int hashCode() { + return Objects.hash( + version, + snapshotId, + commitTime, + compactDuration, + compactMaxDuration, + compactMinDuration, + buckets, + compactType, + identifier, + commitUser); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CompactMetric that = (CompactMetric) o; + return Objects.equals(version, that.version) + && snapshotId == that.snapshotId + && commitTime == that.commitTime + && Objects.equals(compactDuration, that.compactDuration) + && Objects.equals(compactMaxDuration, that.compactMaxDuration) + && Objects.equals(compactMinDuration, that.compactMinDuration) + && Objects.equals(buckets, that.buckets) + && Objects.equals(compactType, that.compactType) + && Objects.equals(identifier, that.identifier) + && Objects.equals(commitUser, that.commitUser); + } + + public static CompactMetric fromJson(String json) { + return JsonSerdeUtil.fromJson(json, CompactMetric.class); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 3c25c39c7013..bbe31b5200e2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -61,6 +61,7 @@ import org.apache.paimon.tag.TagPreview; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.ChangelogManager; +import org.apache.paimon.utils.CompactMetricsManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.IndexFilePathFactories; import org.apache.paimon.utils.InternalRowPartitionComputer; @@ -292,6 +293,7 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) { options.partitionDefaultName(), pathFactory(), snapshotManager, + compactMetricsManager(), manifestFileFactory(), manifestListFactory(), indexManifestFileFactory(), @@ -507,4 +509,11 @@ public GlobalIndexScanBuilder newGlobalIndexScanBuilder() { snapshotManager(), newIndexFileHandler()); } + + @Override + public CompactMetricsManager compactMetricsManager() { + return new CompactMetricsManager(fileIO, + options.path(), + options.branch()); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index 1ccd19ca1143..9fb80192e9e7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -46,6 +46,7 @@ import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.apache.paimon.utils.CompactMetricsManager; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; @@ -127,4 +128,6 @@ PartitionExpire newPartitionExpire( void setSnapshotCache(Cache cache); GlobalIndexScanBuilder newGlobalIndexScanBuilder(); + + CompactMetricsManager compactMetricsManager(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java index 42c45488424e..32ebd59b4541 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java @@ -18,11 +18,13 @@ package org.apache.paimon.append; +import org.apache.paimon.compact.CompactMetricMeta; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer; import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.CompactMetricIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; import org.apache.paimon.manifest.FileKind; @@ -68,6 +70,7 @@ public List compactAfter() { public CommitMessage doCompact(FileStoreTable table, BaseAppendFileStoreWrite write) throws Exception { + long startMillis = System.currentTimeMillis(); boolean dvEnabled = table.coreOptions().deletionVectorsEnabled(); Preconditions.checkArgument( dvEnabled || compactBefore.size() > 1, @@ -111,6 +114,9 @@ public CommitMessage doCompact(FileStoreTable table, BaseAppendFileStoreWrite wr Collections.emptyList(), newIndexFiles, deletedIndexFiles); + CompactMetricIncrement compactMetricIncrement = + new CompactMetricIncrement( + new CompactMetricMeta("full", System.currentTimeMillis() - startMillis)); return new CommitMessageImpl( partition, // bucket 0 is bucket for unaware-bucket table @@ -118,7 +124,8 @@ public CommitMessage doCompact(FileStoreTable table, BaseAppendFileStoreWrite wr 0, table.coreOptions().bucket(), DataIncrement.emptyIncrement(), - compactIncrement); + compactIncrement, + compactMetricIncrement); } public int hashCode() { diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 14e21e2265f8..4934d71be331 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -21,6 +21,7 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.compact.CompactDeletionFile; import org.apache.paimon.compact.CompactManager; +import org.apache.paimon.compact.CompactMetricMeta; import org.apache.paimon.compression.CompressOptions; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; @@ -30,6 +31,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.BundleRecords; import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.CompactMetricIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.DataIncrement; @@ -96,6 +98,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { @Nullable private CompactDeletionFile compactDeletionFile; private SinkWriter sinkWriter; private MemorySegmentPool memorySegmentPool; + private CompactMetricMeta compactMetricMeta; public AppendOnlyWriter( FileIO fileIO, @@ -338,6 +341,7 @@ private void trySyncLatestCompaction(boolean blocking) result -> { compactBefore.addAll(result.before()); compactAfter.addAll(result.after()); + compactMetricMeta = new CompactMetricMeta(result.compactionType(), result.compactionTimeMillis()); updateCompactDeletionFile(result.deletionFile()); }); } @@ -363,6 +367,8 @@ private CommitIncrement drainIncrement() { new ArrayList<>(compactAfter), Collections.emptyList()); CompactDeletionFile drainDeletionFile = compactDeletionFile; + CompactMetricIncrement compactMetricIncrement = + new CompactMetricIncrement(compactMetricMeta); newFiles.clear(); deletedFiles.clear(); @@ -370,7 +376,7 @@ private CommitIncrement drainIncrement() { compactAfter.clear(); compactDeletionFile = null; - return new CommitIncrement(dataIncrement, compactIncrement, drainDeletionFile); + return new CommitIncrement(dataIncrement, compactIncrement, compactMetricIncrement, drainDeletionFile); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java index df7c57bc7f47..d3de72f70b03 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java @@ -254,7 +254,7 @@ public FullCompactTask( boolean forceRewriteAllFiles, CompactRewriter rewriter, @Nullable CompactionMetrics.Reporter metricsReporter) { - super(metricsReporter); + super(metricsReporter, "full"); this.dvMaintainer = dvMaintainer; this.toCompact = new LinkedList<>(inputs); this.targetFileSize = targetFileSize; @@ -323,7 +323,7 @@ public AutoCompactTask( List toCompact, CompactRewriter rewriter, @Nullable CompactionMetrics.Reporter metricsReporter) { - super(metricsReporter); + super(metricsReporter, "minor"); this.dvMaintainer = dvMaintainer; this.toCompact = toCompact; this.rewriter = rewriter; diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricMeta.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricMeta.java new file mode 100644 index 000000000000..5883e48322d9 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricMeta.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.compact; + +import org.apache.paimon.annotation.Public; + +import org.apache.paimon.types.*; + +import java.util.*; + + +import static org.apache.paimon.utils.SerializationUtils.newStringType; + +/** + * Metadata of compact metric. + * + * @since 1.0.0 + */ +@Public +public class CompactMetricMeta { + + static RowType SCHEMA = + new RowType( + false, + Arrays.asList( + new DataField(0, "_TYPE", newStringType(true)), + new DataField(1, "_DURATION", new BigIntType(true)))); + + private final String type; + private final long duration; + + public CompactMetricMeta() { + this(null, -1L); + } + + public CompactMetricMeta(String type, long duration) { + this.type = type; + this.duration = duration; + } + + public String type() { + return type; + } + + public long duration() { + return duration; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricMetaV1Deserializer.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricMetaV1Deserializer.java new file mode 100644 index 000000000000..3a8f98961b93 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricMetaV1Deserializer.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.compact; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.DataInputDeserializer; +import org.apache.paimon.io.DataInputView; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; + +import static org.apache.paimon.utils.SerializationUtils.newStringType; + +/** Serializer for {@link IndexFileMeta} with 1.4 version. */ +public class CompactMetricMetaV1Deserializer implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final RowType SCHEMA = + new RowType( + true, + Arrays.asList( + new DataField(0, "_TYPE", newStringType(true)), + new DataField(1, "_DURATION", new BigIntType(true)))); + + + protected final InternalRowSerializer rowSerializer; + + public CompactMetricMetaV1Deserializer() { + this.rowSerializer = InternalSerializers.create(SCHEMA); + } + + public CompactMetricMeta fromRow(InternalRow row) { + if (row == null) { + return new CompactMetricMeta(); + } + return new CompactMetricMeta( + row.getString(0).toString(), + row.getLong(1)); + } + + public CompactMetricMeta deserialize(DataInputView in) throws IOException { + if (in instanceof DataInputDeserializer) { + DataInputDeserializer deserializer = (DataInputDeserializer) in; + if (deserializer.available() == 0) { + return new CompactMetricMeta(); + } + } + return fromRow(rowSerializer.deserialize(in)); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricSerializer.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricSerializer.java new file mode 100644 index 000000000000..2c642faeb02d --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricSerializer.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.compact; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.utils.ObjectSerializer; + +/** Serializer for {@link DataFileMeta}. */ +public class CompactMetricSerializer extends ObjectSerializer { + + private static final long serialVersionUID = 1L; + + public CompactMetricSerializer() { + super(CompactMetricMeta.SCHEMA); + } + + @Override + public InternalRow toRow(CompactMetricMeta meta) { + return GenericRow.of( + BinaryString.fromString(meta.type()), + meta.duration()); + } + + @Override + public CompactMetricMeta fromRow(InternalRow row) { + return new CompactMetricMeta( + row.getString(0).toString(), + row.getLong(1)); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java index 08d7de5dab7f..2156cc1c7af0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java @@ -35,6 +35,10 @@ public class CompactResult { @Nullable private CompactDeletionFile deletionFile; + @Nullable private String compactType; + + private long compactTimeMillis; + public CompactResult() { this(Collections.emptyList(), Collections.emptyList()); } @@ -75,6 +79,22 @@ public CompactDeletionFile deletionFile() { return deletionFile; } + public String compactionType() { + return compactType; + } + + public void setCompactTimeMillis(long compactTimeMillis) { + this.compactTimeMillis = compactTimeMillis; + } + + public long compactionTimeMillis() { + return compactTimeMillis; + } + + public void setCompactType(@Nullable String compactType) { + this.compactType = compactType; + } + public void merge(CompactResult that) { before.addAll(that.before); after.addAll(that.after); diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java index 69e68949c3c9..196deda3fb85 100644 --- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java @@ -37,8 +37,15 @@ public abstract class CompactTask implements Callable { @Nullable private final CompactionMetrics.Reporter metricsReporter; + @Nullable private String type; + public CompactTask(@Nullable CompactionMetrics.Reporter metricsReporter) { + this(metricsReporter, null); + } + + public CompactTask(@Nullable CompactionMetrics.Reporter metricsReporter, @Nullable String type) { this.metricsReporter = metricsReporter; + this.type = type; } @Override @@ -47,12 +54,14 @@ public CompactResult call() throws Exception { try { long startMillis = System.currentTimeMillis(); CompactResult result = doCompact(); + long compactionTime = System.currentTimeMillis() - startMillis; + result.setCompactTimeMillis(compactionTime); + result.setCompactType(type); MetricUtils.safeCall( () -> { if (metricsReporter != null) { - metricsReporter.reportCompactionTime( - System.currentTimeMillis() - startMillis); + metricsReporter.reportCompactionTime(compactionTime); metricsReporter.increaseCompactionsCompletedCount(); metricsReporter.reportCompactionInputSize( result.before().stream() diff --git a/paimon-core/src/main/java/org/apache/paimon/io/CompactMetricIncrement.java b/paimon-core/src/main/java/org/apache/paimon/io/CompactMetricIncrement.java new file mode 100644 index 000000000000..9849bd8fa87c --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/CompactMetricIncrement.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.compact.CompactMetricMeta; + +import javax.annotation.Nullable; +import java.util.Objects; + +/** Compact metric*/ +public class CompactMetricIncrement { + + @Nullable private final CompactMetricMeta metric; + + public CompactMetricIncrement() { + this(null); + } + + public CompactMetricIncrement(@Nullable CompactMetricMeta metric) { + this.metric = metric; + } + + public CompactMetricMeta metric() { + return metric; + } + + public CompactMetricMeta empty() { + return new CompactMetricMeta(); + } + + public boolean isEmpty() { + return metric == null; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + CompactMetricIncrement that = (CompactMetricIncrement) o; + return Objects.equals(metric, that.metric); + } + + @Override + public int hashCode() { + return Objects.hash(metric); + } + + @Override + public String toString() { + return String.format( + "CompactMetricIncrement {metric = %s}", metric); + } + + public static CompactMetricIncrement emptyIncrement() { + return new CompactMetricIncrement(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index 488b3992dd0b..c5151ef88926 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -23,11 +23,13 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.compact.CompactDeletionFile; import org.apache.paimon.compact.CompactManager; +import org.apache.paimon.compact.CompactMetricMeta; import org.apache.paimon.compact.CompactResult; import org.apache.paimon.compression.CompressOptions; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.CompactMetricIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; import org.apache.paimon.io.KeyValueFileWriterFactory; @@ -85,6 +87,8 @@ public class MergeTreeWriter implements RecordWriter, MemoryOwner { private long newSequenceNumber; private WriteBuffer writeBuffer; + private CompactMetricMeta compactMetricMeta; + public MergeTreeWriter( boolean writeBufferSpillable, MemorySize maxDiskSize, @@ -288,6 +292,8 @@ private CommitIncrement drainIncrement() { new ArrayList<>(compactBefore.values()), new ArrayList<>(compactAfter), new ArrayList<>(compactChangelog)); + CompactMetricIncrement compactMetricIncrement = + new CompactMetricIncrement(compactMetricMeta); CompactDeletionFile drainDeletionFile = compactDeletionFile; newFiles.clear(); @@ -298,7 +304,7 @@ private CommitIncrement drainIncrement() { compactChangelog.clear(); compactDeletionFile = null; - return new CommitIncrement(dataIncrement, compactIncrement, drainDeletionFile); + return new CommitIncrement(dataIncrement, compactIncrement, compactMetricIncrement, drainDeletionFile); } private void trySyncLatestCompaction(boolean blocking) throws Exception { @@ -326,6 +332,7 @@ private void updateCompactResult(CompactResult result) { } compactAfter.addAll(result.after()); compactChangelog.addAll(result.changelog()); + compactMetricMeta = new CompactMetricMeta(result.compactionType(), result.compactionTimeMillis()); updateCompactDeletionFile(result.deletionFile()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FileRewriteCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FileRewriteCompactTask.java index 620ea0748a87..3d3518520e34 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FileRewriteCompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FileRewriteCompactTask.java @@ -43,8 +43,9 @@ public FileRewriteCompactTask( CompactRewriter rewriter, CompactUnit unit, boolean dropDelete, - @Nullable CompactionMetrics.Reporter metricsReporter) { - super(metricsReporter); + @Nullable CompactionMetrics.Reporter metricsReporter, + @Nullable String type) { + super(metricsReporter, type); this.rewriter = rewriter; this.outputLevel = unit.outputLevel(); this.files = unit.files(); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java index f6ea3e137d1d..dd7286d95a86 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java @@ -55,6 +55,9 @@ public class MergeTreeCompactManager extends CompactFutureManager { private static final Logger LOG = LoggerFactory.getLogger(MergeTreeCompactManager.class); + private static final String COMPACT_TYPE_FULL = "full"; + private static final String COMPACT_TYPE_MINOR = "minor"; + private final ExecutorService executor; private final Levels levels; private final CompactStrategy strategy; @@ -68,6 +71,7 @@ public class MergeTreeCompactManager extends CompactFutureManager { private final boolean lazyGenDeletionFile; private final boolean needLookup; private final boolean forceRewriteAllFiles; + private String compactType; @Nullable private final RecordLevelExpire recordLevelExpire; @@ -147,6 +151,7 @@ public void triggerCompaction(boolean fullCompaction) { recordLevelExpire, dvMaintainer, forceRewriteAllFiles); + compactType = COMPACT_TYPE_FULL; } else { if (taskFuture != null) { return; @@ -162,6 +167,7 @@ public void triggerCompaction(boolean fullCompaction) { unit.files().size() > 1 || unit.files().get(0).level() != unit.outputLevel()); + compactType = COMPACT_TYPE_MINOR; } optionalUnit.ifPresent( @@ -212,7 +218,12 @@ private void submitCompaction(CompactUnit unit, boolean dropDelete) { CompactTask task; if (unit.fileRewrite()) { - task = new FileRewriteCompactTask(rewriter, unit, dropDelete, metricsReporter); + task = new FileRewriteCompactTask( + rewriter, + unit, + dropDelete, + metricsReporter, + compactType); } else { task = new MergeTreeCompactTask( @@ -225,7 +236,8 @@ private void submitCompaction(CompactUnit unit, boolean dropDelete) { metricsReporter, compactDfSupplier, recordLevelExpire, - forceRewriteAllFiles); + forceRewriteAllFiles, + compactType); } if (LOG.isDebugEnabled()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java index 667a965c621d..dfd142d78bd1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java @@ -63,8 +63,9 @@ public MergeTreeCompactTask( @Nullable CompactionMetrics.Reporter metricsReporter, Supplier compactDfSupplier, @Nullable RecordLevelExpire recordLevelExpire, - boolean forceRewriteAllFiles) { - super(metricsReporter); + boolean forceRewriteAllFiles, + String type) { + super(metricsReporter, type); this.minFileSize = minFileSize; this.rewriter = rewriter; this.outputLevel = unit.outputLevel(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index ddf2addf5313..28cbcd2d2148 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -29,6 +29,7 @@ import org.apache.paimon.index.DynamicBucketIndexMaintainer; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.CompactMetricIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; import org.apache.paimon.memory.MemoryPoolFactory; @@ -215,6 +216,7 @@ public List prepareCommit(boolean waitCompaction, long commitIden CommitIncrement increment = writerContainer.writer.prepareCommit(waitCompaction); DataIncrement newFilesIncrement = increment.newFilesIncrement(); CompactIncrement compactIncrement = increment.compactIncrement(); + CompactMetricIncrement compactMetricIncrement = increment.compactMetricIncrement(); if (writerContainer.dynamicBucketMaintainer != null) { newFilesIncrement .newIndexFiles() @@ -232,7 +234,8 @@ public List prepareCommit(boolean waitCompaction, long commitIden bucket, writerContainer.totalBuckets, newFilesIncrement, - compactIncrement); + compactIncrement, + compactMetricIncrement); result.add(committable); if (committable.isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index e5d4cd2ff20c..5602e1674f85 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -46,6 +46,7 @@ import org.apache.paimon.operation.commit.CommitKindProvider; import org.apache.paimon.operation.commit.CommitResult; import org.apache.paimon.operation.commit.CommitScanner; +import org.apache.paimon.metrics.CompactMetric; import org.apache.paimon.operation.commit.ConflictDetection; import org.apache.paimon.operation.commit.ConflictDetection.ConflictCheck; import org.apache.paimon.operation.commit.ManifestEntryChanges; @@ -72,6 +73,7 @@ import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.CompactMetricsManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,6 +142,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { private final String partitionDefaultName; private final FileStorePathFactory pathFactory; private final SnapshotManager snapshotManager; + private final CompactMetricsManager compactMetricsManager; private final ManifestFile manifestFile; private final ManifestList manifestList; private final IndexManifestFile indexManifestFile; @@ -180,6 +183,7 @@ public FileStoreCommitImpl( String partitionDefaultName, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, + CompactMetricsManager compactMetricsManager, ManifestFile.Factory manifestFileFactory, ManifestList.Factory manifestListFactory, IndexManifestFile.Factory indexManifestFileFactory, @@ -212,6 +216,7 @@ public FileStoreCommitImpl( this.partitionDefaultName = partitionDefaultName; this.pathFactory = pathFactory; this.snapshotManager = snapshotManager; + this.compactMetricsManager = compactMetricsManager; this.manifestFile = manifestFileFactory.create(); this.manifestList = manifestListFactory.create(); this.indexManifestFile = indexManifestFileFactory.create(); @@ -425,7 +430,10 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) { committable.properties(), CommitKindProvider.provider(CommitKind.COMPACT), hasConflictChecked(safeLatestSnapshotId), - null); + null, + changes.buckets, + changes.types, + changes.compactionDurationTime); generatedSnapshot += 1; } } finally { @@ -773,6 +781,21 @@ private int tryCommit( CommitKindProvider commitKindProvider, ConflictCheck conflictCheck, @Nullable String statsFileName) { + return tryCommit(changesProvider, identifier, watermark, logOffsets, properties, commitKindProvider, conflictCheck, statsFileName, null, null, null); + } + + private int tryCommit( + CommitChangesProvider changesProvider, + long identifier, + @Nullable Long watermark, + Map logOffsets, + Map properties, + CommitKindProvider commitKindProvider, + ConflictCheck conflictCheck, + @Nullable String statsFileName, + @Nullable List buckets, + @Nullable List compactTypes, + @Nullable List compactDurationTime) { int retryCount = 0; RetryCommitResult retryResult = null; long startMillis = System.currentTimeMillis(); @@ -793,7 +816,10 @@ private int tryCommit( commitKind, latestSnapshot, conflictCheck, - statsFileName); + statsFileName, + buckets, + compactTypes, + compactDurationTime); if (result.isSuccess()) { break; @@ -863,6 +889,26 @@ CommitResult tryCommitOnce( @Nullable Snapshot latestSnapshot, ConflictCheck conflictCheck, @Nullable String newStatsFileName) { + return tryCommitOnce(retryResult, deltaFiles, changelogFiles, indexFiles, identifier, watermark, logOffsets, properties, commitKind, latestSnapshot, conflictCheck, newStatsFileName, null, null, null); + } + + @VisibleForTesting + CommitResult tryCommitOnce( + @Nullable RetryCommitResult retryResult, + List deltaFiles, + List changelogFiles, + List indexFiles, + long identifier, + @Nullable Long watermark, + Map logOffsets, + Map properties, + CommitKind commitKind, + @Nullable Snapshot latestSnapshot, + ConflictCheck conflictCheck, + @Nullable String newStatsFileName, + @Nullable List buckets, + @Nullable List compactTypes, + @Nullable List compactDurationTime) { long startMillis = System.currentTimeMillis(); // Check if the commit has been completed. At this point, there will be no more repeated @@ -953,6 +999,7 @@ CommitResult tryCommitOnce( } Snapshot newSnapshot; + CompactMetric compactMetric = null; Pair baseManifestList = null; Pair deltaManifestList = null; List deltaStatistics; @@ -1069,6 +1116,9 @@ CommitResult tryCommitOnce( // if empty properties, just set to null properties.isEmpty() ? null : properties, nextRowIdStart); + if (options.compactMetricsEnabled()) { + compactMetric = buildCompactMetric(newSnapshotId, commitKind, buckets, compactTypes, compactDurationTime, identifier); + } } catch (Throwable e) { // fails when preparing for commit, we should clean up commitCleaner.cleanUpReuseTmpManifests( @@ -1086,6 +1136,9 @@ CommitResult tryCommitOnce( boolean success; try { success = commitSnapshotImpl(newSnapshot, deltaStatistics); + if (options.compactMetricsEnabled() && success && compactMetric != null) { + compactMetricsManager.commit(compactMetric); + } } catch (Exception e) { // commit exception, not sure about the situation and should not clean up the files LOG.warn("Retry commit for exception.", e); @@ -1324,6 +1377,19 @@ private void commitRetryWait(int retryCount) { } } + private CompactMetric buildCompactMetric(long newSnapshotId, CommitKind commitKind, List buckets, List compactTypes, List compactDurationTime, long identifier) { + if (commitKind.equals(CommitKind.COMPACT) && compactDurationTime !=null && !compactDurationTime.isEmpty()) { + long maxDuration = compactDurationTime.stream().max(Long::compareTo).get(); + long minDuration = compactDurationTime.stream().min(Long::compareTo).get(); + long averageDuration = (long) compactDurationTime.stream().mapToLong(Long::longValue).average().getAsDouble(); + String compParts = buckets == null ? "{}" : String.join(",", buckets); + String compTypes = compactTypes == null ? "" : String.join(",", new HashSet<>(compactTypes)); + return new CompactMetric(newSnapshotId, System.currentTimeMillis(), averageDuration, maxDuration, minDuration, compParts, compTypes, identifier, commitUser); + } else { + return null; + } + } + @Override public void close() { IOUtils.closeAllQuietly(commitCallbacks); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java index 794445dfbbd7..b160f24f980d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java @@ -44,6 +44,9 @@ public class ManifestEntryChanges { public List compactTableFiles; public List compactChangelog; public List compactIndexFiles; + public List buckets; + public List types; + public List compactionDurationTime; public ManifestEntryChanges(int defaultNumBucket) { this.defaultNumBucket = defaultNumBucket; @@ -53,6 +56,9 @@ public ManifestEntryChanges(int defaultNumBucket) { this.compactTableFiles = new ArrayList<>(); this.compactChangelog = new ArrayList<>(); this.compactIndexFiles = new ArrayList<>(); + this.buckets = new ArrayList<>(); + this.types = new ArrayList<>(); + this.compactionDurationTime = new ArrayList<>(); } public void collect(CommitMessage message) { @@ -126,6 +132,19 @@ public void collect(CommitMessage message) { commitMessage.partition(), commitMessage.bucket(), m))); + if (commitMessage.partition().getFieldCount() == 0) { + buckets.add("bucket-" + commitMessage.bucket()); + } else { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < commitMessage.partition().getFieldCount(); i++) { + builder.append(commitMessage.partition().getString(i)).append("/"); + } + buckets.add(builder + "bucket-" + commitMessage.bucket()); + } + if (commitMessage.compactMetricIncrement() != null && commitMessage.compactMetricIncrement().metric() != null) { + types.add(commitMessage.compactMetricIncrement().metric().type()); + compactionDurationTime.add(commitMessage.compactMetricIncrement().metric().duration()); + } } private ManifestEntry makeEntry(FileKind kind, CommitMessage commitMessage, DataFileMeta file) { diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java index 917301a2e555..0889671cbcb5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java @@ -50,6 +50,7 @@ import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.apache.paimon.utils.CompactMetricsManager; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; @@ -244,4 +245,10 @@ public void setSnapshotCache(Cache cache) { public GlobalIndexScanBuilder newGlobalIndexScanBuilder() { return wrapped.newGlobalIndexScanBuilder(); } + + @Override + public CompactMetricsManager compactMetricsManager() { + privilegeChecker.assertCanSelectOrInsert(identifier); + return wrapped.compactMetricsManager(); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 35949062e307..eef8dfa7f6c5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -491,15 +491,23 @@ protected Runnable newExpireRunnable() { if (!options.writeOnly()) { boolean changelogDecoupled = options.changelogLifecycleDecoupled(); + boolean compactMetricsEnabled = options.compactMetricsEnabled(); ExpireConfig expireConfig = options.expireConfig(); ExpireSnapshots expireChangelog = newExpireChangelog().config(expireConfig); ExpireSnapshots expireSnapshots = newExpireSnapshots().config(expireConfig); + ExpireCompactMetrics expireCompactMetrics = + new ExpireCompactMetricsImpl( + store().compactMetricsManager(), + options.compactMetricsRetainedNum()); snapshotExpire = () -> { expireSnapshots.expire(); if (changelogDecoupled) { expireChangelog.expire(); } + if (compactMetricsEnabled) { + expireCompactMetrics.expire(); + } }; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireCompactMetrics.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireCompactMetrics.java new file mode 100644 index 000000000000..d5af4933cc3f --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireCompactMetrics.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +/** Expire compact metrics. */ +public interface ExpireCompactMetrics { + + /** @return How many compact metrics have been expired. */ + int expire(); +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireCompactMetricsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireCompactMetricsImpl.java new file mode 100644 index 000000000000..71ed1fc2d46b --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireCompactMetricsImpl.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.utils.CompactMetricsManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** An implementation for {@link ExpireCompactMetrics}. */ +public class ExpireCompactMetricsImpl implements ExpireCompactMetrics { + + private static final Logger LOG = LoggerFactory.getLogger(ExpireCompactMetricsImpl.class); + + private final CompactMetricsManager compactMetricsManager; + private final long retainNum; + + public ExpireCompactMetricsImpl(CompactMetricsManager compactMetricsManager, long retainNum) { + this.compactMetricsManager = compactMetricsManager; + this.retainNum = retainNum; + } + + @Override + public int expire() { + try { + int expireNum = 0; + if (compactMetricsManager.compactMetricCount() > retainNum) { + List metrics = compactMetricsManager.compactMetricIds(); + for (int i = 0; i < metrics.size() - retainNum; i++) { + compactMetricsManager.deleteCompactMetric(metrics.get(i)); + } + expireNum = (int) (metrics.size() - retainNum); + } + LOG.debug("Expire compact metrics size {}", expireNum); + return expireNum; + } catch (Exception e) { + LOG.error("Expire compact metrics error", e); + return -1; + } + } + +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java index 8e715462f582..16698d6eafaf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java @@ -19,10 +19,7 @@ package org.apache.paimon.table.sink; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.io.CompactIncrement; -import org.apache.paimon.io.DataIncrement; -import org.apache.paimon.io.DataInputViewStreamWrapper; -import org.apache.paimon.io.DataOutputViewStreamWrapper; +import org.apache.paimon.io.*; import javax.annotation.Nullable; @@ -47,6 +44,7 @@ public class CommitMessageImpl implements CommitMessage { private transient @Nullable Integer totalBuckets; private transient DataIncrement dataIncrement; private transient CompactIncrement compactIncrement; + private transient @Nullable CompactMetricIncrement compactMetricIncrement; public CommitMessageImpl( BinaryRow partition, @@ -54,11 +52,22 @@ public CommitMessageImpl( @Nullable Integer totalBuckets, DataIncrement dataIncrement, CompactIncrement compactIncrement) { + this(partition, bucket, totalBuckets, dataIncrement, compactIncrement, null); + } + + public CommitMessageImpl( + BinaryRow partition, + int bucket, + @Nullable Integer totalBuckets, + DataIncrement dataIncrement, + CompactIncrement compactIncrement, + @Nullable CompactMetricIncrement compactMetricIncrement) { this.partition = partition; this.bucket = bucket; this.totalBuckets = totalBuckets; this.dataIncrement = dataIncrement; this.compactIncrement = compactIncrement; + this.compactMetricIncrement = compactMetricIncrement; } @Override @@ -84,6 +93,10 @@ public CompactIncrement compactIncrement() { return compactIncrement; } + public CompactMetricIncrement compactMetricIncrement() { + return compactMetricIncrement; + } + public boolean isEmpty() { return dataIncrement.isEmpty() && compactIncrement.isEmpty(); } @@ -105,6 +118,7 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE this.totalBuckets = message.totalBuckets; this.dataIncrement = message.dataIncrement; this.compactIncrement = message.compactIncrement; + this.compactMetricIncrement = message.compactMetricIncrement; } @Override @@ -121,12 +135,13 @@ public boolean equals(Object o) { && Objects.equals(partition, that.partition) && Objects.equals(totalBuckets, that.totalBuckets) && Objects.equals(dataIncrement, that.dataIncrement) - && Objects.equals(compactIncrement, that.compactIncrement); + && Objects.equals(compactIncrement, that.compactIncrement) + && Objects.equals(compactMetricIncrement, that.compactMetricIncrement); } @Override public int hashCode() { - return Objects.hash(partition, bucket, totalBuckets, dataIncrement, compactIncrement); + return Objects.hash(partition, bucket, totalBuckets, dataIncrement, compactIncrement, compactMetricIncrement); } @Override @@ -137,7 +152,8 @@ public String toString() { + "bucket = %d, " + "totalBuckets = %s, " + "newFilesIncrement = %s, " - + "compactIncrement = %s}", - partition, bucket, totalBuckets, dataIncrement, compactIncrement); + + "compactIncrement = %s, " + + "compactMetricIncrement = $s}", + partition, bucket, totalBuckets, dataIncrement, compactIncrement, compactMetricIncrement); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java index 7ead4e9fdee8..d180e891ba45 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java @@ -18,6 +18,9 @@ package org.apache.paimon.table.sink; +import org.apache.paimon.compact.CompactMetricMeta; +import org.apache.paimon.compact.CompactMetricMetaV1Deserializer; +import org.apache.paimon.compact.CompactMetricSerializer; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.serializer.VersionedSerializer; import org.apache.paimon.index.IndexFileMeta; @@ -25,19 +28,7 @@ import org.apache.paimon.index.IndexFileMetaV1Deserializer; import org.apache.paimon.index.IndexFileMetaV2Deserializer; import org.apache.paimon.index.IndexFileMetaV3Deserializer; -import org.apache.paimon.io.CompactIncrement; -import org.apache.paimon.io.DataFileMeta; -import org.apache.paimon.io.DataFileMeta08Serializer; -import org.apache.paimon.io.DataFileMeta09Serializer; -import org.apache.paimon.io.DataFileMeta10LegacySerializer; -import org.apache.paimon.io.DataFileMeta12LegacySerializer; -import org.apache.paimon.io.DataFileMetaFirstRowIdLegacySerializer; -import org.apache.paimon.io.DataFileMetaSerializer; -import org.apache.paimon.io.DataIncrement; -import org.apache.paimon.io.DataInputDeserializer; -import org.apache.paimon.io.DataInputView; -import org.apache.paimon.io.DataOutputView; -import org.apache.paimon.io.DataOutputViewStreamWrapper; +import org.apache.paimon.io.*; import org.apache.paimon.utils.IOExceptionSupplier; import java.io.ByteArrayOutputStream; @@ -55,6 +46,7 @@ public class CommitMessageSerializer implements VersionedSerializer> fileDeserializer = fileDeserializer(version, view); IOExceptionSupplier> indexEntryDeserializer = indexEntryDeserializer(version, view); + IOExceptionSupplier compactMetricDeserializer = + compactMetricDeserializer(version, view); if (version >= 10) { return new CommitMessageImpl( deserializeBinaryRow(view), @@ -154,7 +156,8 @@ private CommitMessage deserialize(int version, DataInputView view) throws IOExce fileDeserializer.get(), fileDeserializer.get(), indexEntryDeserializer.get(), - indexEntryDeserializer.get())); + indexEntryDeserializer.get()), + new CompactMetricIncrement(compactMetricDeserializer.get())); } else { BinaryRow partition = deserializeBinaryRow(view); int bucket = view.readInt(); @@ -178,7 +181,7 @@ private CommitMessage deserialize(int version, DataInputView view) throws IOExce } } return new CommitMessageImpl( - partition, bucket, totalBuckets, dataIncrement, compactIncrement); + partition, bucket, totalBuckets, dataIncrement, compactIncrement, new CompactMetricIncrement(compactMetricDeserializer.get())); } } @@ -236,4 +239,9 @@ private IOExceptionSupplier> indexEntryDeserializer( return () -> indexEntryV1Deserializer.deserializeList(view); } } + + private IOExceptionSupplier compactMetricDeserializer( + int version, DataInputView view) { + return () -> new CompactMetricMetaV1Deserializer().deserialize(view); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactionMetricsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactionMetricsTable.java new file mode 100644 index 000000000000..4e73e1385810 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactionMetricsTable.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.system; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.metrics.CompactMetric; +import org.apache.paimon.predicate.*; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.ReadonlyTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.*; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.utils.*; + +import java.io.IOException; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.*; + +import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER; + +/** A {@link Table} for showing committing compaction metrics of table. */ +public class CompactionMetricsTable implements ReadonlyTable { + + private static final long serialVersionUID = 1L; + + public static final String COMPACTS = "compacts"; + public static final RowType TABLE_TYPE = + new RowType( + Arrays.asList( + new DataField(0, "snapshot_id", new BigIntType(false)), + new DataField(1, "commit_time", new TimestampType(false, 3)), + new DataField(2, "compact_duration", new BigIntType(false)), + new DataField( + 3, "max_duration", new BigIntType(false)), + new DataField(4, "min_duration", new BigIntType(false)), + new DataField( + 5, + "partitions", + SerializationUtils.newStringType(false)), + new DataField( + 6, + "compact_type", + SerializationUtils.newStringType(false)), + new DataField(7, "identifier", new BigIntType(false)), + new DataField(8, "commit_user", SerializationUtils.newStringType(false)))); + + private final FileIO fileIO; + private final Path location; + + private final FileStoreTable dataTable; + + public CompactionMetricsTable(FileStoreTable dataTable) { + this.fileIO = dataTable.fileIO(); + this.location = dataTable.location(); + this.dataTable = dataTable; + } + + @Override + public String name() { + return location.getName() + SYSTEM_TABLE_SPLITTER + COMPACTS; + } + + @Override + public RowType rowType() { + return TABLE_TYPE; + } + + @Override + public List primaryKeys() { + return Collections.singletonList("snapshot_id"); + } + + @Override + public FileIO fileIO() { + return dataTable.fileIO(); + } + + @Override + public InnerTableScan newScan() { + return new SnapshotsScan(); + } + + @Override + public InnerTableRead newRead() { + return new CompactionsRead(fileIO); + } + + @Override + public Table copy(Map dynamicOptions) { + return new CompactionMetricsTable(dataTable.copy(dynamicOptions)); + } + + private class SnapshotsScan extends ReadOnceTableScan { + + @Override + public InnerTableScan withFilter(Predicate predicate) { + // do filter in read + return this; + } + + @Override + public Plan innerPlan() { + return () -> Collections.singletonList(new CompactionsSplit(location)); + } + } + + private static class CompactionsSplit extends SingletonSplit { + + private static final long serialVersionUID = 1L; + + private final Path location; + + private CompactionsSplit(Path location) { + this.location = location; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CompactionsSplit that = (CompactionsSplit) o; + return Objects.equals(location, that.location); + } + + @Override + public int hashCode() { + return Objects.hash(location); + } + } + + private class CompactionsRead implements InnerTableRead { + + private final FileIO fileIO; + private RowType readType; + private Optional optionalFilterSnapshotIdMax = Optional.empty(); + private Optional optionalFilterSnapshotIdMin = Optional.empty(); + private final List snapshotIds = new ArrayList<>(); + + public CompactionsRead(FileIO fileIO) { + this.fileIO = fileIO; + } + + @Override + public InnerTableRead withFilter(Predicate predicate) { + if (predicate == null) { + return this; + } + + String leafName = "snapshot_id"; + if (predicate instanceof CompoundPredicate) { + CompoundPredicate compoundPredicate = (CompoundPredicate) predicate; + List children = compoundPredicate.children(); + if ((compoundPredicate.function()) instanceof And) { + for (Predicate leaf : children) { + handleLeafPredicate(leaf, leafName); + } + } + + // optimize for IN filter + if ((compoundPredicate.function()) instanceof Or) { + InPredicateVisitor.extractInElements(predicate, leafName) + .ifPresent( + leafs -> + leafs.forEach( + leaf -> + snapshotIds.add( + Long.parseLong( + leaf.toString())))); + } + } else { + handleLeafPredicate(predicate, leafName); + } + + return this; + } + + public void handleLeafPredicate(Predicate predicate, String leafName) { + LeafPredicate compactionPred = + predicate.visit(LeafPredicateExtractor.INSTANCE).get(leafName); + if (compactionPred != null) { + if (compactionPred.function() instanceof Equal) { + optionalFilterSnapshotIdMin = + Optional.of((Long) compactionPred.literals().get(0)); + optionalFilterSnapshotIdMax = + Optional.of((Long) compactionPred.literals().get(0)); + } + + if (compactionPred.function() instanceof GreaterThan) { + optionalFilterSnapshotIdMin = + Optional.of((Long) compactionPred.literals().get(0) + 1); + } + + if (compactionPred.function() instanceof GreaterOrEqual) { + optionalFilterSnapshotIdMin = + Optional.of((Long) compactionPred.literals().get(0)); + } + + if (compactionPred.function() instanceof LessThan) { + optionalFilterSnapshotIdMax = + Optional.of((Long) compactionPred.literals().get(0) - 1); + } + + if (compactionPred.function() instanceof LessOrEqual) { + optionalFilterSnapshotIdMax = + Optional.of((Long) compactionPred.literals().get(0)); + } + } + } + + @Override + public InnerTableRead withReadType(RowType readType) { + this.readType = readType; + return this; + } + + @Override + public TableRead withIOManager(IOManager ioManager) { + return this; + } + + @Override + public RecordReader createReader(Split split) throws IOException { + if (!(split instanceof CompactionsSplit)) { + throw new IllegalArgumentException("Unsupported split: " + split.getClass()); + } + dataTable.fileIO(); + CompactMetricsManager compactMetricsManager = dataTable.store().compactMetricsManager(); + Iterator metrics = compactMetricsManager.metrics(); + Iterator rows = Iterators.transform(metrics, metric -> metric != null ? toRow(metric) : null); + if (readType != null) { + rows = + Iterators.transform( + rows, + row -> + ProjectedRow.from(readType, CompactionMetricsTable.TABLE_TYPE) + .replaceRow(row)); + } + return new IteratorRecordReader<>(rows); + } + + private InternalRow toRow(CompactMetric metric) { + return GenericRow.of( + metric.snapshotId(), + Timestamp.fromLocalDateTime( + LocalDateTime.ofInstant( + Instant.ofEpochMilli(metric.commitTime()), + ZoneId.systemDefault())), + metric.compactDuration(), + metric.compactMaxDuration(), + metric.compactMinDuration(), + BinaryString.fromString(metric.buckets()), + BinaryString.fromString(metric.compactType()), + metric.identifier(), + BinaryString.fromString(metric.commitUser())); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java index e1411d290f35..1236b1e4a78b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java @@ -42,6 +42,7 @@ import static org.apache.paimon.table.system.BucketsTable.BUCKETS; import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS; import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS; +import static org.apache.paimon.table.system.CompactionMetricsTable.COMPACTS; import static org.apache.paimon.table.system.FilesTable.FILES; import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS; import static org.apache.paimon.table.system.OptionsTable.OPTIONS; @@ -76,6 +77,7 @@ public class SystemTableLoader { .put(BINLOG, BinlogTable::new) .put(TABLE_INDEXES, TableIndexesTable::new) .put(ROW_TRACKING, RowTrackingTable::new) + .put(COMPACTS, CompactionMetricsTable::new) .build(); public static final List SYSTEM_TABLES = new ArrayList<>(SYSTEM_TABLE_LOADERS.keySet()); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java b/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java index 3c16378f8f57..046f0cab4065 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java @@ -20,6 +20,7 @@ import org.apache.paimon.compact.CompactDeletionFile; import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.CompactMetricIncrement; import org.apache.paimon.io.DataIncrement; import javax.annotation.Nullable; @@ -29,14 +30,24 @@ public class CommitIncrement { private final DataIncrement dataIncrement; private final CompactIncrement compactIncrement; + @Nullable private final CompactMetricIncrement compactMetricIncrement; @Nullable private final CompactDeletionFile compactDeletionFile; public CommitIncrement( DataIncrement dataIncrement, CompactIncrement compactIncrement, @Nullable CompactDeletionFile compactDeletionFile) { + this(dataIncrement, compactIncrement, new CompactMetricIncrement(), compactDeletionFile); + } + + public CommitIncrement( + DataIncrement dataIncrement, + CompactIncrement compactIncrement, + @Nullable CompactMetricIncrement compactMetricIncrement, + @Nullable CompactDeletionFile compactDeletionFile) { this.dataIncrement = dataIncrement; this.compactIncrement = compactIncrement; + this.compactMetricIncrement = compactMetricIncrement; this.compactDeletionFile = compactDeletionFile; } @@ -48,6 +59,11 @@ public CompactIncrement compactIncrement() { return compactIncrement; } + @Nullable + public CompactMetricIncrement compactMetricIncrement() { + return compactMetricIncrement; + } + @Nullable public CompactDeletionFile compactDeletionFile() { return compactDeletionFile; @@ -55,6 +71,6 @@ public CompactDeletionFile compactDeletionFile() { @Override public String toString() { - return dataIncrement.toString() + "\n" + compactIncrement + "\n" + compactDeletionFile; + return dataIncrement.toString() + "\n" + compactIncrement + "\n" + compactDeletionFile + "\n" + compactMetricIncrement; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/CompactMetricsManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/CompactMetricsManager.java new file mode 100644 index 000000000000..3a78d9c1729d --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/CompactMetricsManager.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.metrics.CompactMetric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.paimon.utils.BranchManager.branchPath; +import static org.apache.paimon.utils.FileUtils.listVersionedFiles; +import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; +import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute; + +/** Manager for {@link CompactMetric}, providing utility methods related to paths and compact metrics hints. */ +public class CompactMetricsManager implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(CompactMetricsManager.class); + + public static final String COMPACTION_METRICS_PREFIX = "compact-"; + + public static final int EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM = 3; + + private final FileIO fileIO; + private final Path tablePath; + private final String branch; + + public CompactMetricsManager( + FileIO fileIO, + Path tablePath, + @Nullable String branchName) { + this.fileIO = fileIO; + this.tablePath = tablePath; + this.branch = BranchManager.normalizeBranch(branchName); + } + + public FileIO fileIO() { + return fileIO; + } + + public Path tablePath() { + return tablePath; + } + + public String branch() { + return branch; + } + + public Path compactMetricPath(long snapshotId) { + return new Path( + branchPath(tablePath, branch) + "/metrics/" + COMPACTION_METRICS_PREFIX + snapshotId); + } + + public Path compactMetricsDirectory() { + return new Path(branchPath(tablePath, branch) + "/metrics"); + } + + public CompactMetric metric(long snapshotId) { + Path path = compactMetricPath(snapshotId); + return fromPath(fileIO, path); + } + + public boolean commit(CompactMetric metric) throws Exception { + Path metricPath = compactMetricPath(metric.snapshotId()); + return fileIO.tryToWriteAtomic(metricPath, metric.toJson()); + } + + public CompactMetric tryGetCompactMetric(long snapshotId) throws FileNotFoundException { + Path path = compactMetricPath(snapshotId); + return tryFromPath(fileIO, path); + } + + public boolean compactMetricExists(long snapshotId) { + Path path = compactMetricPath(snapshotId); + try { + return fileIO.exists(path); + } catch (IOException e) { + throw new RuntimeException( + "Failed to determine if snapshot #" + snapshotId + " exists in path " + path, + e); + } + } + + public void deleteCompactMetric(long snapshotId) { + Path path = compactMetricPath(snapshotId); + fileIO().deleteQuietly(path); + } + + public long compactMetricCount() throws IOException { + return metricIdStream().count(); + } + + public List compactMetricIds() throws IOException { + return metricIdStream().filter(Objects::nonNull).sorted().collect(Collectors.toList()); + } + + public Iterator metrics() throws IOException { + return metricIdStream() + .map(this::metric) + .sorted(Comparator.comparingLong(CompactMetric::snapshotId)) + .iterator(); + } + + public List metricsPaths(Predicate predicate) throws IOException { + return metricIdStream() + .filter(predicate) + .map(this::compactMetricPath) + .collect(Collectors.toList()); + } + + public Stream metricIdStream() throws IOException { + return listVersionedFiles(fileIO, compactMetricsDirectory(), COMPACTION_METRICS_PREFIX); + } + + public Iterator metricsWithId(List snapshotIds) { + return snapshotIds.stream() + .map(this::metric) + .sorted(Comparator.comparingLong(CompactMetric::snapshotId)) + .iterator(); + } + + public @Nullable Long earliestSnapshotId() { + try { + return findEarliest(compactMetricsDirectory(), COMPACTION_METRICS_PREFIX, this::compactMetricPath); + } catch (IOException e) { + throw new RuntimeException("Failed to find earliest compaction metrics snapshot id", e); + } + } + + public @Nullable Long latestSnapshotId() { + try { + return findLatest(compactMetricsDirectory(), COMPACTION_METRICS_PREFIX, this::compactMetricPath); + } catch (IOException e) { + throw new RuntimeException("Failed to find latest compaction metrics snapshot id", e); + } + } + + /** + * If {@link FileNotFoundException} is thrown when reading the snapshot file, this snapshot may + * be deleted by other processes, so just skip this snapshot. + */ + public List safelyGetAllmetrics() throws IOException { + List paths = metricIdStream().map(this::compactMetricPath).collect(Collectors.toList()); + + List metrics = Collections.synchronizedList(new ArrayList<>(paths.size())); + collectMetrics( + path -> { + try { + // do not pollution cache + metrics.add(tryFromPath(fileIO, path)); + } catch (FileNotFoundException ignored) { + } + }, + paths); + + return metrics; + } + + private static void collectMetrics(Consumer pathConsumer, List paths) + throws IOException { + ExecutorService executor = + createCachedThreadPool( + Runtime.getRuntime().availableProcessors(), "COMPACTION_METRICS_COLLECTOR"); + + try { + randomlyOnlyExecute(executor, pathConsumer, paths); + } catch (RuntimeException e) { + throw new IOException(e); + } finally { + executor.shutdown(); + } + } + + private @Nullable Long findLatest(Path dir, String prefix, Function file) + throws IOException { + return HintFileUtils.findLatest(fileIO, dir, prefix, file); + } + + private @Nullable Long findEarliest(Path dir, String prefix, Function file) + throws IOException { + return HintFileUtils.findEarliest(fileIO, dir, prefix, file); + } + + public void deleteLatestHint() throws IOException { + HintFileUtils.deleteLatestHint(fileIO, compactMetricsDirectory()); + } + + public void commitLatestHint(long snapshotId) throws IOException { + HintFileUtils.commitLatestHint(fileIO, snapshotId, compactMetricsDirectory()); + } + + public void commitEarliestHint(long snapshotId) throws IOException { + HintFileUtils.commitEarliestHint(fileIO, snapshotId, compactMetricsDirectory()); + } + + public static CompactMetric fromPath(FileIO fileIO, Path path) { + try { + return tryFromPath(fileIO, path); + } catch (FileNotFoundException e) { + String errorMessage = + String.format( + "Compaction Metric file %s does not exist. " + + "It might have been expired by other jobs operating on this table. " + + "In this case, you can avoid concurrent modification issues by configuring " + + "write-only = true and use a dedicated compaction job, or configuring " + + "different expiration thresholds for different jobs.", + path); + throw new RuntimeException(errorMessage, e); + } + } + + public static CompactMetric tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException { + int retryNumber = 0; + Exception exception = null; + while (retryNumber++ < 10) { + String content; + try { + content = fileIO.readFileUtf8(path); + } catch (FileNotFoundException e) { + throw e; + } catch (IOException e) { + throw new RuntimeException("Fails to read snapshot from path " + path, e); + } + + try { + return CompactMetric.fromJson(content); + } catch (Exception e) { + // retry + exception = e; + try { + Thread.sleep(200); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ie); + } + } + } + throw new RuntimeException("Retry fail after 10 times", exception); + } +} From c8eb5a199458855544f1ec8358e9dbb98708de7b Mon Sep 17 00:00:00 2001 From: Forus0322 Date: Fri, 26 Dec 2025 19:50:42 +0800 Subject: [PATCH 2/2] [core] fix code style problems. #6880 --- .../apache/paimon/metrics/CompactMetric.java | 8 +- .../org/apache/paimon/AbstractFileStore.java | 4 +- .../java/org/apache/paimon/FileStore.java | 2 +- .../paimon/append/AppendOnlyWriter.java | 7 +- .../paimon/compact/CompactMetricMeta.java | 12 +-- .../CompactMetricMetaV1Deserializer.java | 5 +- .../compact/CompactMetricSerializer.java | 8 +- .../apache/paimon/compact/CompactTask.java | 3 +- .../paimon/io/CompactMetricIncrement.java | 6 +- .../paimon/mergetree/MergeTreeWriter.java | 6 +- .../compact/MergeTreeCompactManager.java | 9 +-- .../paimon/operation/FileStoreCommitImpl.java | 76 ++++++++++++++++--- .../commit/ManifestEntryChanges.java | 3 +- .../paimon/privilege/PrivilegedFileStore.java | 2 +- .../paimon/table/AbstractFileStoreTable.java | 3 +- .../table/ExpireCompactMetricsImpl.java | 2 +- .../paimon/table/sink/CommitMessageImpl.java | 21 ++++- .../table/sink/CommitMessageSerializer.java | 26 ++++++- .../table/system/CompactionMetricsTable.java | 58 +++++++++----- .../table/system/SystemTableLoader.java | 2 +- .../apache/paimon/utils/CommitIncrement.java | 8 +- .../paimon/utils/CompactMetricsManager.java | 33 +++++--- 22 files changed, 215 insertions(+), 89 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/metrics/CompactMetric.java b/paimon-api/src/main/java/org/apache/paimon/metrics/CompactMetric.java index 903c7a4bde15..e2fbf1b90f93 100644 --- a/paimon-api/src/main/java/org/apache/paimon/metrics/CompactMetric.java +++ b/paimon-api/src/main/java/org/apache/paimon/metrics/CompactMetric.java @@ -19,19 +19,19 @@ package org.apache.paimon.metrics; import org.apache.paimon.annotation.Public; +import org.apache.paimon.utils.JsonSerdeUtil; + import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.paimon.utils.JsonSerdeUtil; import javax.annotation.Nullable; + import java.io.Serializable; import java.util.Objects; -/** - * @since 0.9.0 - */ +/** @since 1.4.0 */ @Public @JsonIgnoreProperties(ignoreUnknown = true) public class CompactMetric implements Serializable { diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index bbe31b5200e2..691774813cd6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -512,8 +512,6 @@ public GlobalIndexScanBuilder newGlobalIndexScanBuilder() { @Override public CompactMetricsManager compactMetricsManager() { - return new CompactMetricsManager(fileIO, - options.path(), - options.branch()); + return new CompactMetricsManager(fileIO, options.path(), options.branch()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index 9fb80192e9e7..652819a0aafc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -41,12 +41,12 @@ import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.ChangelogManager; +import org.apache.paimon.utils.CompactMetricsManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; -import org.apache.paimon.utils.CompactMetricsManager; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 4934d71be331..cb61e726cf54 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -341,7 +341,9 @@ private void trySyncLatestCompaction(boolean blocking) result -> { compactBefore.addAll(result.before()); compactAfter.addAll(result.after()); - compactMetricMeta = new CompactMetricMeta(result.compactionType(), result.compactionTimeMillis()); + compactMetricMeta = + new CompactMetricMeta( + result.compactionType(), result.compactionTimeMillis()); updateCompactDeletionFile(result.deletionFile()); }); } @@ -376,7 +378,8 @@ private CommitIncrement drainIncrement() { compactAfter.clear(); compactDeletionFile = null; - return new CommitIncrement(dataIncrement, compactIncrement, compactMetricIncrement, drainDeletionFile); + return new CommitIncrement( + dataIncrement, compactIncrement, compactMetricIncrement, drainDeletionFile); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricMeta.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricMeta.java index 5883e48322d9..517875f20ffa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricMeta.java @@ -19,23 +19,23 @@ package org.apache.paimon.compact; import org.apache.paimon.annotation.Public; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; -import org.apache.paimon.types.*; - -import java.util.*; - +import java.util.Arrays; import static org.apache.paimon.utils.SerializationUtils.newStringType; /** * Metadata of compact metric. * - * @since 1.0.0 + * @since 1.4.0 */ @Public public class CompactMetricMeta { - static RowType SCHEMA = + static final RowType SCHEMA = new RowType( false, Arrays.asList( diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricMetaV1Deserializer.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricMetaV1Deserializer.java index 3a8f98961b93..3b84e1f4a8e6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricMetaV1Deserializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricMetaV1Deserializer.java @@ -46,7 +46,6 @@ public class CompactMetricMetaV1Deserializer implements Serializable { new DataField(0, "_TYPE", newStringType(true)), new DataField(1, "_DURATION", new BigIntType(true)))); - protected final InternalRowSerializer rowSerializer; public CompactMetricMetaV1Deserializer() { @@ -57,9 +56,7 @@ public CompactMetricMeta fromRow(InternalRow row) { if (row == null) { return new CompactMetricMeta(); } - return new CompactMetricMeta( - row.getString(0).toString(), - row.getLong(1)); + return new CompactMetricMeta(row.getString(0).toString(), row.getLong(1)); } public CompactMetricMeta deserialize(DataInputView in) throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricSerializer.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricSerializer.java index 2c642faeb02d..5b73c06d7e26 100644 --- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactMetricSerializer.java @@ -35,15 +35,11 @@ public CompactMetricSerializer() { @Override public InternalRow toRow(CompactMetricMeta meta) { - return GenericRow.of( - BinaryString.fromString(meta.type()), - meta.duration()); + return GenericRow.of(BinaryString.fromString(meta.type()), meta.duration()); } @Override public CompactMetricMeta fromRow(InternalRow row) { - return new CompactMetricMeta( - row.getString(0).toString(), - row.getLong(1)); + return new CompactMetricMeta(row.getString(0).toString(), row.getLong(1)); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java index 196deda3fb85..9dbd678ec732 100644 --- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java @@ -43,7 +43,8 @@ public CompactTask(@Nullable CompactionMetrics.Reporter metricsReporter) { this(metricsReporter, null); } - public CompactTask(@Nullable CompactionMetrics.Reporter metricsReporter, @Nullable String type) { + public CompactTask( + @Nullable CompactionMetrics.Reporter metricsReporter, @Nullable String type) { this.metricsReporter = metricsReporter; this.type = type; } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/CompactMetricIncrement.java b/paimon-core/src/main/java/org/apache/paimon/io/CompactMetricIncrement.java index 9849bd8fa87c..101f54603af5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/CompactMetricIncrement.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/CompactMetricIncrement.java @@ -21,9 +21,10 @@ import org.apache.paimon.compact.CompactMetricMeta; import javax.annotation.Nullable; + import java.util.Objects; -/** Compact metric*/ +/** Compact metric. */ public class CompactMetricIncrement { @Nullable private final CompactMetricMeta metric; @@ -68,8 +69,7 @@ public int hashCode() { @Override public String toString() { - return String.format( - "CompactMetricIncrement {metric = %s}", metric); + return String.format("CompactMetricIncrement {metric = %s}", metric); } public static CompactMetricIncrement emptyIncrement() { diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index c5151ef88926..e073151c6dea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -304,7 +304,8 @@ private CommitIncrement drainIncrement() { compactChangelog.clear(); compactDeletionFile = null; - return new CommitIncrement(dataIncrement, compactIncrement, compactMetricIncrement, drainDeletionFile); + return new CommitIncrement( + dataIncrement, compactIncrement, compactMetricIncrement, drainDeletionFile); } private void trySyncLatestCompaction(boolean blocking) throws Exception { @@ -332,7 +333,8 @@ private void updateCompactResult(CompactResult result) { } compactAfter.addAll(result.after()); compactChangelog.addAll(result.changelog()); - compactMetricMeta = new CompactMetricMeta(result.compactionType(), result.compactionTimeMillis()); + compactMetricMeta = + new CompactMetricMeta(result.compactionType(), result.compactionTimeMillis()); updateCompactDeletionFile(result.deletionFile()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java index dd7286d95a86..9adf10aaed73 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java @@ -218,12 +218,9 @@ private void submitCompaction(CompactUnit unit, boolean dropDelete) { CompactTask task; if (unit.fileRewrite()) { - task = new FileRewriteCompactTask( - rewriter, - unit, - dropDelete, - metricsReporter, - compactType); + task = + new FileRewriteCompactTask( + rewriter, unit, dropDelete, metricsReporter, compactType); } else { task = new MergeTreeCompactTask( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 5602e1674f85..34d106e45090 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -40,13 +40,13 @@ import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.manifest.SimpleFileEntry; +import org.apache.paimon.metrics.CompactMetric; import org.apache.paimon.operation.commit.CommitChanges; import org.apache.paimon.operation.commit.CommitChangesProvider; import org.apache.paimon.operation.commit.CommitCleaner; import org.apache.paimon.operation.commit.CommitKindProvider; import org.apache.paimon.operation.commit.CommitResult; import org.apache.paimon.operation.commit.CommitScanner; -import org.apache.paimon.metrics.CompactMetric; import org.apache.paimon.operation.commit.ConflictDetection; import org.apache.paimon.operation.commit.ConflictDetection.ConflictCheck; import org.apache.paimon.operation.commit.ManifestEntryChanges; @@ -67,13 +67,13 @@ import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.CompactMetricsManager; import org.apache.paimon.utils.DataFilePathFactories; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.IOUtils; import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; -import org.apache.paimon.utils.CompactMetricsManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -781,7 +781,18 @@ private int tryCommit( CommitKindProvider commitKindProvider, ConflictCheck conflictCheck, @Nullable String statsFileName) { - return tryCommit(changesProvider, identifier, watermark, logOffsets, properties, commitKindProvider, conflictCheck, statsFileName, null, null, null); + return tryCommit( + changesProvider, + identifier, + watermark, + logOffsets, + properties, + commitKindProvider, + conflictCheck, + statsFileName, + null, + null, + null); } private int tryCommit( @@ -889,7 +900,22 @@ CommitResult tryCommitOnce( @Nullable Snapshot latestSnapshot, ConflictCheck conflictCheck, @Nullable String newStatsFileName) { - return tryCommitOnce(retryResult, deltaFiles, changelogFiles, indexFiles, identifier, watermark, logOffsets, properties, commitKind, latestSnapshot, conflictCheck, newStatsFileName, null, null, null); + return tryCommitOnce( + retryResult, + deltaFiles, + changelogFiles, + indexFiles, + identifier, + watermark, + logOffsets, + properties, + commitKind, + latestSnapshot, + conflictCheck, + newStatsFileName, + null, + null, + null); } @VisibleForTesting @@ -1117,7 +1143,14 @@ CommitResult tryCommitOnce( properties.isEmpty() ? null : properties, nextRowIdStart); if (options.compactMetricsEnabled()) { - compactMetric = buildCompactMetric(newSnapshotId, commitKind, buckets, compactTypes, compactDurationTime, identifier); + compactMetric = + buildCompactMetric( + newSnapshotId, + commitKind, + buckets, + compactTypes, + compactDurationTime, + identifier); } } catch (Throwable e) { // fails when preparing for commit, we should clean up @@ -1377,14 +1410,37 @@ private void commitRetryWait(int retryCount) { } } - private CompactMetric buildCompactMetric(long newSnapshotId, CommitKind commitKind, List buckets, List compactTypes, List compactDurationTime, long identifier) { - if (commitKind.equals(CommitKind.COMPACT) && compactDurationTime !=null && !compactDurationTime.isEmpty()) { + private CompactMetric buildCompactMetric( + long newSnapshotId, + CommitKind commitKind, + List buckets, + List compactTypes, + List compactDurationTime, + long identifier) { + if (commitKind.equals(CommitKind.COMPACT) + && compactDurationTime != null + && !compactDurationTime.isEmpty()) { long maxDuration = compactDurationTime.stream().max(Long::compareTo).get(); long minDuration = compactDurationTime.stream().min(Long::compareTo).get(); - long averageDuration = (long) compactDurationTime.stream().mapToLong(Long::longValue).average().getAsDouble(); + long averageDuration = + (long) + compactDurationTime.stream() + .mapToLong(Long::longValue) + .average() + .getAsDouble(); String compParts = buckets == null ? "{}" : String.join(",", buckets); - String compTypes = compactTypes == null ? "" : String.join(",", new HashSet<>(compactTypes)); - return new CompactMetric(newSnapshotId, System.currentTimeMillis(), averageDuration, maxDuration, minDuration, compParts, compTypes, identifier, commitUser); + String compTypes = + compactTypes == null ? "" : String.join(",", new HashSet<>(compactTypes)); + return new CompactMetric( + newSnapshotId, + System.currentTimeMillis(), + averageDuration, + maxDuration, + minDuration, + compParts, + compTypes, + identifier, + commitUser); } else { return null; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java index b160f24f980d..31a5e3fd6948 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java @@ -141,7 +141,8 @@ public void collect(CommitMessage message) { } buckets.add(builder + "bucket-" + commitMessage.bucket()); } - if (commitMessage.compactMetricIncrement() != null && commitMessage.compactMetricIncrement().metric() != null) { + if (commitMessage.compactMetricIncrement() != null + && commitMessage.compactMetricIncrement().metric() != null) { types.add(commitMessage.compactMetricIncrement().metric().type()); compactionDurationTime.add(commitMessage.compactMetricIncrement().metric().duration()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java index 0889671cbcb5..73108530dfbe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java @@ -45,12 +45,12 @@ import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.ChangelogManager; +import org.apache.paimon.utils.CompactMetricsManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; -import org.apache.paimon.utils.CompactMetricsManager; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index eef8dfa7f6c5..067697121370 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -497,8 +497,7 @@ protected Runnable newExpireRunnable() { ExpireSnapshots expireSnapshots = newExpireSnapshots().config(expireConfig); ExpireCompactMetrics expireCompactMetrics = new ExpireCompactMetricsImpl( - store().compactMetricsManager(), - options.compactMetricsRetainedNum()); + store().compactMetricsManager(), options.compactMetricsRetainedNum()); snapshotExpire = () -> { expireSnapshots.expire(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireCompactMetricsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireCompactMetricsImpl.java index 71ed1fc2d46b..5f4af8f20efd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireCompactMetricsImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireCompactMetricsImpl.java @@ -19,6 +19,7 @@ package org.apache.paimon.table; import org.apache.paimon.utils.CompactMetricsManager; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,5 +56,4 @@ public int expire() { return -1; } } - } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java index 16698d6eafaf..05a8e9c690db 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java @@ -19,7 +19,11 @@ package org.apache.paimon.table.sink; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.io.*; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.CompactMetricIncrement; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.io.DataInputViewStreamWrapper; +import org.apache.paimon.io.DataOutputViewStreamWrapper; import javax.annotation.Nullable; @@ -141,7 +145,13 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(partition, bucket, totalBuckets, dataIncrement, compactIncrement, compactMetricIncrement); + return Objects.hash( + partition, + bucket, + totalBuckets, + dataIncrement, + compactIncrement, + compactMetricIncrement); } @Override @@ -154,6 +164,11 @@ public String toString() { + "newFilesIncrement = %s, " + "compactIncrement = %s, " + "compactMetricIncrement = $s}", - partition, bucket, totalBuckets, dataIncrement, compactIncrement, compactMetricIncrement); + partition, + bucket, + totalBuckets, + dataIncrement, + compactIncrement, + compactMetricIncrement); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java index d180e891ba45..71ea18ac8819 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java @@ -28,7 +28,20 @@ import org.apache.paimon.index.IndexFileMetaV1Deserializer; import org.apache.paimon.index.IndexFileMetaV2Deserializer; import org.apache.paimon.index.IndexFileMetaV3Deserializer; -import org.apache.paimon.io.*; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.CompactMetricIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFileMeta08Serializer; +import org.apache.paimon.io.DataFileMeta09Serializer; +import org.apache.paimon.io.DataFileMeta10LegacySerializer; +import org.apache.paimon.io.DataFileMeta12LegacySerializer; +import org.apache.paimon.io.DataFileMetaFirstRowIdLegacySerializer; +import org.apache.paimon.io.DataFileMetaSerializer; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.io.DataInputDeserializer; +import org.apache.paimon.io.DataInputView; +import org.apache.paimon.io.DataOutputView; +import org.apache.paimon.io.DataOutputViewStreamWrapper; import org.apache.paimon.utils.IOExceptionSupplier; import java.io.ByteArrayOutputStream; @@ -113,10 +126,10 @@ private void serialize(CommitMessage obj, DataOutputView view) throws IOExceptio indexEntrySerializer.serializeList(message.compactIncrement().deletedIndexFiles(), view); // compact metric increment - if (message.compactMetricIncrement() != null && message.compactMetricIncrement().metric() != null) { + if (message.compactMetricIncrement() != null + && message.compactMetricIncrement().metric() != null) { compactMetricSerializer.serialize(message.compactMetricIncrement().metric(), view); } - } @Override @@ -181,7 +194,12 @@ private CommitMessage deserialize(int version, DataInputView view) throws IOExce } } return new CommitMessageImpl( - partition, bucket, totalBuckets, dataIncrement, compactIncrement, new CompactMetricIncrement(compactMetricDeserializer.get())); + partition, + bucket, + totalBuckets, + dataIncrement, + compactIncrement, + new CompactMetricIncrement(compactMetricDeserializer.get())); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactionMetricsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactionMetricsTable.java index 4e73e1385810..26bc124dbe57 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactionMetricsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactionMetricsTable.java @@ -26,24 +26,51 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.metrics.CompactMetric; -import org.apache.paimon.predicate.*; +import org.apache.paimon.predicate.And; +import org.apache.paimon.predicate.CompoundPredicate; +import org.apache.paimon.predicate.Equal; +import org.apache.paimon.predicate.GreaterOrEqual; +import org.apache.paimon.predicate.GreaterThan; +import org.apache.paimon.predicate.InPredicateVisitor; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.LeafPredicateExtractor; +import org.apache.paimon.predicate.LessOrEqual; +import org.apache.paimon.predicate.LessThan; +import org.apache.paimon.predicate.Or; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; import org.apache.paimon.table.Table; -import org.apache.paimon.table.source.*; +import org.apache.paimon.table.source.InnerTableRead; +import org.apache.paimon.table.source.InnerTableScan; +import org.apache.paimon.table.source.ReadOnceTableScan; +import org.apache.paimon.table.source.SingletonSplit; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import org.apache.paimon.types.TimestampType; -import org.apache.paimon.utils.*; +import org.apache.paimon.utils.CompactMetricsManager; +import org.apache.paimon.utils.IteratorRecordReader; +import org.apache.paimon.utils.ProjectedRow; +import org.apache.paimon.utils.SerializationUtils; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; import java.io.IOException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER; @@ -59,19 +86,14 @@ public class CompactionMetricsTable implements ReadonlyTable { new DataField(0, "snapshot_id", new BigIntType(false)), new DataField(1, "commit_time", new TimestampType(false, 3)), new DataField(2, "compact_duration", new BigIntType(false)), - new DataField( - 3, "max_duration", new BigIntType(false)), + new DataField(3, "max_duration", new BigIntType(false)), new DataField(4, "min_duration", new BigIntType(false)), + new DataField(5, "partitions", SerializationUtils.newStringType(false)), new DataField( - 5, - "partitions", - SerializationUtils.newStringType(false)), - new DataField( - 6, - "compact_type", - SerializationUtils.newStringType(false)), + 6, "compact_type", SerializationUtils.newStringType(false)), new DataField(7, "identifier", new BigIntType(false)), - new DataField(8, "commit_user", SerializationUtils.newStringType(false)))); + new DataField( + 8, "commit_user", SerializationUtils.newStringType(false)))); private final FileIO fileIO; private final Path location; @@ -259,13 +281,15 @@ public RecordReader createReader(Split split) throws IOException { dataTable.fileIO(); CompactMetricsManager compactMetricsManager = dataTable.store().compactMetricsManager(); Iterator metrics = compactMetricsManager.metrics(); - Iterator rows = Iterators.transform(metrics, metric -> metric != null ? toRow(metric) : null); + Iterator rows = + Iterators.transform(metrics, metric -> metric != null ? toRow(metric) : null); if (readType != null) { rows = Iterators.transform( rows, row -> - ProjectedRow.from(readType, CompactionMetricsTable.TABLE_TYPE) + ProjectedRow.from( + readType, CompactionMetricsTable.TABLE_TYPE) .replaceRow(row)); } return new IteratorRecordReader<>(rows); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java index 1236b1e4a78b..9036cdfc4b95 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java @@ -41,8 +41,8 @@ import static org.apache.paimon.table.system.BranchesTable.BRANCHES; import static org.apache.paimon.table.system.BucketsTable.BUCKETS; import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS; -import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS; import static org.apache.paimon.table.system.CompactionMetricsTable.COMPACTS; +import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS; import static org.apache.paimon.table.system.FilesTable.FILES; import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS; import static org.apache.paimon.table.system.OptionsTable.OPTIONS; diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java b/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java index 046f0cab4065..187bf0976788 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java @@ -71,6 +71,12 @@ public CompactDeletionFile compactDeletionFile() { @Override public String toString() { - return dataIncrement.toString() + "\n" + compactIncrement + "\n" + compactDeletionFile + "\n" + compactMetricIncrement; + return dataIncrement.toString() + + "\n" + + compactIncrement + + "\n" + + compactDeletionFile + + "\n" + + compactMetricIncrement; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/CompactMetricsManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/CompactMetricsManager.java index 3a78d9c1729d..0ef56a70a061 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/CompactMetricsManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/CompactMetricsManager.java @@ -21,14 +21,21 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.metrics.CompactMetric; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import java.util.function.Function; @@ -41,7 +48,10 @@ import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute; -/** Manager for {@link CompactMetric}, providing utility methods related to paths and compact metrics hints. */ +/** + * Manager for {@link CompactMetric}, providing utility methods related to paths and compact metrics + * hints. + */ public class CompactMetricsManager implements Serializable { private static final long serialVersionUID = 1L; @@ -56,10 +66,7 @@ public class CompactMetricsManager implements Serializable { private final Path tablePath; private final String branch; - public CompactMetricsManager( - FileIO fileIO, - Path tablePath, - @Nullable String branchName) { + public CompactMetricsManager(FileIO fileIO, Path tablePath, @Nullable String branchName) { this.fileIO = fileIO; this.tablePath = tablePath; this.branch = BranchManager.normalizeBranch(branchName); @@ -79,7 +86,10 @@ public String branch() { public Path compactMetricPath(long snapshotId) { return new Path( - branchPath(tablePath, branch) + "/metrics/" + COMPACTION_METRICS_PREFIX + snapshotId); + branchPath(tablePath, branch) + + "/metrics/" + + COMPACTION_METRICS_PREFIX + + snapshotId); } public Path compactMetricsDirectory() { @@ -152,7 +162,8 @@ public Iterator metricsWithId(List snapshotIds) { public @Nullable Long earliestSnapshotId() { try { - return findEarliest(compactMetricsDirectory(), COMPACTION_METRICS_PREFIX, this::compactMetricPath); + return findEarliest( + compactMetricsDirectory(), COMPACTION_METRICS_PREFIX, this::compactMetricPath); } catch (IOException e) { throw new RuntimeException("Failed to find earliest compaction metrics snapshot id", e); } @@ -160,7 +171,8 @@ public Iterator metricsWithId(List snapshotIds) { public @Nullable Long latestSnapshotId() { try { - return findLatest(compactMetricsDirectory(), COMPACTION_METRICS_PREFIX, this::compactMetricPath); + return findLatest( + compactMetricsDirectory(), COMPACTION_METRICS_PREFIX, this::compactMetricPath); } catch (IOException e) { throw new RuntimeException("Failed to find latest compaction metrics snapshot id", e); } @@ -171,7 +183,8 @@ public Iterator metricsWithId(List snapshotIds) { * be deleted by other processes, so just skip this snapshot. */ public List safelyGetAllmetrics() throws IOException { - List paths = metricIdStream().map(this::compactMetricPath).collect(Collectors.toList()); + List paths = + metricIdStream().map(this::compactMetricPath).collect(Collectors.toList()); List metrics = Collections.synchronizedList(new ArrayList<>(paths.size())); collectMetrics(