diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBNativeMetricMonitor.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBNativeMetricMonitor.java index e51ec32c7daec..83a885a4f477f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBNativeMetricMonitor.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBNativeMetricMonitor.java @@ -22,6 +22,7 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.View; +import org.apache.flink.util.IOUtils; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDB; @@ -107,7 +108,7 @@ void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) { /** Updates the value of metricView if the reference is still valid. */ private void setProperty(RocksDBNativePropertyMetricView metricView) { - if (metricView.isClosed()) { + if (metricView.isClosed() || rocksDB == null) { return; } try { @@ -126,11 +127,11 @@ private void setProperty(RocksDBNativePropertyMetricView metricView) { } private void setStatistics(RocksDBNativeStatisticsMetricView metricView) { - if (metricView.isClosed()) { + if (metricView.isClosed() || statistics == null) { return; } - if (statistics != null) { - synchronized (lock) { + synchronized (lock) { + if (statistics != null) { metricView.setValue(statistics.getTickerCount(metricView.tickerType)); } } @@ -140,6 +141,8 @@ private void setStatistics(RocksDBNativeStatisticsMetricView metricView) { public void close() { synchronized (lock) { rocksDB = null; + // Wrapper holds a JNI shared_ptr that leaks without explicit close. See FLINK-39923. + IOUtils.closeQuietly(statistics); statistics = null; } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RocksDBHandle.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RocksDBHandle.java index c233922d9d2b6..5cbd60b3a154c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RocksDBHandle.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RocksDBHandle.java @@ -37,6 +37,7 @@ import org.rocksdb.DBOptions; import org.rocksdb.ExportImportFilesMetaData; import org.rocksdb.RocksDB; +import org.rocksdb.Statistics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,6 +91,8 @@ class RocksDBHandle implements AutoCloseable { private RocksDB db; private ColumnFamilyHandle defaultColumnFamilyHandle; private RocksDBNativeMetricMonitor nativeMetricMonitor; + // Released in close() for the partial-init case; on success the monitor closes it. + private Statistics statistics; private final Long writeBufferManagerCapacity; protected RocksDBHandle( @@ -147,12 +150,16 @@ private void loadDb() throws IOException { dbOptions); // remove the default column family which is located at the first index defaultColumnFamilyHandle = columnFamilyHandles.remove(0); - // init native metrics monitor if configured + + if (!nativeMetricOptions.isEnabled()) { + return; + } + // dbOptions.statistics() returns a new Java wrapper around a fresh shared_ptr + // aliasing the existing native StatisticsImpl. The original wrapper is closed by + // RocksDBResourceContainer (via dbOptions); this one must also be closed. See FLINK-39923. + statistics = dbOptions.statistics(); nativeMetricMonitor = - nativeMetricOptions.isEnabled() - ? new RocksDBNativeMetricMonitor( - nativeMetricOptions, metricGroup, db, dbOptions.statistics()) - : null; + new RocksDBNativeMetricMonitor(nativeMetricOptions, metricGroup, db, statistics); } RocksDbKvStateInfo getOrRegisterStateColumnFamilyHandle( @@ -306,7 +313,13 @@ public DBOptions getDbOptions() { @Override public void close() throws Exception { IOUtils.closeQuietly(defaultColumnFamilyHandle); - IOUtils.closeQuietly(nativeMetricMonitor); + if (nativeMetricMonitor != null) { + // Monitor owns the statistics wrapper. + IOUtils.closeQuietly(nativeMetricMonitor); + } else { + // Monitor construction never completed; release the wrapper directly. + IOUtils.closeQuietly(statistics); + } IOUtils.closeQuietly(db); // Making sure the already created column family options will be closed columnFamilyDescriptors.forEach((cfd) -> IOUtils.closeQuietly(cfd.getOptions()));