diff --git a/persistence-modules/apache-paimon/pom.xml b/persistence-modules/apache-paimon/pom.xml
new file mode 100644
index 000000000000..1286d239096f
--- /dev/null
+++ b/persistence-modules/apache-paimon/pom.xml
@@ -0,0 +1,58 @@
+
+
+ 4.0.0
+
+ com.baeldung
+ persistence-modules
+ 1.0.0-SNAPSHOT
+
+ com.baeldung
+ apache-paimon
+ 1.0-SNAPSHOT
+ apache-paimon
+
+
+
+ org.apache.paimon
+ paimon-bundle
+ ${paimon.version}
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+
+
+ org.apache.hadoop
+ hadoop-client-runtime
+ runtime
+ ${hadoop.version}
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.xerial.snappy
+ snappy-java
+
+
+ com.google.code.findbugs
+ jsr305
+
+
+
+
+
+
+ 1.4.1
+ 3.4.3
+ 17
+
+
\ No newline at end of file
diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/Metric.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/Metric.java
new file mode 100644
index 000000000000..eab4d7bcc270
--- /dev/null
+++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/Metric.java
@@ -0,0 +1,87 @@
+package com.baeldung.paimon;
+
+public class Metric {
+ private String deviceId;
+ private String metricsName;
+ private double metricsValue;
+ private String source;
+ private String state = "active";
+ private String createTime;
+ private String createdBy;
+
+ public Metric() {
+ }
+
+ public Metric(String deviceId, String metricsName, double metricsValue, String source, String createTime, String createdBy, String state) {
+ this.deviceId = deviceId;
+ this.metricsName = metricsName;
+ this.metricsValue = metricsValue;
+ this.source = source;
+ this.createTime = createTime;
+ this.createdBy = createdBy;
+ this.state = state;
+ }
+
+ public String getCreatedBy() {
+ return createdBy;
+ }
+
+ public void setCreatedBy(String createdBy) {
+ this.createdBy = createdBy;
+ }
+
+
+ public String getDeviceId() {
+ return deviceId;
+ }
+
+ public void setDeviceId(String deviceId) {
+ this.deviceId = deviceId;
+ }
+
+ public String getMetricsName() {
+ return metricsName;
+ }
+
+ public void setMetricsName(String metricsName) {
+ this.metricsName = metricsName;
+ }
+
+ public double getMetricsValue() {
+ return metricsValue;
+ }
+
+ public void setMetricsValue(double metricsValue) {
+ this.metricsValue = metricsValue;
+ }
+
+ public String getSource() {
+ return source;
+ }
+
+ public void setSource(String source) {
+ this.source = source;
+ }
+
+ public String getCreateTime() {
+ return createTime;
+ }
+
+ public void setCreateTime(String createTime) {
+ this.createTime = createTime;
+ }
+
+ public String getState() {
+ return state;
+ }
+
+ public void setState(String state) {
+ this.state = state;
+ }
+
+ @Override
+ public String toString() {
+ return "Metric [deviceId=" + deviceId + ", metricsName=" + metricsName + ", metricsValue="
+ + metricsValue + ", source=" + source + ", createTime=" + createTime + ", state=" + state + "]";
+ }
+}
\ No newline at end of file
diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java
new file mode 100644
index 000000000000..5612fdde4c11
--- /dev/null
+++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java
@@ -0,0 +1,53 @@
+package com.baeldung.paimon;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetricReader {
+
+ Logger logger = LoggerFactory.getLogger(MetricReader.class);
+
+ List readMetrics() {
+ ClassLoader classLoader = getClass().getClassLoader();
+ InputStream inputStream = classLoader.getResourceAsStream("metrics.out");
+
+ BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
+ String line;
+ String CREATED_BY = "admin";
+ List metrics = new ArrayList<>();
+ boolean isFirstLine = true;
+ try {
+ while ((line = reader.readLine()) != null) {
+ if (isFirstLine) {
+ isFirstLine = false;
+ continue;
+ }
+ String[] parts = line.split(",");
+ if (parts.length == 5) {
+ String deviceId = parts[0];
+ String metricsName = parts[1];
+
+
+ String createTime = parts[2];
+
+ long metricsValue = (long) Double.parseDouble(parts[3]);
+ String source = parts[4];
+ logger.info("Read metric: deviceId={}, metricsName={}, createTime={}, metricsValue={}, source={}",
+ deviceId, metricsName, createTime, metricsValue, source);
+
+ Metric metric = new Metric(deviceId, metricsName, metricsValue, source, createTime, CREATED_BY, "active");
+ metrics.add(metric);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Error reading metrics", e);
+ }
+
+ return metrics;
+ }
+}
diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java
new file mode 100644
index 000000000000..a0ae92610712
--- /dev/null
+++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java
@@ -0,0 +1,36 @@
+package com.baeldung.paimon;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+
+public class PaimonDatabaseManager {
+ public static Catalog createCatalog(String warehousePath) {
+ CatalogContext context = CatalogContext.create(new Path(warehousePath));
+ return CatalogFactory.createCatalog(context);
+ }
+
+ public static Identifier createTable(Catalog catalog) throws Exception {
+
+ Schema schema = Schema.newBuilder()
+ .column("device_id", DataTypes.STRING())
+ .column("metrics_name", DataTypes.STRING())
+ .column("metrics_value", DataTypes.DOUBLE())
+ .column("source", DataTypes.STRING())
+ .column("create_time", DataTypes.TIMESTAMP(3))
+ .column("created_by", DataTypes.STRING())
+ .column("state", DataTypes.STRING())
+ .primaryKey("device_id", "metrics_name", "create_time")
+ .build();
+
+ Identifier tableId = Identifier.create("metric_db", "metrics");
+
+ catalog.createDatabase("metric_db", false);
+ catalog.createTable(tableId, schema, false);
+ return tableId;
+ }
+}
diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java
new file mode 100644
index 000000000000..b67c88de6ec3
--- /dev/null
+++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java
@@ -0,0 +1,196 @@
+package com.baeldung.paimon;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PaimonTableDataManager {
+
+ final static Logger logger = LoggerFactory.getLogger(PaimonTableDataManager.class);
+
+ public static void insert(Catalog catalog, Identifier tableId, List metrics) throws Exception {
+
+ Table table = catalog.getTable(tableId);
+
+ BatchWriteBuilder builder = table.newBatchWriteBuilder();
+
+ BatchTableWrite write = builder.newWrite();
+
+ metrics.forEach(metric -> {
+ try {
+ GenericRow row = createGenericRow(metric);
+ write.write(row, 0);
+ } catch (Exception e) {
+ logger.error("Error writing metric", e);
+ }
+ });
+
+ List messages = write.prepareCommit();
+ BatchTableCommit commit = builder.newCommit();
+ commit.commit(messages);
+ }
+
+ public static void updateMetricStateByDeviceIdMetricNameAndCreatedDate(Catalog catalog,
+ Identifier tableId, String deviceId, String metricName, String newState, String createdDate)
+ throws Exception {
+
+ Metric metric = fetchMetricByDeviceIdMetricNameAndCreatedDate(catalog, tableId, deviceId, metricName, createdDate);
+ Table table = catalog.getTable(tableId);
+
+ BatchWriteBuilder builder = table.newBatchWriteBuilder();
+
+ BatchTableWrite write = builder.newWrite();
+ metric.setState(newState);
+ write.write(createGenericRow(metric), 0);
+
+ List messages = write.prepareCommit();
+
+ BatchTableCommit commit = builder.newCommit();
+ commit.commit(messages);
+ }
+
+ public static void deleteRecordsByDeviceIdMetricNameAndCreatedDate(Catalog catalog, Identifier tableId,
+ String deviceId, String metricsName, String createdDate) throws Exception {
+ Metric metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate(catalog, tableId, deviceId, metricsName, createdDate);
+ Table table = catalog.getTable(tableId);
+
+ BatchWriteBuilder builder = table.newBatchWriteBuilder();
+
+ BatchTableWrite write = builder.newWrite();
+
+ GenericRow deleteRow = createGenericRow(metric);
+
+ deleteRow.setRowKind(RowKind.DELETE);
+
+ write.write(deleteRow, 0);
+
+ List messages = write.prepareCommit();
+
+ builder.newCommit().commit(messages);
+ }
+
+ public static List fetchMetricsBySourceAndDateRange(Catalog catalog, Identifier tableId,
+ String source, String startDate, String endDate) throws Exception {
+
+ Table table = catalog.getTable(tableId);
+ RowType rowType = table.rowType();
+ PredicateBuilder predicateBuilder = new PredicateBuilder(rowType);
+
+ int[] projection = new int[] {0, 1, 2, 3, 4, 6};
+
+ Predicate sourcePredicate = predicateBuilder.equal(3, BinaryString.fromString(source));
+ Predicate dateRangePredicate = predicateBuilder.between(4, convertToTimestamp(startDate), convertToTimestamp(endDate));
+ Predicate predicate = PredicateBuilder.and(sourcePredicate, dateRangePredicate);
+
+ ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate).withProjection(projection);
+
+ List splits = readBuilder.newScan().plan().splits();
+
+ TableRead read = readBuilder.newRead().executeFilter();
+
+ RecordReader reader = read.createReader(splits);
+
+ List results = new ArrayList<>();
+
+ reader.forEachRemaining(internalRow -> {
+ String deviceId = internalRow.getString(0).toString();
+ String metricsName = internalRow.getString(1).toString();
+ double metricsValue = internalRow.getDouble(2);
+ String sourceValue = internalRow.getString(3).toString();
+ Timestamp timestamp = internalRow.getTimestamp(4, 3);
+ String createTime = ConvertTimestampToStr(timestamp);
+ String state = internalRow.getString(5).toString();
+
+ Metric metric = new Metric(deviceId, metricsName, metricsValue, sourceValue, createTime, null, state);
+ logger.info("Fetched Metric: {}", metric);
+ results.add(metric);
+ });
+
+ return results;
+ }
+
+ public static Metric fetchMetricByDeviceIdMetricNameAndCreatedDate(Catalog catalog, Identifier tableId,
+ String deviceId, String metricsName, String createdDate) throws Exception {
+ Table table = catalog.getTable(tableId);
+ RowType rowType = table.rowType();
+ PredicateBuilder predicateBuilder = new PredicateBuilder(rowType);
+
+ int[] projection = new int[] {0, 1, 2, 3, 4, 5, 6};
+
+ Predicate devicePredicate = predicateBuilder.equal(0, BinaryString.fromString(deviceId));
+ Predicate metricPredicate = predicateBuilder.equal(1, BinaryString.fromString(metricsName));
+ Predicate datePredicate = predicateBuilder.equal(4, convertToTimestamp(createdDate));
+ Predicate predicate = PredicateBuilder.and(devicePredicate, metricPredicate, datePredicate);
+
+ ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate).withProjection(projection);
+
+ List splits = readBuilder.newScan().plan().splits();
+
+ TableRead read = readBuilder.newRead().executeFilter();
+
+ RecordReader reader = read.createReader(splits);
+ List results = new ArrayList<>();
+
+ reader.forEachRemaining( internalRow -> {
+ String deviceIdValue = internalRow.getString(0).toString();
+ String metricsNameValue = internalRow.getString(1).toString();
+ double metricsValue = internalRow.getDouble(2);
+ String sourceValue = internalRow.getString(3).toString();
+ Timestamp timestamp = internalRow.getTimestamp(4, 3);
+ String createTime = ConvertTimestampToStr(timestamp);
+ String createdBy = internalRow.getString(5).toString();
+ String state = internalRow.getString(6).toString();
+ Metric metric = new Metric(deviceIdValue, metricsNameValue, metricsValue, sourceValue, createTime, createdBy, state);
+ results.add(metric);
+ logger.info("Fetched Metric: {}", metric);
+ });
+
+ return results.isEmpty() ? null : results.get(0);
+ }
+
+ private static String ConvertTimestampToStr(Timestamp timestamp) {
+ return timestamp.toLocalDateTime()
+ .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+ }
+
+ private static Timestamp convertToTimestamp(String createTime) {
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ LocalDateTime localDateTime = LocalDateTime.parse(createTime, formatter);
+ return Timestamp.fromLocalDateTime(localDateTime);
+ }
+
+ private static GenericRow createGenericRow(Metric metric) {
+ GenericRow row = GenericRow.of(
+ BinaryString.fromString(metric.getDeviceId()),
+ BinaryString.fromString(metric.getMetricsName()),
+ metric.getMetricsValue(),
+ BinaryString.fromString(metric.getSource()),
+ convertToTimestamp(metric.getCreateTime()),
+ BinaryString.fromString(metric.getCreatedBy()),
+ BinaryString.fromString(metric.getState())
+ );
+ return row;
+ }
+}
\ No newline at end of file
diff --git a/persistence-modules/apache-paimon/src/main/resources/metrics-datamodel.puml b/persistence-modules/apache-paimon/src/main/resources/metrics-datamodel.puml
new file mode 100644
index 000000000000..17938a26667f
--- /dev/null
+++ b/persistence-modules/apache-paimon/src/main/resources/metrics-datamodel.puml
@@ -0,0 +1,24 @@
+@startuml metrics-datamodel
+!theme crt-amber
+
+entity "metrics (Paimon Table)" as metrics {
+ + device_id : STRING <>
+ + metric_name : STRING <>
+ + create_time : TIMESTAMP(3) <>
+ --
+ metric_value : DOUBLE
+ source : STRING
+ created_by : STRING
+ state : STRING
+}
+
+note right of metrics
+Primary Key: (device_id, metric_name, create_time)
+
+Optimized for:
+- High-ingest time-series data
+- Device-level analytics
+- Streaming upserts
+end note
+
+@enduml
\ No newline at end of file
diff --git a/persistence-modules/apache-paimon/src/main/resources/metrics.out b/persistence-modules/apache-paimon/src/main/resources/metrics.out
new file mode 100644
index 000000000000..e9944f054339
--- /dev/null
+++ b/persistence-modules/apache-paimon/src/main/resources/metrics.out
@@ -0,0 +1,91 @@
+device_id,metric_name,create_time,metric_value,source
+dev_101,cpu_usage,2026-04-21 18:57:01,72.5,agent
+dev_102,cpu_usage,2026-04-21 18:57:02,65.1,agent
+dev_103,memory_usage,2026-04-21 18:57:03,58.3,agent
+dev_104,disk_io,2026-04-21 18:57:04,120.7,collector
+dev_105,cpu_usage,2026-04-21 18:57:05,81.2,agent
+dev_101,memory_usage,2026-04-21 18:57:06,63.4,agent
+dev_102,disk_io,2026-04-21 18:57:07,95.6,collector
+dev_103,cpu_usage,2026-04-21 18:57:08,49.8,agent
+dev_104,memory_usage,2026-04-21 18:57:09,70.2,agent
+dev_105,disk_io,2026-04-21 18:57:10,110.1,collector
+dev_101,cpu_usage,2026-04-21 18:57:11,74.0,agent
+dev_102,memory_usage,2026-04-21 18:57:12,60.9,agent
+dev_103,disk_io,2026-04-21 18:57:13,88.2,collector
+dev_104,cpu_usage,2026-04-21 18:57:14,66.7,agent
+dev_105,memory_usage,2026-04-21 18:57:15,75.3,agent
+dev_101,disk_io,2026-04-21 18:57:16,130.5,collector
+dev_102,cpu_usage,2026-04-21 18:57:17,69.2,agent
+dev_103,memory_usage,2026-04-21 18:57:18,55.1,agent
+dev_104,disk_io,2026-04-21 18:57:19,102.4,collector
+dev_105,cpu_usage,2026-04-21 18:57:20,83.6,agent
+dev_106,cpu_usage,2026-04-21 18:57:21,61.3,agent
+dev_107,memory_usage,2026-04-21 18:57:22,67.8,agent
+dev_108,disk_io,2026-04-21 18:57:23,91.0,collector
+dev_109,cpu_usage,2026-04-21 18:57:24,77.5,agent
+dev_110,memory_usage,2026-04-21 18:57:25,62.6,agent
+dev_106,disk_io,2026-04-21 18:57:26,105.3,collector
+dev_107,cpu_usage,2026-04-21 18:57:27,71.4,agent
+dev_108,memory_usage,2026-04-21 18:57:28,59.7,agent
+dev_109,disk_io,2026-04-21 18:57:29,98.8,collector
+dev_110,cpu_usage,2026-04-21 18:57:30,64.9,agent
+dev_111,cpu_usage,2026-04-21 18:57:31,73.3,agent
+dev_112,memory_usage,2026-04-21 18:57:32,68.1,agent
+dev_113,disk_io,2026-04-21 18:57:33,115.9,collector
+dev_114,cpu_usage,2026-04-21 18:57:34,79.0,agent
+dev_115,memory_usage,2026-04-21 18:57:35,57.6,agent
+dev_111,disk_io,2026-04-21 18:57:36,122.0,collector
+dev_112,cpu_usage,2026-04-21 18:57:37,69.5,agent
+dev_113,memory_usage,2026-04-21 18:57:38,61.2,agent
+dev_114,disk_io,2026-04-21 18:57:39,99.1,collector
+dev_115,cpu_usage,2026-04-21 18:57:40,82.7,agent
+dev_116,cpu_usage,2026-04-21 18:57:41,66.0,agent
+dev_117,memory_usage,2026-04-21 18:57:42,72.4,agent
+dev_118,disk_io,2026-04-21 18:57:43,87.5,collector
+dev_119,cpu_usage,2026-04-21 18:57:44,75.8,agent
+dev_120,memory_usage,2026-04-21 18:57:45,63.0,agent
+dev_116,disk_io,2026-04-21 18:57:46,108.7,collector
+dev_117,cpu_usage,2026-04-21 18:57:47,70.6,agent
+dev_118,memory_usage,2026-04-21 18:57:48,58.9,agent
+dev_119,disk_io,2026-04-21 18:57:49,93.2,collector
+dev_120,cpu_usage,2026-04-21 18:57:50,67.4,agent
+dev_121,cpu_usage,2026-04-21 18:57:51,78.2,agent
+dev_122,memory_usage,2026-04-21 18:57:52,64.8,agent
+dev_123,disk_io,2026-04-21 18:57:53,101.6,collector
+dev_124,cpu_usage,2026-04-21 18:57:54,72.9,agent
+dev_125,memory_usage,2026-04-21 18:57:55,60.5,agent
+dev_121,disk_io,2026-04-21 18:57:56,118.3,collector
+dev_122,cpu_usage,2026-04-21 18:57:57,68.7,agent
+dev_123,memory_usage,2026-04-21 18:57:58,56.2,agent
+dev_124,disk_io,2026-04-21 18:57:59,97.0,collector
+dev_125,cpu_usage,2026-04-21 18:58:00,80.1,agent
+dev_126,cpu_usage,2026-04-21 18:58:01,69.9,agent
+dev_127,memory_usage,2026-04-21 18:58:02,66.3,agent
+dev_128,disk_io,2026-04-21 18:58:03,92.7,collector
+dev_129,cpu_usage,2026-04-21 18:58:04,76.4,agent
+dev_130,memory_usage,2026-04-21 18:58:05,61.7,agent
+dev_126,disk_io,2026-04-21 18:58:06,109.9,collector
+dev_127,cpu_usage,2026-04-21 18:58:07,73.1,agent
+dev_128,memory_usage,2026-04-21 18:58:08,59.4,agent
+dev_129,disk_io,2026-04-21 18:58:09,94.6,collector
+dev_130,cpu_usage,2026-04-21 18:58:10,68.2,agent
+dev_131,cpu_usage,2026-04-21 18:58:11,77.0,agent
+dev_132,memory_usage,2026-04-21 18:58:12,65.9,agent
+dev_133,disk_io,2026-04-21 18:58:13,103.8,collector
+dev_134,cpu_usage,2026-04-21 18:58:14,74.6,agent
+dev_135,memory_usage,2026-04-21 18:58:15,62.1,agent
+dev_131,disk_io,2026-04-21 18:58:16,117.5,collector
+dev_132,cpu_usage,2026-04-21 18:58:17,70.8,agent
+dev_133,memory_usage,2026-04-21 18:58:18,57.3,agent
+dev_134,disk_io,2026-04-21 18:58:19,98.2,collector
+dev_135,cpu_usage,2026-04-21 18:58:20,81.5,agent
+dev_136,cpu_usage,2026-04-21 18:58:21,67.6,agent
+dev_137,memory_usage,2026-04-21 18:58:22,63.8,agent
+dev_138,disk_io,2026-04-21 18:58:23,90.4,collector
+dev_139,cpu_usage,2026-04-21 18:58:24,75.2,agent
+dev_140,memory_usage,2026-04-21 18:58:25,60.0,agent
+dev_136,disk_io,2026-04-21 18:58:26,106.1,collector
+dev_137,cpu_usage,2026-04-21 18:58:27,72.0,agent
+dev_138,memory_usage,2026-04-21 18:58:28,58.7,agent
+dev_139,disk_io,2026-04-21 18:58:29,95.9,collector
+dev_140,cpu_usage,2026-04-21 18:58:30,69.3,agent
diff --git a/persistence-modules/apache-paimon/src/main/resources/paimon-table-creation.puml b/persistence-modules/apache-paimon/src/main/resources/paimon-table-creation.puml
new file mode 100644
index 000000000000..90ea39ca0454
--- /dev/null
+++ b/persistence-modules/apache-paimon/src/main/resources/paimon-table-creation.puml
@@ -0,0 +1,49 @@
+@startuml
+
+ interface Catalog {
+ +createDatabase(String name, boolean ignoreIfExists)
+ +dropDatabase(String name, boolean ignoreIfExists, boolean cascade)
+ +databaseExists(String name): boolean
+ +listDatabases(): List
+ +createTable(Identifier identifier, Schema schema, boolean ignoreIfExists)
+ +getTable(Identifier identifier): Table
+ +dropTable(Identifier identifier, boolean ignoreIfExists)
+ +renameTable(Identifier from, Identifier to, boolean ignoreIfExists)
+ +alterTable(Identifier identifier, List changes, boolean ignoreIfExists)
+ }
+
+ interface Table {
+ +name(): String
+ +rowType(): RowType
+ +partitionKeys(): List
+ +primaryKeys(): List
+ +options(): Map
+ +comment(): Optional
+ +copy(Map dynamicOptions): Table
+ }
+
+ class Schema {
+ -fields: List
+ -partitionKeys: List
+ -primaryKeys: List
+ -options: Map
+ }
+
+ class DataField {
+ +id(): int
+ +name(): String
+ +type(): DataType
+ +description(): String
+ }
+
+ interface InternalRow {
+ +getField(int pos): Object
+ }
+
+
+ Catalog ..> Table : creates/retrieves
+ Table --> Schema : defined by
+ Schema "1" *-- "many" DataField : contains
+ Table ..> InternalRow : represents records as
+
+@enduml
\ No newline at end of file
diff --git a/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java b/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java
new file mode 100644
index 000000000000..d8908c1e81ed
--- /dev/null
+++ b/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java
@@ -0,0 +1,135 @@
+package com.baeldung.paimon;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class PaimonLifecycleIntegrationTest {
+ private static final Logger logger =
+ LoggerFactory.getLogger(PaimonLifecycleIntegrationTest.class);
+
+ private Catalog catalog;
+ private Identifier tableId;
+
+ private String WAREHOUSE_PATH;
+
+ @BeforeAll
+ void setup(@TempDir Path tempDir) {
+ WAREHOUSE_PATH = getWarehousePath(tempDir);
+ logger.info("Warehouse path set to: {}", WAREHOUSE_PATH);
+ }
+
+ @AfterAll
+ void cleanup() {
+ if (catalog != null) {
+ try {
+ if (catalog.listTables("metric_db").contains("metrics")) {
+ catalog.dropTable(tableId, true);
+ }
+ if (catalog.listDatabases().contains("metric_db")) {
+ catalog.dropDatabase("metric_db", true, true);
+ }
+ logger.info("Cleanup completed successfully");
+ } catch (Exception e) {
+ logger.error("Error during cleanup", e);
+ }
+ }
+ }
+
+ @Test
+ @Order(1)
+ void whenCallCreateTable_thenTableCreated() throws Exception {
+ logger.info("Warehouse path set to: {}", WAREHOUSE_PATH);
+ catalog = PaimonDatabaseManager.createCatalog(WAREHOUSE_PATH);
+ assertNotNull(catalog);
+
+ tableId = PaimonDatabaseManager.createTable(catalog);
+ assertTrue(catalog.listTables("metric_db").contains("metrics"));
+ assertNotNull(tableId);
+ }
+
+ @Test
+ @Order(2)
+ void whenCallInsertRecords_thenRecordsInserted() throws Exception {
+ assertTrue(catalog.listTables("metric_db").contains("metrics"));
+ PaimonTableDataManager.insert(catalog, tableId, getMetrics());
+ }
+
+ @Test
+ @Order(3)
+ void whenCallReadRecords_thenRecordsRead() throws Exception {
+ List metrics = PaimonTableDataManager.fetchMetricsBySourceAndDateRange(catalog,
+ tableId, "collector", "2026-04-21 18:58:19", "2026-04-21 18:58:30");
+ assertFalse(metrics.isEmpty());
+ assertTrue(metrics.size() == 4);
+ }
+
+ @Test
+ @Order(4)
+ void whenCallDeleteRecord_thenRecordDeleted() throws Exception {
+ Metric metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate(catalog, tableId, "dev_136", "disk_io", "2026-04-21 18:58:26");
+ assertNotNull(metric);
+
+ PaimonTableDataManager.deleteRecordsByDeviceIdMetricNameAndCreatedDate(
+ catalog, tableId, "dev_136",
+ "disk_io", "2026-04-21 18:58:26"
+ );
+
+ metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate(
+ catalog, tableId, "dev_136",
+ "disk_io", "2026-04-21 18:58:26"
+ );
+ assertTrue(metric == null);
+ }
+
+ @Test
+ @Order(5)
+ void whenCallUpdateRecord_thenRecordUpdated() throws Exception {
+ Metric metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate(
+ catalog, tableId,
+ "dev_137", "cpu_usage",
+ "2026-04-21 18:58:27"
+ );
+ assertNotNull(metric);
+
+ assertEquals("active", metric.getState());
+
+ PaimonTableDataManager.updateMetricStateByDeviceIdMetricNameAndCreatedDate(
+ catalog, tableId, "dev_137",
+ "cpu_usage", "inactive",
+ "2026-04-21 18:58:27"
+ );
+
+ metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate(
+ catalog, tableId, "dev_137",
+ "cpu_usage", "2026-04-21 18:58:27"
+ );
+ assertNotNull(metric);
+ assertEquals("inactive", metric.getState());
+ }
+
+ private List getMetrics() {
+ return new MetricReader().readMetrics();
+ }
+
+ private String getWarehousePath(Path tempDir) {
+ return "file:" + tempDir.toString();
+ }
+}
diff --git a/persistence-modules/pom.xml b/persistence-modules/pom.xml
index b720cebef5a0..78bc22a4374d 100644
--- a/persistence-modules/pom.xml
+++ b/persistence-modules/pom.xml
@@ -18,6 +18,7 @@
apache-bookkeeper
apache-cayenne
apache-derby
+ apache-paimon
atomikos
blaze-persistence
clickhouse