Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,20 @@ public InlineElement getDescription() {
"Ratio of the deleted rows in a data file to be forced compacted for "
+ "append-only table.");

public static final ConfigOption<Boolean> COMPACTION_METRICS_ENABLED =
key("compaction.metrics.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"If enabled, compaction metric data will be recorded for each instance.");

public static final ConfigOption<Integer> COMPACTION_METRIC_RETAINED_NUM =
key("compaction.metrics.retained_num")
.intType()
.defaultValue(10)
.withDescription(
"Set the maximum number of times a record metric can be retained.");

public static final ConfigOption<ChangelogProducer> CHANGELOG_PRODUCER =
key("changelog-producer")
.enumType(ChangelogProducer.class)
Expand Down Expand Up @@ -2719,6 +2733,14 @@ public double compactionDeleteRatioThreshold() {
return options.get(COMPACTION_DELETE_RATIO_THRESHOLD);
}

public boolean compactMetricsEnabled() {
return options.get(COMPACTION_METRICS_ENABLED);
}

public int compactMetricsRetainedNum() {
return options.get(COMPACTION_METRIC_RETAINED_NUM);
}

public long dynamicBucketTargetRowNum() {
return options.get(DYNAMIC_BUCKET_TARGET_ROW_NUM);
}
Expand Down
233 changes: 233 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/metrics/CompactMetric.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.metrics;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.utils.JsonSerdeUtil;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Objects;

/** @since 1.4.0 */
@Public
@JsonIgnoreProperties(ignoreUnknown = true)
public class CompactMetric implements Serializable {

private static final long serialVersionUID = 1L;

public static final long FIRST_SNAPSHOT_ID = 1;

public static final int TABLE_STORE_02_VERSION = 1;
protected static final int CURRENT_VERSION = 3;

protected static final String FIELD_VERSION = "version";
protected static final String FIELD_SNAPSHOT_ID = "snapshotId";
protected static final String FIELD_COMMIT_TIME = "commitTime";
protected static final String FIELD_COMPACT_AVG_DURATION = "compactAvgDuration";
protected static final String FIELD_COMPACTION_MAX_DURATION = "compactMaxDuration";
protected static final String FIELD_COMPACTION_MIN_DURATION = "compactMinDuration";
protected static final String FIELD_BUCKETS = "compactBuckets";
protected static final String FIELD_COMPACTION_TYPE = "compactType";
protected static final String FIELD_IDENTIFIER = "identifier";
protected static final String FIELD_COMMIT_USER = "commitUser";

@JsonProperty(FIELD_VERSION)
@Nullable
protected final Integer version;

@JsonProperty(FIELD_SNAPSHOT_ID)
protected final long snapshotId;

@JsonProperty(FIELD_COMMIT_TIME)
protected final long commitTime;

@JsonProperty(FIELD_COMPACT_AVG_DURATION)
protected final long compactDuration;

// a manifest list recording all new changes occurred in this snapshot
// for faster expire and streaming reads
@JsonProperty(FIELD_COMPACTION_MAX_DURATION)
protected final long compactMaxDuration;

@JsonProperty(FIELD_COMPACTION_MIN_DURATION)
protected final long compactMinDuration;

// a manifest list recording all changelog produced in this snapshot
// null if no changelog is produced, or for paimon <= 0.2
@JsonProperty(FIELD_BUCKETS)
protected final String buckets;

@JsonProperty(FIELD_IDENTIFIER)
protected final long identifier;

@JsonProperty(FIELD_COMMIT_USER)
protected final String commitUser;

@JsonProperty(FIELD_COMPACTION_TYPE)
protected final String compactType;

public CompactMetric(
long snapshotId,
long commitTime,
long compactDuration,
long compactMaxDuration,
long compactMinDuration,
String buckets,
String compactType,
long identifier,
String commitUser) {
this(
CURRENT_VERSION,
snapshotId,
commitTime,
compactDuration,
compactMaxDuration,
compactMinDuration,
buckets,
compactType,
identifier,
commitUser);
}

@JsonCreator
public CompactMetric(
@JsonProperty(FIELD_VERSION) @Nullable Integer version,
@JsonProperty(FIELD_SNAPSHOT_ID) long snapshotId,
@JsonProperty(FIELD_COMMIT_TIME) long commitTime,
@JsonProperty(FIELD_COMPACT_AVG_DURATION) long compactDuration,
@JsonProperty(FIELD_COMPACTION_MAX_DURATION) long compactMaxDuration,
@JsonProperty(FIELD_COMPACTION_MIN_DURATION) long compactMinDuration,
@JsonProperty(FIELD_BUCKETS) @Nullable String buckets,
@JsonProperty(FIELD_COMPACTION_TYPE) @Nullable String compactType,
@JsonProperty(FIELD_IDENTIFIER) long identifier,
@JsonProperty(FIELD_COMMIT_USER) String commitUser) {
this.version = version;
this.snapshotId = snapshotId;
this.commitTime = commitTime;
this.compactDuration = compactDuration;
this.compactMaxDuration = compactMaxDuration;
this.compactMinDuration = compactMinDuration;
this.buckets = buckets;
this.compactType = compactType;
this.identifier = identifier;
this.commitUser = commitUser;
}

@JsonGetter(FIELD_VERSION)
public int version() {
// there is no version field for paimon <= 0.2
return version == null ? TABLE_STORE_02_VERSION : version;
}

@JsonGetter(FIELD_SNAPSHOT_ID)
public long snapshotId() {
return snapshotId;
}

@JsonGetter(FIELD_COMMIT_TIME)
public long commitTime() {
return commitTime;
}

@JsonGetter(FIELD_COMPACT_AVG_DURATION)
public long compactDuration() {
return compactDuration;
}

@JsonGetter(FIELD_COMPACTION_MAX_DURATION)
public long compactMaxDuration() {
return compactMaxDuration;
}

@JsonGetter(FIELD_COMPACTION_MIN_DURATION)
public long compactMinDuration() {
return compactMinDuration;
}

@JsonGetter(FIELD_BUCKETS)
public String buckets() {
return buckets == null ? "{}" : buckets;
}

@JsonGetter(FIELD_COMPACTION_TYPE)
public String compactType() {
return compactType;
}

@JsonGetter(FIELD_IDENTIFIER)
public long identifier() {
return identifier;
}

@JsonGetter(FIELD_COMMIT_USER)
public String commitUser() {
return commitUser;
}

public String toJson() {
return JsonSerdeUtil.toJson(this);
}

@Override
public int hashCode() {
return Objects.hash(
version,
snapshotId,
commitTime,
compactDuration,
compactMaxDuration,
compactMinDuration,
buckets,
compactType,
identifier,
commitUser);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CompactMetric that = (CompactMetric) o;
return Objects.equals(version, that.version)
&& snapshotId == that.snapshotId
&& commitTime == that.commitTime
&& Objects.equals(compactDuration, that.compactDuration)
&& Objects.equals(compactMaxDuration, that.compactMaxDuration)
&& Objects.equals(compactMinDuration, that.compactMinDuration)
&& Objects.equals(buckets, that.buckets)
&& Objects.equals(compactType, that.compactType)
&& Objects.equals(identifier, that.identifier)
&& Objects.equals(commitUser, that.commitUser);
}

public static CompactMetric fromJson(String json) {
return JsonSerdeUtil.fromJson(json, CompactMetric.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.CompactMetricsManager;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IndexFilePathFactories;
import org.apache.paimon.utils.InternalRowPartitionComputer;
Expand Down Expand Up @@ -292,6 +293,7 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
options.partitionDefaultName(),
pathFactory(),
snapshotManager,
compactMetricsManager(),
manifestFileFactory(),
manifestListFactory(),
indexManifestFileFactory(),
Expand Down Expand Up @@ -507,4 +509,9 @@ public GlobalIndexScanBuilder newGlobalIndexScanBuilder() {
snapshotManager(),
newIndexFileHandler());
}

@Override
public CompactMetricsManager compactMetricsManager() {
return new CompactMetricsManager(fileIO, options.path(), options.branch());
}
}
3 changes: 3 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.CompactMetricsManager;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.SegmentsCache;
Expand Down Expand Up @@ -127,4 +128,6 @@ PartitionExpire newPartitionExpire(
void setSnapshotCache(Cache<Path, Snapshot> cache);

GlobalIndexScanBuilder newGlobalIndexScanBuilder();

CompactMetricsManager compactMetricsManager();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

package org.apache.paimon.append;

import org.apache.paimon.compact.CompactMetricMeta;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer;
import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.CompactMetricIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.manifest.FileKind;
Expand Down Expand Up @@ -68,6 +70,7 @@ public List<DataFileMeta> compactAfter() {

public CommitMessage doCompact(FileStoreTable table, BaseAppendFileStoreWrite write)
throws Exception {
long startMillis = System.currentTimeMillis();
boolean dvEnabled = table.coreOptions().deletionVectorsEnabled();
Preconditions.checkArgument(
dvEnabled || compactBefore.size() > 1,
Expand Down Expand Up @@ -111,14 +114,18 @@ public CommitMessage doCompact(FileStoreTable table, BaseAppendFileStoreWrite wr
Collections.emptyList(),
newIndexFiles,
deletedIndexFiles);
CompactMetricIncrement compactMetricIncrement =
new CompactMetricIncrement(
new CompactMetricMeta("full", System.currentTimeMillis() - startMillis));
return new CommitMessageImpl(
partition,
// bucket 0 is bucket for unaware-bucket table
// for compatibility with the old design
0,
table.coreOptions().bucket(),
DataIncrement.emptyIncrement(),
compactIncrement);
compactIncrement,
compactMetricIncrement);
}

public int hashCode() {
Expand Down
Loading
Loading