From a0653c4d422e8eebc47f6e2c1e81bf3c08ab05c7 Mon Sep 17 00:00:00 2001 From: parthiv39731 Date: Sat, 9 May 2026 10:52:07 +0530 Subject: [PATCH 1/7] apache paimon BAEL-9567 --- persistence-modules/apache-paimon/pom.xml | 58 +++++++ .../main/java/com/baeldung/paimon/Metric.java | 60 +++++++ .../com/baeldung/paimon/MetricReader.java | 54 ++++++ .../paimon/PaimonDatabaseManager.java | 36 ++++ .../paimon/PaimonTableDataManager.java | 86 +++++++++ .../src/main/resources/metrics-datamodel.puml | 22 +++ .../src/main/resources/metrics.out | 91 ++++++++++ .../main/resources/paimon-table-creation.puml | 49 ++++++ .../PaimonLifecycleIntegrationTest.java | 163 ++++++++++++++++++ 9 files changed, 619 insertions(+) create mode 100644 persistence-modules/apache-paimon/pom.xml create mode 100644 persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/Metric.java create mode 100644 persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java create mode 100644 persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java create mode 100644 persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java create mode 100644 persistence-modules/apache-paimon/src/main/resources/metrics-datamodel.puml create mode 100644 persistence-modules/apache-paimon/src/main/resources/metrics.out create mode 100644 persistence-modules/apache-paimon/src/main/resources/paimon-table-creation.puml create mode 100644 persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java diff --git a/persistence-modules/apache-paimon/pom.xml b/persistence-modules/apache-paimon/pom.xml new file mode 100644 index 000000000000..0af372660f42 --- /dev/null +++ b/persistence-modules/apache-paimon/pom.xml @@ -0,0 +1,58 @@ + + + 4.0.0 + + com.baeldung + persistence-modules + 1.0.0-SNAPSHOT + + com.baeldung + apache-paimon + 1.0-SNAPSHOT + apache-paimon + + + + org.apache.paimon + paimon-bundle + ${paimon.version} + + + org.slf4j + slf4j-api + + + + + + + org.apache.hadoop + hadoop-client-runtime + runtime + ${hadoop.version} + + + org.slf4j + slf4j-api + + + org.xerial.snappy + snappy-java + + + com.google.code.findbugs + jsr305 + + + + + + + 1.3.1 + 3.4.3 + 17 + + \ No newline at end of file diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/Metric.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/Metric.java new file mode 100644 index 000000000000..7ca89d0d8401 --- /dev/null +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/Metric.java @@ -0,0 +1,60 @@ +package com.baeldung.paimon; + +public class Metric { + private String deviceId; + private String metricsName; + private double metricsValue; + private String source; + private String createTime; + + public Metric() { + } + + public Metric(String deviceId, String metricsName, double metricsValue, String source, String createTime) { + this.deviceId = deviceId; + this.metricsName = metricsName; + this.metricsValue = metricsValue; + this.source = source; + this.createTime = createTime; + } + + public String getDeviceId() { + return deviceId; + } + + public void setDeviceId(String deviceId) { + this.deviceId = deviceId; + } + + public String getMetricsName() { + return metricsName; + } + + public void setMetricsName(String metricsName) { + this.metricsName = metricsName; + } + + public double getMetricsValue() { + return metricsValue; + } + + public void setMetricsValue(double metricsValue) { + this.metricsValue = metricsValue; + } + + public String getSource() { + return source; + } + + public void setSource(String source) { + this.source = source; + } + + public String getCreateTime() { + return createTime; + } + + public void setCreateTime(String createTime) { + this.createTime = createTime; + } +} \ No newline at end of file diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java new file mode 100644 index 000000000000..ed90a3278510 --- /dev/null +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java @@ -0,0 +1,54 @@ +package com.baeldung.paimon; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricReader { + + Logger logger = LoggerFactory.getLogger(MetricReader.class); + + List readMetrics() { + //read the metrics from the file + ClassLoader classLoader = getClass().getClassLoader(); + InputStream inputStream = classLoader.getResourceAsStream("metrics.out"); + + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + String line; + List metrics = new ArrayList<>(); + boolean isFirstLine = true; + try { + while ((line = reader.readLine()) != null) { + if (isFirstLine) { + isFirstLine = false; + continue; + } + String[] parts = line.split(","); + if (parts.length == 5) { + String deviceId = parts[0]; + String metricsName = parts[1]; + + + String createTime = parts[2]; + + long metricsValue = (long) Double.parseDouble(parts[3]); + String source = parts[4]; + logger.info("Read metric: deviceId={}, metricsName={}, createTime={}, metricsValue={}, source={}", + deviceId, metricsName, createTime, metricsValue, source); + + Metric metric = new Metric(deviceId, metricsName, metricsValue, source, createTime); + //add the metric to the list + metrics.add(metric); + } + } + } catch (Exception e) { + logger.error("Error reading metrics", e); + } + + return metrics; + } +} diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java new file mode 100644 index 000000000000..91684f1febf9 --- /dev/null +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java @@ -0,0 +1,36 @@ +package com.baeldung.paimon; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.Path; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.DataTypes; + +public class PaimonDatabaseManager { + public static Catalog createCatalog(String warehousePath) { + CatalogContext context = CatalogContext.create(new Path(warehousePath)); + return CatalogFactory.createCatalog(context); + } + + public static Identifier createTable(Catalog catalog) throws Exception { + + Schema schema = Schema.newBuilder() + .column("device_id", DataTypes.STRING()) + .column("metrics_name", DataTypes.STRING()) + .column("metrics_value", DataTypes.DOUBLE()) + .column("source", DataTypes.STRING()) + .column("create_time", DataTypes.TIMESTAMP(3)) + .primaryKey("device_id", "metrics_name", "create_time") + .build(); + + Identifier tableId = Identifier.create("metric_db", "metrics"); + catalog.createDatabase("metric_db", false); + + + + catalog.createTable(tableId, schema, false); + return tableId; + } +} diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java new file mode 100644 index 000000000000..4cb6fd853453 --- /dev/null +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java @@ -0,0 +1,86 @@ +package com.baeldung.paimon; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.TableRead; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PaimonTableDataManager { + + final static Logger logger = LoggerFactory.getLogger(PaimonTableDataManager.class); + + public static void insert(Catalog catalog, Identifier tableId, List metrics) throws Exception { + + Table table = catalog.getTable(tableId); + + BatchWriteBuilder builder = table.newBatchWriteBuilder(); + + BatchTableWrite write = builder.newWrite(); + + metrics.forEach(metric -> { + try { + + Timestamp timestamp = convertToTimestamp(metric.getCreateTime()); + //device_id,metric_name,create_time,metric_value,source + write.write(GenericRow.of( + BinaryString.fromString(metric.getDeviceId()), + BinaryString.fromString(metric.getMetricsName()), + metric.getMetricsValue(), + BinaryString.fromString(metric.getSource()), + timestamp + ), 0); + } catch (Exception e) { + logger.error("Error writing metric", e); + } + }); + + List messages = write.prepareCommit(); + + BatchTableCommit commit = builder.newCommit(); + commit.commit(messages); + } + + public static List read(Catalog catalog, Identifier tableId) throws Exception { + + Table table = catalog.getTable(tableId); + + ReadBuilder readBuilder = table.newReadBuilder(); + + List splits = readBuilder.newScan().plan().splits(); + + TableRead read = readBuilder.newRead(); + + RecordReader reader = read.createReader(splits); + + List results = new ArrayList<>(); + + reader.forEachRemaining(row -> { + results.add(row.toString()); + }); + + return results; + } + + private static Timestamp convertToTimestamp(String createTime) { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + LocalDateTime localDateTime = LocalDateTime.parse(createTime, formatter); + return Timestamp.fromLocalDateTime(localDateTime); + } +} \ No newline at end of file diff --git a/persistence-modules/apache-paimon/src/main/resources/metrics-datamodel.puml b/persistence-modules/apache-paimon/src/main/resources/metrics-datamodel.puml new file mode 100644 index 000000000000..182dcca72e9a --- /dev/null +++ b/persistence-modules/apache-paimon/src/main/resources/metrics-datamodel.puml @@ -0,0 +1,22 @@ +@startuml +!theme crt-amber + +entity "metrics (Paimon Table)" as metrics { + + device_id : STRING <> + + metric_name : STRING <> + + create_time : TIMESTAMP(3) <> + -- + metric_value : DOUBLE + source : STRING +} + +note right of metrics +Primary Key: (device_id, metric_name, create_time) + +Optimized for: +- High-ingest time-series data +- Device-level analytics +- Streaming upserts +end note + +@enduml \ No newline at end of file diff --git a/persistence-modules/apache-paimon/src/main/resources/metrics.out b/persistence-modules/apache-paimon/src/main/resources/metrics.out new file mode 100644 index 000000000000..e9944f054339 --- /dev/null +++ b/persistence-modules/apache-paimon/src/main/resources/metrics.out @@ -0,0 +1,91 @@ +device_id,metric_name,create_time,metric_value,source +dev_101,cpu_usage,2026-04-21 18:57:01,72.5,agent +dev_102,cpu_usage,2026-04-21 18:57:02,65.1,agent +dev_103,memory_usage,2026-04-21 18:57:03,58.3,agent +dev_104,disk_io,2026-04-21 18:57:04,120.7,collector +dev_105,cpu_usage,2026-04-21 18:57:05,81.2,agent +dev_101,memory_usage,2026-04-21 18:57:06,63.4,agent +dev_102,disk_io,2026-04-21 18:57:07,95.6,collector +dev_103,cpu_usage,2026-04-21 18:57:08,49.8,agent +dev_104,memory_usage,2026-04-21 18:57:09,70.2,agent +dev_105,disk_io,2026-04-21 18:57:10,110.1,collector +dev_101,cpu_usage,2026-04-21 18:57:11,74.0,agent +dev_102,memory_usage,2026-04-21 18:57:12,60.9,agent +dev_103,disk_io,2026-04-21 18:57:13,88.2,collector +dev_104,cpu_usage,2026-04-21 18:57:14,66.7,agent +dev_105,memory_usage,2026-04-21 18:57:15,75.3,agent +dev_101,disk_io,2026-04-21 18:57:16,130.5,collector +dev_102,cpu_usage,2026-04-21 18:57:17,69.2,agent +dev_103,memory_usage,2026-04-21 18:57:18,55.1,agent +dev_104,disk_io,2026-04-21 18:57:19,102.4,collector +dev_105,cpu_usage,2026-04-21 18:57:20,83.6,agent +dev_106,cpu_usage,2026-04-21 18:57:21,61.3,agent +dev_107,memory_usage,2026-04-21 18:57:22,67.8,agent +dev_108,disk_io,2026-04-21 18:57:23,91.0,collector +dev_109,cpu_usage,2026-04-21 18:57:24,77.5,agent +dev_110,memory_usage,2026-04-21 18:57:25,62.6,agent +dev_106,disk_io,2026-04-21 18:57:26,105.3,collector +dev_107,cpu_usage,2026-04-21 18:57:27,71.4,agent +dev_108,memory_usage,2026-04-21 18:57:28,59.7,agent +dev_109,disk_io,2026-04-21 18:57:29,98.8,collector +dev_110,cpu_usage,2026-04-21 18:57:30,64.9,agent +dev_111,cpu_usage,2026-04-21 18:57:31,73.3,agent +dev_112,memory_usage,2026-04-21 18:57:32,68.1,agent +dev_113,disk_io,2026-04-21 18:57:33,115.9,collector +dev_114,cpu_usage,2026-04-21 18:57:34,79.0,agent +dev_115,memory_usage,2026-04-21 18:57:35,57.6,agent +dev_111,disk_io,2026-04-21 18:57:36,122.0,collector +dev_112,cpu_usage,2026-04-21 18:57:37,69.5,agent +dev_113,memory_usage,2026-04-21 18:57:38,61.2,agent +dev_114,disk_io,2026-04-21 18:57:39,99.1,collector +dev_115,cpu_usage,2026-04-21 18:57:40,82.7,agent +dev_116,cpu_usage,2026-04-21 18:57:41,66.0,agent +dev_117,memory_usage,2026-04-21 18:57:42,72.4,agent +dev_118,disk_io,2026-04-21 18:57:43,87.5,collector +dev_119,cpu_usage,2026-04-21 18:57:44,75.8,agent +dev_120,memory_usage,2026-04-21 18:57:45,63.0,agent +dev_116,disk_io,2026-04-21 18:57:46,108.7,collector +dev_117,cpu_usage,2026-04-21 18:57:47,70.6,agent +dev_118,memory_usage,2026-04-21 18:57:48,58.9,agent +dev_119,disk_io,2026-04-21 18:57:49,93.2,collector +dev_120,cpu_usage,2026-04-21 18:57:50,67.4,agent +dev_121,cpu_usage,2026-04-21 18:57:51,78.2,agent +dev_122,memory_usage,2026-04-21 18:57:52,64.8,agent +dev_123,disk_io,2026-04-21 18:57:53,101.6,collector +dev_124,cpu_usage,2026-04-21 18:57:54,72.9,agent +dev_125,memory_usage,2026-04-21 18:57:55,60.5,agent +dev_121,disk_io,2026-04-21 18:57:56,118.3,collector +dev_122,cpu_usage,2026-04-21 18:57:57,68.7,agent +dev_123,memory_usage,2026-04-21 18:57:58,56.2,agent +dev_124,disk_io,2026-04-21 18:57:59,97.0,collector +dev_125,cpu_usage,2026-04-21 18:58:00,80.1,agent +dev_126,cpu_usage,2026-04-21 18:58:01,69.9,agent +dev_127,memory_usage,2026-04-21 18:58:02,66.3,agent +dev_128,disk_io,2026-04-21 18:58:03,92.7,collector +dev_129,cpu_usage,2026-04-21 18:58:04,76.4,agent +dev_130,memory_usage,2026-04-21 18:58:05,61.7,agent +dev_126,disk_io,2026-04-21 18:58:06,109.9,collector +dev_127,cpu_usage,2026-04-21 18:58:07,73.1,agent +dev_128,memory_usage,2026-04-21 18:58:08,59.4,agent +dev_129,disk_io,2026-04-21 18:58:09,94.6,collector +dev_130,cpu_usage,2026-04-21 18:58:10,68.2,agent +dev_131,cpu_usage,2026-04-21 18:58:11,77.0,agent +dev_132,memory_usage,2026-04-21 18:58:12,65.9,agent +dev_133,disk_io,2026-04-21 18:58:13,103.8,collector +dev_134,cpu_usage,2026-04-21 18:58:14,74.6,agent +dev_135,memory_usage,2026-04-21 18:58:15,62.1,agent +dev_131,disk_io,2026-04-21 18:58:16,117.5,collector +dev_132,cpu_usage,2026-04-21 18:58:17,70.8,agent +dev_133,memory_usage,2026-04-21 18:58:18,57.3,agent +dev_134,disk_io,2026-04-21 18:58:19,98.2,collector +dev_135,cpu_usage,2026-04-21 18:58:20,81.5,agent +dev_136,cpu_usage,2026-04-21 18:58:21,67.6,agent +dev_137,memory_usage,2026-04-21 18:58:22,63.8,agent +dev_138,disk_io,2026-04-21 18:58:23,90.4,collector +dev_139,cpu_usage,2026-04-21 18:58:24,75.2,agent +dev_140,memory_usage,2026-04-21 18:58:25,60.0,agent +dev_136,disk_io,2026-04-21 18:58:26,106.1,collector +dev_137,cpu_usage,2026-04-21 18:58:27,72.0,agent +dev_138,memory_usage,2026-04-21 18:58:28,58.7,agent +dev_139,disk_io,2026-04-21 18:58:29,95.9,collector +dev_140,cpu_usage,2026-04-21 18:58:30,69.3,agent diff --git a/persistence-modules/apache-paimon/src/main/resources/paimon-table-creation.puml b/persistence-modules/apache-paimon/src/main/resources/paimon-table-creation.puml new file mode 100644 index 000000000000..90ea39ca0454 --- /dev/null +++ b/persistence-modules/apache-paimon/src/main/resources/paimon-table-creation.puml @@ -0,0 +1,49 @@ +@startuml + + interface Catalog { + +createDatabase(String name, boolean ignoreIfExists) + +dropDatabase(String name, boolean ignoreIfExists, boolean cascade) + +databaseExists(String name): boolean + +listDatabases(): List + +createTable(Identifier identifier, Schema schema, boolean ignoreIfExists) + +getTable(Identifier identifier): Table + +dropTable(Identifier identifier, boolean ignoreIfExists) + +renameTable(Identifier from, Identifier to, boolean ignoreIfExists) + +alterTable(Identifier identifier, List changes, boolean ignoreIfExists) + } + + interface Table { + +name(): String + +rowType(): RowType + +partitionKeys(): List + +primaryKeys(): List + +options(): Map + +comment(): Optional + +copy(Map dynamicOptions): Table + } + + class Schema { + -fields: List + -partitionKeys: List + -primaryKeys: List + -options: Map + } + + class DataField { + +id(): int + +name(): String + +type(): DataType + +description(): String + } + + interface InternalRow { + +getField(int pos): Object + } + + + Catalog ..> Table : creates/retrieves + Table --> Schema : defined by + Schema "1" *-- "many" DataField : contains + Table ..> InternalRow : represents records as + +@enduml \ No newline at end of file diff --git a/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java b/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java new file mode 100644 index 000000000000..12b0d021edac --- /dev/null +++ b/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java @@ -0,0 +1,163 @@ +package com.baeldung.paimon; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.nio.file.Path; +import java.util.List; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.utils.CloseableIterator; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class PaimonLifecycleIntegrationTest { + private static final Logger logger = LoggerFactory.getLogger(PaimonLifecycleIntegrationTest.class); + + private Catalog catalog; + private Identifier tableId; + + private String WAREHOUSE_PATH; + + @BeforeAll + void setup(@TempDir Path tempDir) { + WAREHOUSE_PATH = getWarehousePath(tempDir); + logger.info("Warehouse path set to: {}", WAREHOUSE_PATH); + } + + @Test + @Order(1) + void whenCallCreateTable_thenTableCreated() throws Exception { + logger.info("Warehouse path set to: {}", WAREHOUSE_PATH); + catalog = PaimonDatabaseManager.createCatalog(WAREHOUSE_PATH); + assertNotNull(catalog); + + tableId = PaimonDatabaseManager.createTable(catalog); + assertTrue(catalog.listTables("metric_db").contains("metrics")); + assertNotNull(tableId); + } + + @Test + @Order(2) + void whenCallInsertRecords_thenRecordsInserted() throws Exception { + assertTrue(catalog.listTables("metric_db").contains("metrics")); + PaimonTableDataManager.insert(catalog, tableId, getMetrics()); + } + + + /* @Test + @Order(4) */ + void whenCallUpdateRecord_thenRecordUpdated() throws Exception { + + Table table = catalog.getTable(tableId); + + BatchWriteBuilder builder = table.newBatchWriteBuilder(); + + BatchTableWrite write = builder.newWrite(); + + // update Alice age + write.write(GenericRow.of(BinaryString.fromString("Alice"), 35)); + + List messages = write.prepareCommit(); + + builder.newCommit().commit(messages); + + assertTrue(messages.size() > 0); + } + + // ------------------------------------------------ + // 5 Delete Record + // ------------------------------------------------ +/* @Test + @Order(5) */ + void whenCallDeleteRecord_thenRecordDeleted() throws Exception { + + Table table = catalog.getTable(tableId); + + BatchWriteBuilder builder = table.newBatchWriteBuilder(); + + BatchTableWrite write = builder.newWrite(); + + GenericRow deleteRow = GenericRow.of(BinaryString.fromString("Bob"), 0); + + deleteRow.setRowKind(RowKind.DELETE); + + write.write(deleteRow); + + List messages = write.prepareCommit(); + + builder.newCommit().commit(messages); + + assertTrue(messages.size() > 0); + } + + // ------------------------------------------------ + // 6 Read Records + // ------------------------------------------------ + @Test + @Order(3) + void whenCallReadRecords_thenRecordsRead() throws Exception { + + Table table = catalog.getTable(tableId); + + ReadBuilder readBuilder = table.newReadBuilder(); + + List splits = readBuilder.newScan().plan().splits(); + + TableRead read = readBuilder.newRead(); + + RecordReader reader = read.createReader(splits); + + //List rows = new ArrayList<>(); + CloseableIterator iterator = reader.toCloseableIterator(); + while(iterator.hasNext()) { + InternalRow row = iterator.next(); + BinaryString deviceId = row.getString(0); + logger.info("Read record with deviceId: {}", deviceId); + //rows.add(row.toString()); + } + } + + // ------------------------------------------------ + // 7 Cleanup + // ------------------------------------------------ + @Test + @Order(4) + void whenCallCleanup_thenResourcesCleanedUp() throws Exception { + + assertDoesNotThrow(() -> catalog.dropTable(tableId, true), "Table does not exist"); + + assertDoesNotThrow(() -> catalog.dropDatabase("metric_db", true, true), "Database does not exist"); + + logger.info("Cleanup completed successfully"); + } + + private List getMetrics() { + return new MetricReader().readMetrics(); + } + + private String getWarehousePath(Path tempDir) { + return "file:" + tempDir.toString(); + } +} \ No newline at end of file From 0de93593c6c36b63d236d54518ad686953c3f5e9 Mon Sep 17 00:00:00 2001 From: parthiv39731 Date: Sat, 9 May 2026 11:09:05 +0530 Subject: [PATCH 2/7] included apache-paimon module --- persistence-modules/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/persistence-modules/pom.xml b/persistence-modules/pom.xml index b720cebef5a0..78bc22a4374d 100644 --- a/persistence-modules/pom.xml +++ b/persistence-modules/pom.xml @@ -18,6 +18,7 @@ apache-bookkeeper apache-cayenne apache-derby + apache-paimon atomikos blaze-persistence clickhouse From 9c45215fae23b20c8022124c8ba32f616151f34d Mon Sep 17 00:00:00 2001 From: parthiv39731 Date: Fri, 15 May 2026 21:57:35 +0530 Subject: [PATCH 3/7] Query Paimon --- .../main/java/com/baeldung/paimon/Metric.java | 19 +++- .../com/baeldung/paimon/MetricColumns.java | 19 ++++ .../com/baeldung/paimon/MetricReader.java | 3 +- .../paimon/PaimonDatabaseManager.java | 1 + .../paimon/PaimonTableDataManager.java | 52 +++++++--- .../PaimonLifecycleIntegrationTest.java | 94 +++++++++---------- 6 files changed, 123 insertions(+), 65 deletions(-) create mode 100644 persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricColumns.java diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/Metric.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/Metric.java index 7ca89d0d8401..484b9455ce10 100644 --- a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/Metric.java +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/Metric.java @@ -6,18 +6,29 @@ public class Metric { private double metricsValue; private String source; private String createTime; + private String createdBy; public Metric() { } - public Metric(String deviceId, String metricsName, double metricsValue, String source, String createTime) { + public Metric(String deviceId, String metricsName, double metricsValue, String source, String createTime, String createdBy) { this.deviceId = deviceId; this.metricsName = metricsName; this.metricsValue = metricsValue; this.source = source; this.createTime = createTime; + this.createdBy = createdBy; } + public String getCreatedBy() { + return createdBy; + } + + public void setCreatedBy(String createdBy) { + this.createdBy = createdBy; + } + + public String getDeviceId() { return deviceId; } @@ -57,4 +68,10 @@ public String getCreateTime() { public void setCreateTime(String createTime) { this.createTime = createTime; } + + @Override + public String toString() { + return "Metric [deviceId=" + deviceId + ", metricsName=" + metricsName + ", metricsValue=" + + metricsValue + ", source=" + source + ", createTime=" + createTime + "]"; + } } \ No newline at end of file diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricColumns.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricColumns.java new file mode 100644 index 000000000000..954aad16713c --- /dev/null +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricColumns.java @@ -0,0 +1,19 @@ +package com.baeldung.paimon; + +enum MetricColumns { + DEVICE_ID(0), + METRICS_NAME(1), + METRICS_VALUE(2), + SOURCE(3), + TIMESTAMP(4); + + private final int index; + + MetricColumns(int index) { + this.index = index; + } + + public int index() { + return index; + } +} \ No newline at end of file diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java index ed90a3278510..fadeb94196e9 100644 --- a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java @@ -19,6 +19,7 @@ List readMetrics() { BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); String line; + String CREATED_BY = "admin"; List metrics = new ArrayList<>(); boolean isFirstLine = true; try { @@ -40,7 +41,7 @@ List readMetrics() { logger.info("Read metric: deviceId={}, metricsName={}, createTime={}, metricsValue={}, source={}", deviceId, metricsName, createTime, metricsValue, source); - Metric metric = new Metric(deviceId, metricsName, metricsValue, source, createTime); + Metric metric = new Metric(deviceId, metricsName, metricsValue, source, createTime, CREATED_BY); //add the metric to the list metrics.add(metric); } diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java index 91684f1febf9..ef907f21dc90 100644 --- a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java @@ -22,6 +22,7 @@ public static Identifier createTable(Catalog catalog) throws Exception { .column("metrics_value", DataTypes.DOUBLE()) .column("source", DataTypes.STRING()) .column("create_time", DataTypes.TIMESTAMP(3)) + .column("created_by", DataTypes.STRING()) .primaryKey("device_id", "metrics_name", "create_time") .build(); diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java index 4cb6fd853453..126468a6f6f9 100644 --- a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java @@ -10,6 +10,8 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchTableCommit; @@ -19,6 +21,7 @@ import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.types.RowType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,8 +47,9 @@ public static void insert(Catalog catalog, Identifier tableId, List metr BinaryString.fromString(metric.getMetricsName()), metric.getMetricsValue(), BinaryString.fromString(metric.getSource()), - timestamp - ), 0); + timestamp, + BinaryString.fromString(metric.getCreatedBy()) + ), 2); } catch (Exception e) { logger.error("Error writing metric", e); } @@ -57,30 +61,54 @@ public static void insert(Catalog catalog, Identifier tableId, List metr commit.commit(messages); } - public static List read(Catalog catalog, Identifier tableId) throws Exception { + private static Timestamp convertToTimestamp(String createTime) { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + LocalDateTime localDateTime = LocalDateTime.parse(createTime, formatter); + return Timestamp.fromLocalDateTime(localDateTime); + } + + + public static List fetchMetricsBySourceAndDateRange(Catalog catalog, Identifier tableId, + String source, String startDate, String endDate) throws Exception { Table table = catalog.getTable(tableId); + RowType rowType = table.rowType(); + PredicateBuilder predicateBuilder = new PredicateBuilder(rowType); - ReadBuilder readBuilder = table.newReadBuilder(); + int[] projection = new int[] {0, 1, 2, 3, 4}; + + Predicate sourcePredicate = predicateBuilder.equal(3, BinaryString.fromString(source)); + Predicate dateRangePredicate = predicateBuilder.between(4, convertToTimestamp(startDate), convertToTimestamp(endDate)); + Predicate predicate = PredicateBuilder.and(sourcePredicate, dateRangePredicate); + + ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate).withProjection(projection); List splits = readBuilder.newScan().plan().splits(); - TableRead read = readBuilder.newRead(); + TableRead read = readBuilder.newRead().executeFilter(); RecordReader reader = read.createReader(splits); - List results = new ArrayList<>(); + List results = new ArrayList<>(); - reader.forEachRemaining(row -> { - results.add(row.toString()); + reader.forEachRemaining(internalRow -> { + String deviceId = internalRow.getString(0).toString(); + String metricsName = internalRow.getString(1).toString(); + double metricsValue = internalRow.getDouble(2); + String sourceValue = internalRow.getString(3).toString(); + Timestamp timestamp = internalRow.getTimestamp(4, 3); + String createTime = ConvertTimestampToStr(timestamp); + + Metric metric = new Metric(deviceId, metricsName, metricsValue, sourceValue, createTime, null); + logger.info("Fetched Metric: {}", metric); + results.add(metric); }); return results; } - private static Timestamp convertToTimestamp(String createTime) { - DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - LocalDateTime localDateTime = LocalDateTime.parse(createTime, formatter); - return Timestamp.fromLocalDateTime(localDateTime); + private static String ConvertTimestampToStr(Timestamp timestamp) { + return timestamp.toLocalDateTime() + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); } } \ No newline at end of file diff --git a/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java b/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java index 12b0d021edac..4554465805c7 100644 --- a/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java +++ b/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java @@ -1,6 +1,6 @@ package com.baeldung.paimon; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.Assert.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.nio.file.Path; @@ -9,17 +9,12 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchTableWrite; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.sink.CommitMessage; -import org.apache.paimon.table.source.ReadBuilder; -import org.apache.paimon.table.source.Split; -import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.RowKind; -import org.apache.paimon.utils.CloseableIterator; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; @@ -33,7 +28,8 @@ @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class PaimonLifecycleIntegrationTest { - private static final Logger logger = LoggerFactory.getLogger(PaimonLifecycleIntegrationTest.class); + private static final Logger logger = + LoggerFactory.getLogger(PaimonLifecycleIntegrationTest.class); private Catalog catalog; private Identifier tableId; @@ -46,17 +42,35 @@ void setup(@TempDir Path tempDir) { logger.info("Warehouse path set to: {}", WAREHOUSE_PATH); } + + @AfterAll + void cleanup() { + if (catalog != null) { + try { + if (catalog.listTables("metric_db").contains("metrics")) { + catalog.dropTable(tableId, true); + } + if (catalog.listDatabases().contains("metric_db")) { + catalog.dropDatabase("metric_db", true, true); + } + logger.info("Cleanup completed successfully"); + } catch (Exception e) { + logger.error("Error during cleanup", e); + } + } + } + @Test @Order(1) void whenCallCreateTable_thenTableCreated() throws Exception { logger.info("Warehouse path set to: {}", WAREHOUSE_PATH); catalog = PaimonDatabaseManager.createCatalog(WAREHOUSE_PATH); assertNotNull(catalog); - + tableId = PaimonDatabaseManager.createTable(catalog); assertTrue(catalog.listTables("metric_db").contains("metrics")); assertNotNull(tableId); - } + } @Test @Order(2) @@ -65,9 +79,20 @@ void whenCallInsertRecords_thenRecordsInserted() throws Exception { PaimonTableDataManager.insert(catalog, tableId, getMetrics()); } + @Test + @Order(3) + void whenCallReadRecords_thenRecordsRead() throws Exception { + List metrics = PaimonTableDataManager.fetchMetricsBySourceAndDateRange(catalog, + tableId, "collector", "2026-04-21 18:58:19", "2026-04-21 18:58:30"); + assertFalse(metrics.isEmpty()); + assertTrue(metrics.size() == 4); + } - /* @Test - @Order(4) */ + /* + * @Test + * + * @Order(4) + */ void whenCallUpdateRecord_thenRecordUpdated() throws Exception { Table table = catalog.getTable(tableId); @@ -89,8 +114,11 @@ void whenCallUpdateRecord_thenRecordUpdated() throws Exception { // ------------------------------------------------ // 5 Delete Record // ------------------------------------------------ -/* @Test - @Order(5) */ + /* + * @Test + * + * @Order(5) + */ void whenCallDeleteRecord_thenRecordDeleted() throws Exception { Table table = catalog.getTable(tableId); @@ -115,43 +143,7 @@ void whenCallDeleteRecord_thenRecordDeleted() throws Exception { // ------------------------------------------------ // 6 Read Records // ------------------------------------------------ - @Test - @Order(3) - void whenCallReadRecords_thenRecordsRead() throws Exception { - - Table table = catalog.getTable(tableId); - ReadBuilder readBuilder = table.newReadBuilder(); - - List splits = readBuilder.newScan().plan().splits(); - - TableRead read = readBuilder.newRead(); - - RecordReader reader = read.createReader(splits); - - //List rows = new ArrayList<>(); - CloseableIterator iterator = reader.toCloseableIterator(); - while(iterator.hasNext()) { - InternalRow row = iterator.next(); - BinaryString deviceId = row.getString(0); - logger.info("Read record with deviceId: {}", deviceId); - //rows.add(row.toString()); - } - } - - // ------------------------------------------------ - // 7 Cleanup - // ------------------------------------------------ - @Test - @Order(4) - void whenCallCleanup_thenResourcesCleanedUp() throws Exception { - - assertDoesNotThrow(() -> catalog.dropTable(tableId, true), "Table does not exist"); - - assertDoesNotThrow(() -> catalog.dropDatabase("metric_db", true, true), "Database does not exist"); - - logger.info("Cleanup completed successfully"); - } private List getMetrics() { return new MetricReader().readMetrics(); @@ -160,4 +152,4 @@ private List getMetrics() { private String getWarehousePath(Path tempDir) { return "file:" + tempDir.toString(); } -} \ No newline at end of file +} From 4ff97c74cd982230f885b815bc3583e0d4701443 Mon Sep 17 00:00:00 2001 From: parthiv39731 Date: Tue, 19 May 2026 21:36:48 +0530 Subject: [PATCH 4/7] delete and update methods added --- .../main/java/com/baeldung/paimon/Metric.java | 14 +- .../com/baeldung/paimon/MetricReader.java | 2 +- .../paimon/PaimonDatabaseManager.java | 1 + .../paimon/PaimonTableDataManager.java | 124 +++++++++++++++--- .../src/main/resources/metrics-datamodel.puml | 4 +- .../PaimonLifecycleIntegrationTest.java | 93 +++++-------- 6 files changed, 152 insertions(+), 86 deletions(-) diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/Metric.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/Metric.java index 484b9455ce10..eab4d7bcc270 100644 --- a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/Metric.java +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/Metric.java @@ -5,19 +5,21 @@ public class Metric { private String metricsName; private double metricsValue; private String source; + private String state = "active"; private String createTime; private String createdBy; public Metric() { } - public Metric(String deviceId, String metricsName, double metricsValue, String source, String createTime, String createdBy) { + public Metric(String deviceId, String metricsName, double metricsValue, String source, String createTime, String createdBy, String state) { this.deviceId = deviceId; this.metricsName = metricsName; this.metricsValue = metricsValue; this.source = source; this.createTime = createTime; this.createdBy = createdBy; + this.state = state; } public String getCreatedBy() { @@ -69,9 +71,17 @@ public void setCreateTime(String createTime) { this.createTime = createTime; } + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } + @Override public String toString() { return "Metric [deviceId=" + deviceId + ", metricsName=" + metricsName + ", metricsValue=" - + metricsValue + ", source=" + source + ", createTime=" + createTime + "]"; + + metricsValue + ", source=" + source + ", createTime=" + createTime + ", state=" + state + "]"; } } \ No newline at end of file diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java index fadeb94196e9..70714feb7755 100644 --- a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java @@ -41,7 +41,7 @@ List readMetrics() { logger.info("Read metric: deviceId={}, metricsName={}, createTime={}, metricsValue={}, source={}", deviceId, metricsName, createTime, metricsValue, source); - Metric metric = new Metric(deviceId, metricsName, metricsValue, source, createTime, CREATED_BY); + Metric metric = new Metric(deviceId, metricsName, metricsValue, source, createTime, CREATED_BY, "active"); //add the metric to the list metrics.add(metric); } diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java index ef907f21dc90..29d1ec3cd2e3 100644 --- a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java @@ -23,6 +23,7 @@ public static Identifier createTable(Catalog catalog) throws Exception { .column("source", DataTypes.STRING()) .column("create_time", DataTypes.TIMESTAMP(3)) .column("created_by", DataTypes.STRING()) + .column("state", DataTypes.STRING()) .primaryKey("device_id", "metrics_name", "create_time") .build(); diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java index 126468a6f6f9..f5e504103e00 100644 --- a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java @@ -21,6 +21,7 @@ import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,35 +39,57 @@ public static void insert(Catalog catalog, Identifier tableId, List metr BatchTableWrite write = builder.newWrite(); metrics.forEach(metric -> { - try { - - Timestamp timestamp = convertToTimestamp(metric.getCreateTime()); - //device_id,metric_name,create_time,metric_value,source - write.write(GenericRow.of( - BinaryString.fromString(metric.getDeviceId()), - BinaryString.fromString(metric.getMetricsName()), - metric.getMetricsValue(), - BinaryString.fromString(metric.getSource()), - timestamp, - BinaryString.fromString(metric.getCreatedBy()) - ), 2); + try { + GenericRow row = createGenericRow(metric); + write.write(row, 0); } catch (Exception e) { logger.error("Error writing metric", e); } }); List messages = write.prepareCommit(); - BatchTableCommit commit = builder.newCommit(); commit.commit(messages); } - private static Timestamp convertToTimestamp(String createTime) { - DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - LocalDateTime localDateTime = LocalDateTime.parse(createTime, formatter); - return Timestamp.fromLocalDateTime(localDateTime); + public static void updateMetricStateByDeviceIdMetricNameAndCreatedDate(Catalog catalog, + Identifier tableId, String deviceId, String metricName, String newState, String createdDate) + throws Exception { + + Metric metric = fetchMetricByDeviceIdMetricNameAndCreatedDate(catalog, tableId, deviceId, metricName, createdDate); + Table table = catalog.getTable(tableId); + + BatchWriteBuilder builder = table.newBatchWriteBuilder(); + + BatchTableWrite write = builder.newWrite(); + metric.setState(newState); + write.write(createGenericRow(metric),0); + + List messages = write.prepareCommit(); + + BatchTableCommit commit = builder.newCommit(); + commit.commit(messages); } + public static void deleteRecordsByDeviceIdMetricNameAndCreatedDate(Catalog catalog, Identifier tableId, + String deviceId, String metricsName, String createdDate) throws Exception { + Metric metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate(catalog, tableId, deviceId, metricsName, createdDate); + Table table = catalog.getTable(tableId); + + BatchWriteBuilder builder = table.newBatchWriteBuilder(); + + BatchTableWrite write = builder.newWrite(); + + GenericRow deleteRow = createGenericRow(metric); + + deleteRow.setRowKind(RowKind.DELETE); + + write.write(deleteRow, 0); + + List messages = write.prepareCommit(); + + builder.newCommit().commit(messages); + } public static List fetchMetricsBySourceAndDateRange(Catalog catalog, Identifier tableId, String source, String startDate, String endDate) throws Exception { @@ -75,11 +98,11 @@ public static List fetchMetricsBySourceAndDateRange(Catalog catalog, Ide RowType rowType = table.rowType(); PredicateBuilder predicateBuilder = new PredicateBuilder(rowType); - int[] projection = new int[] {0, 1, 2, 3, 4}; + int[] projection = new int[] {0, 1, 2, 3, 4, 6}; Predicate sourcePredicate = predicateBuilder.equal(3, BinaryString.fromString(source)); Predicate dateRangePredicate = predicateBuilder.between(4, convertToTimestamp(startDate), convertToTimestamp(endDate)); - Predicate predicate = PredicateBuilder.and(sourcePredicate, dateRangePredicate); + Predicate predicate = PredicateBuilder.and(sourcePredicate, dateRangePredicate); ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate).withProjection(projection); @@ -98,8 +121,9 @@ public static List fetchMetricsBySourceAndDateRange(Catalog catalog, Ide String sourceValue = internalRow.getString(3).toString(); Timestamp timestamp = internalRow.getTimestamp(4, 3); String createTime = ConvertTimestampToStr(timestamp); + String state = internalRow.getString(5).toString(); - Metric metric = new Metric(deviceId, metricsName, metricsValue, sourceValue, createTime, null); + Metric metric = new Metric(deviceId, metricsName, metricsValue, sourceValue, createTime, null, state); logger.info("Fetched Metric: {}", metric); results.add(metric); }); @@ -107,8 +131,66 @@ public static List fetchMetricsBySourceAndDateRange(Catalog catalog, Ide return results; } + public static Metric fetchMetricByDeviceIdMetricNameAndCreatedDate(Catalog catalog, Identifier tableId, + String deviceId, String metricsName, String createdDate) throws Exception { + Table table = catalog.getTable(tableId); + RowType rowType = table.rowType(); + PredicateBuilder predicateBuilder = new PredicateBuilder(rowType); + + int[] projection = new int[] {0, 1, 2, 3, 4, 5, 6}; + + Predicate devicePredicate = predicateBuilder.equal(0, BinaryString.fromString(deviceId)); + Predicate metricPredicate = predicateBuilder.equal(1, BinaryString.fromString(metricsName)); + Predicate datePredicate = predicateBuilder.equal(4, convertToTimestamp(createdDate)); + Predicate predicate = PredicateBuilder.and(devicePredicate, metricPredicate, datePredicate); + + ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate).withProjection(projection); + + List splits = readBuilder.newScan().plan().splits(); + + TableRead read = readBuilder.newRead().executeFilter(); + + RecordReader reader = read.createReader(splits); + List results = new ArrayList<>(); + + reader.forEachRemaining( internalRow -> { + String deviceIdValue = internalRow.getString(0).toString(); + String metricsNameValue = internalRow.getString(1).toString(); + double metricsValue = internalRow.getDouble(2); + String sourceValue = internalRow.getString(3).toString(); + Timestamp timestamp = internalRow.getTimestamp(4, 3); + String createTime = ConvertTimestampToStr(timestamp); + String createdBy = internalRow.getString(5).toString(); + String state = internalRow.getString(6).toString(); + Metric metric = new Metric(deviceIdValue, metricsNameValue, metricsValue, sourceValue, createTime, createdBy, state); + results.add(metric); + logger.info("Fetched Metric: {}", metric); + }); + + return results.isEmpty() ? null : results.get(0); + } + private static String ConvertTimestampToStr(Timestamp timestamp) { return timestamp.toLocalDateTime() - .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + } + + private static Timestamp convertToTimestamp(String createTime) { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + LocalDateTime localDateTime = LocalDateTime.parse(createTime, formatter); + return Timestamp.fromLocalDateTime(localDateTime); + } + + private static GenericRow createGenericRow(Metric metric) { + GenericRow row = GenericRow.of( + BinaryString.fromString(metric.getDeviceId()), + BinaryString.fromString(metric.getMetricsName()), + metric.getMetricsValue(), + BinaryString.fromString(metric.getSource()), + convertToTimestamp(metric.getCreateTime()), + BinaryString.fromString(metric.getCreatedBy()), + BinaryString.fromString(metric.getState()) + ); + return row; } } \ No newline at end of file diff --git a/persistence-modules/apache-paimon/src/main/resources/metrics-datamodel.puml b/persistence-modules/apache-paimon/src/main/resources/metrics-datamodel.puml index 182dcca72e9a..17938a26667f 100644 --- a/persistence-modules/apache-paimon/src/main/resources/metrics-datamodel.puml +++ b/persistence-modules/apache-paimon/src/main/resources/metrics-datamodel.puml @@ -1,4 +1,4 @@ -@startuml +@startuml metrics-datamodel !theme crt-amber entity "metrics (Paimon Table)" as metrics { @@ -8,6 +8,8 @@ entity "metrics (Paimon Table)" as metrics { -- metric_value : DOUBLE source : STRING + created_by : STRING + state : STRING } note right of metrics diff --git a/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java b/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java index 4554465805c7..4009e1386b35 100644 --- a/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java +++ b/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java @@ -1,5 +1,6 @@ package com.baeldung.paimon; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -7,13 +8,6 @@ import java.util.List; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.data.BinaryString; -import org.apache.paimon.data.GenericRow; -import org.apache.paimon.table.Table; -import org.apache.paimon.table.sink.BatchTableWrite; -import org.apache.paimon.table.sink.BatchWriteBuilder; -import org.apache.paimon.table.sink.CommitMessage; -import org.apache.paimon.types.RowKind; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.MethodOrderer; @@ -42,7 +36,6 @@ void setup(@TempDir Path tempDir) { logger.info("Warehouse path set to: {}", WAREHOUSE_PATH); } - @AfterAll void cleanup() { if (catalog != null) { @@ -79,7 +72,7 @@ void whenCallInsertRecords_thenRecordsInserted() throws Exception { PaimonTableDataManager.insert(catalog, tableId, getMetrics()); } - @Test + @Test @Order(3) void whenCallReadRecords_thenRecordsRead() throws Exception { List metrics = PaimonTableDataManager.fetchMetricsBySourceAndDateRange(catalog, @@ -88,63 +81,41 @@ void whenCallReadRecords_thenRecordsRead() throws Exception { assertTrue(metrics.size() == 4); } - /* - * @Test - * - * @Order(4) - */ - void whenCallUpdateRecord_thenRecordUpdated() throws Exception { - - Table table = catalog.getTable(tableId); - - BatchWriteBuilder builder = table.newBatchWriteBuilder(); - - BatchTableWrite write = builder.newWrite(); - - // update Alice age - write.write(GenericRow.of(BinaryString.fromString("Alice"), 35)); - - List messages = write.prepareCommit(); - - builder.newCommit().commit(messages); - - assertTrue(messages.size() > 0); - } - - // ------------------------------------------------ - // 5 Delete Record - // ------------------------------------------------ - /* - * @Test - * - * @Order(5) - */ + @Test + @Order(4) void whenCallDeleteRecord_thenRecordDeleted() throws Exception { + Metric metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate(catalog, tableId, "dev_136", "disk_io", "2026-04-21 18:58:26"); + assertNotNull(metric); - Table table = catalog.getTable(tableId); + PaimonTableDataManager.deleteRecordsByDeviceIdMetricNameAndCreatedDate(catalog, tableId, "dev_136", "disk_io", "2026-04-21 18:58:26"); - BatchWriteBuilder builder = table.newBatchWriteBuilder(); - - BatchTableWrite write = builder.newWrite(); - - GenericRow deleteRow = GenericRow.of(BinaryString.fromString("Bob"), 0); - - deleteRow.setRowKind(RowKind.DELETE); - - write.write(deleteRow); - - List messages = write.prepareCommit(); - - builder.newCommit().commit(messages); - - assertTrue(messages.size() > 0); + metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate(catalog, tableId, "dev_136", "disk_io", "2026-04-21 18:58:26"); + assertTrue(metric == null); } - // ------------------------------------------------ - // 6 Read Records - // ------------------------------------------------ - - + @Test + @Order(5) + void whenCallUpdateRecord_thenRecordUpdated() throws Exception { + Metric metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate( + catalog, tableId, + "dev_137", "cpu_usage", + "2026-04-21 18:58:27" + ); + assertNotNull(metric); + + assertEquals("active", metric.getState()); + + PaimonTableDataManager.updateMetricStateByDeviceIdMetricNameAndCreatedDate( + catalog, tableId, "dev_137", + "cpu_usage", "inactive", + "2026-04-21 18:58:27" + ); + + metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate(catalog, tableId, "dev_137", "cpu_usage", "2026-04-21 18:58:27"); + assertNotNull(metric); + assertEquals("inactive", metric.getState()); + } + private List getMetrics() { return new MetricReader().readMetrics(); } From ec363e346bb592c7285cf41745d4762a6ad2214b Mon Sep 17 00:00:00 2001 From: parthiv39731 Date: Fri, 29 May 2026 15:31:57 +0530 Subject: [PATCH 5/7] final changes --- persistence-modules/apache-paimon/pom.xml | 2 +- .../java/com/baeldung/paimon/MetricReader.java | 2 -- .../baeldung/paimon/PaimonTableDataManager.java | 2 +- .../paimon/PaimonLifecycleIntegrationTest.java | 15 ++++++++++++--- 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/persistence-modules/apache-paimon/pom.xml b/persistence-modules/apache-paimon/pom.xml index 0af372660f42..1286d239096f 100644 --- a/persistence-modules/apache-paimon/pom.xml +++ b/persistence-modules/apache-paimon/pom.xml @@ -51,7 +51,7 @@ - 1.3.1 + 1.4.1 3.4.3 17 diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java index 70714feb7755..5612fdde4c11 100644 --- a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java @@ -13,7 +13,6 @@ public class MetricReader { Logger logger = LoggerFactory.getLogger(MetricReader.class); List readMetrics() { - //read the metrics from the file ClassLoader classLoader = getClass().getClassLoader(); InputStream inputStream = classLoader.getResourceAsStream("metrics.out"); @@ -42,7 +41,6 @@ List readMetrics() { deviceId, metricsName, createTime, metricsValue, source); Metric metric = new Metric(deviceId, metricsName, metricsValue, source, createTime, CREATED_BY, "active"); - //add the metric to the list metrics.add(metric); } } diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java index f5e504103e00..b67c88de6ec3 100644 --- a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java @@ -63,7 +63,7 @@ public static void updateMetricStateByDeviceIdMetricNameAndCreatedDate(Catalog c BatchTableWrite write = builder.newWrite(); metric.setState(newState); - write.write(createGenericRow(metric),0); + write.write(createGenericRow(metric), 0); List messages = write.prepareCommit(); diff --git a/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java b/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java index 4009e1386b35..d8908c1e81ed 100644 --- a/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java +++ b/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java @@ -87,9 +87,15 @@ void whenCallDeleteRecord_thenRecordDeleted() throws Exception { Metric metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate(catalog, tableId, "dev_136", "disk_io", "2026-04-21 18:58:26"); assertNotNull(metric); - PaimonTableDataManager.deleteRecordsByDeviceIdMetricNameAndCreatedDate(catalog, tableId, "dev_136", "disk_io", "2026-04-21 18:58:26"); + PaimonTableDataManager.deleteRecordsByDeviceIdMetricNameAndCreatedDate( + catalog, tableId, "dev_136", + "disk_io", "2026-04-21 18:58:26" + ); - metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate(catalog, tableId, "dev_136", "disk_io", "2026-04-21 18:58:26"); + metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate( + catalog, tableId, "dev_136", + "disk_io", "2026-04-21 18:58:26" + ); assertTrue(metric == null); } @@ -111,7 +117,10 @@ void whenCallUpdateRecord_thenRecordUpdated() throws Exception { "2026-04-21 18:58:27" ); - metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate(catalog, tableId, "dev_137", "cpu_usage", "2026-04-21 18:58:27"); + metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate( + catalog, tableId, "dev_137", + "cpu_usage", "2026-04-21 18:58:27" + ); assertNotNull(metric); assertEquals("inactive", metric.getState()); } From a39d6953bc8a09b22059fba53c6231bd3cbead34 Mon Sep 17 00:00:00 2001 From: parthiv39731 Date: Fri, 29 May 2026 18:59:30 +0530 Subject: [PATCH 6/7] deleted java file --- .../com/baeldung/paimon/MetricColumns.java | 19 ------------------- 1 file changed, 19 deletions(-) delete mode 100644 persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricColumns.java diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricColumns.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricColumns.java deleted file mode 100644 index 954aad16713c..000000000000 --- a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricColumns.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.baeldung.paimon; - -enum MetricColumns { - DEVICE_ID(0), - METRICS_NAME(1), - METRICS_VALUE(2), - SOURCE(3), - TIMESTAMP(4); - - private final int index; - - MetricColumns(int index) { - this.index = index; - } - - public int index() { - return index; - } -} \ No newline at end of file From 6a039cb70c3f15fe9f2a1adfebe4e676838595f5 Mon Sep 17 00:00:00 2001 From: parthiv39731 Date: Fri, 29 May 2026 19:02:15 +0530 Subject: [PATCH 7/7] fixed formatting --- .../java/com/baeldung/paimon/PaimonDatabaseManager.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java index 29d1ec3cd2e3..a0ae92610712 100644 --- a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java @@ -28,10 +28,8 @@ public static Identifier createTable(Catalog catalog) throws Exception { .build(); Identifier tableId = Identifier.create("metric_db", "metrics"); - catalog.createDatabase("metric_db", false); - - - + + catalog.createDatabase("metric_db", false); catalog.createTable(tableId, schema, false); return tableId; }