diff --git a/persistence-modules/apache-paimon/pom.xml b/persistence-modules/apache-paimon/pom.xml new file mode 100644 index 000000000000..1286d239096f --- /dev/null +++ b/persistence-modules/apache-paimon/pom.xml @@ -0,0 +1,58 @@ + + + 4.0.0 + + com.baeldung + persistence-modules + 1.0.0-SNAPSHOT + + com.baeldung + apache-paimon + 1.0-SNAPSHOT + apache-paimon + + + + org.apache.paimon + paimon-bundle + ${paimon.version} + + + org.slf4j + slf4j-api + + + + + + + org.apache.hadoop + hadoop-client-runtime + runtime + ${hadoop.version} + + + org.slf4j + slf4j-api + + + org.xerial.snappy + snappy-java + + + com.google.code.findbugs + jsr305 + + + + + + + 1.4.1 + 3.4.3 + 17 + + \ No newline at end of file diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/Metric.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/Metric.java new file mode 100644 index 000000000000..eab4d7bcc270 --- /dev/null +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/Metric.java @@ -0,0 +1,87 @@ +package com.baeldung.paimon; + +public class Metric { + private String deviceId; + private String metricsName; + private double metricsValue; + private String source; + private String state = "active"; + private String createTime; + private String createdBy; + + public Metric() { + } + + public Metric(String deviceId, String metricsName, double metricsValue, String source, String createTime, String createdBy, String state) { + this.deviceId = deviceId; + this.metricsName = metricsName; + this.metricsValue = metricsValue; + this.source = source; + this.createTime = createTime; + this.createdBy = createdBy; + this.state = state; + } + + public String getCreatedBy() { + return createdBy; + } + + public void setCreatedBy(String createdBy) { + this.createdBy = createdBy; + } + + + public String getDeviceId() { + return deviceId; + } + + public void setDeviceId(String deviceId) { + this.deviceId = deviceId; + } + + public String getMetricsName() { + return metricsName; + } + + public void setMetricsName(String metricsName) { + this.metricsName = metricsName; + } + + public double getMetricsValue() { + return metricsValue; + } + + public void setMetricsValue(double metricsValue) { + this.metricsValue = metricsValue; + } + + public String getSource() { + return source; + } + + public void setSource(String source) { + this.source = source; + } + + public String getCreateTime() { + return createTime; + } + + public void setCreateTime(String createTime) { + this.createTime = createTime; + } + + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } + + @Override + public String toString() { + return "Metric [deviceId=" + deviceId + ", metricsName=" + metricsName + ", metricsValue=" + + metricsValue + ", source=" + source + ", createTime=" + createTime + ", state=" + state + "]"; + } +} \ No newline at end of file diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java new file mode 100644 index 000000000000..5612fdde4c11 --- /dev/null +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/MetricReader.java @@ -0,0 +1,53 @@ +package com.baeldung.paimon; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricReader { + + Logger logger = LoggerFactory.getLogger(MetricReader.class); + + List readMetrics() { + ClassLoader classLoader = getClass().getClassLoader(); + InputStream inputStream = classLoader.getResourceAsStream("metrics.out"); + + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + String line; + String CREATED_BY = "admin"; + List metrics = new ArrayList<>(); + boolean isFirstLine = true; + try { + while ((line = reader.readLine()) != null) { + if (isFirstLine) { + isFirstLine = false; + continue; + } + String[] parts = line.split(","); + if (parts.length == 5) { + String deviceId = parts[0]; + String metricsName = parts[1]; + + + String createTime = parts[2]; + + long metricsValue = (long) Double.parseDouble(parts[3]); + String source = parts[4]; + logger.info("Read metric: deviceId={}, metricsName={}, createTime={}, metricsValue={}, source={}", + deviceId, metricsName, createTime, metricsValue, source); + + Metric metric = new Metric(deviceId, metricsName, metricsValue, source, createTime, CREATED_BY, "active"); + metrics.add(metric); + } + } + } catch (Exception e) { + logger.error("Error reading metrics", e); + } + + return metrics; + } +} diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java new file mode 100644 index 000000000000..a0ae92610712 --- /dev/null +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonDatabaseManager.java @@ -0,0 +1,36 @@ +package com.baeldung.paimon; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.Path; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.DataTypes; + +public class PaimonDatabaseManager { + public static Catalog createCatalog(String warehousePath) { + CatalogContext context = CatalogContext.create(new Path(warehousePath)); + return CatalogFactory.createCatalog(context); + } + + public static Identifier createTable(Catalog catalog) throws Exception { + + Schema schema = Schema.newBuilder() + .column("device_id", DataTypes.STRING()) + .column("metrics_name", DataTypes.STRING()) + .column("metrics_value", DataTypes.DOUBLE()) + .column("source", DataTypes.STRING()) + .column("create_time", DataTypes.TIMESTAMP(3)) + .column("created_by", DataTypes.STRING()) + .column("state", DataTypes.STRING()) + .primaryKey("device_id", "metrics_name", "create_time") + .build(); + + Identifier tableId = Identifier.create("metric_db", "metrics"); + + catalog.createDatabase("metric_db", false); + catalog.createTable(tableId, schema, false); + return tableId; + } +} diff --git a/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java new file mode 100644 index 000000000000..b67c88de6ec3 --- /dev/null +++ b/persistence-modules/apache-paimon/src/main/java/com/baeldung/paimon/PaimonTableDataManager.java @@ -0,0 +1,196 @@ +package com.baeldung.paimon; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PaimonTableDataManager { + + final static Logger logger = LoggerFactory.getLogger(PaimonTableDataManager.class); + + public static void insert(Catalog catalog, Identifier tableId, List metrics) throws Exception { + + Table table = catalog.getTable(tableId); + + BatchWriteBuilder builder = table.newBatchWriteBuilder(); + + BatchTableWrite write = builder.newWrite(); + + metrics.forEach(metric -> { + try { + GenericRow row = createGenericRow(metric); + write.write(row, 0); + } catch (Exception e) { + logger.error("Error writing metric", e); + } + }); + + List messages = write.prepareCommit(); + BatchTableCommit commit = builder.newCommit(); + commit.commit(messages); + } + + public static void updateMetricStateByDeviceIdMetricNameAndCreatedDate(Catalog catalog, + Identifier tableId, String deviceId, String metricName, String newState, String createdDate) + throws Exception { + + Metric metric = fetchMetricByDeviceIdMetricNameAndCreatedDate(catalog, tableId, deviceId, metricName, createdDate); + Table table = catalog.getTable(tableId); + + BatchWriteBuilder builder = table.newBatchWriteBuilder(); + + BatchTableWrite write = builder.newWrite(); + metric.setState(newState); + write.write(createGenericRow(metric), 0); + + List messages = write.prepareCommit(); + + BatchTableCommit commit = builder.newCommit(); + commit.commit(messages); + } + + public static void deleteRecordsByDeviceIdMetricNameAndCreatedDate(Catalog catalog, Identifier tableId, + String deviceId, String metricsName, String createdDate) throws Exception { + Metric metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate(catalog, tableId, deviceId, metricsName, createdDate); + Table table = catalog.getTable(tableId); + + BatchWriteBuilder builder = table.newBatchWriteBuilder(); + + BatchTableWrite write = builder.newWrite(); + + GenericRow deleteRow = createGenericRow(metric); + + deleteRow.setRowKind(RowKind.DELETE); + + write.write(deleteRow, 0); + + List messages = write.prepareCommit(); + + builder.newCommit().commit(messages); + } + + public static List fetchMetricsBySourceAndDateRange(Catalog catalog, Identifier tableId, + String source, String startDate, String endDate) throws Exception { + + Table table = catalog.getTable(tableId); + RowType rowType = table.rowType(); + PredicateBuilder predicateBuilder = new PredicateBuilder(rowType); + + int[] projection = new int[] {0, 1, 2, 3, 4, 6}; + + Predicate sourcePredicate = predicateBuilder.equal(3, BinaryString.fromString(source)); + Predicate dateRangePredicate = predicateBuilder.between(4, convertToTimestamp(startDate), convertToTimestamp(endDate)); + Predicate predicate = PredicateBuilder.and(sourcePredicate, dateRangePredicate); + + ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate).withProjection(projection); + + List splits = readBuilder.newScan().plan().splits(); + + TableRead read = readBuilder.newRead().executeFilter(); + + RecordReader reader = read.createReader(splits); + + List results = new ArrayList<>(); + + reader.forEachRemaining(internalRow -> { + String deviceId = internalRow.getString(0).toString(); + String metricsName = internalRow.getString(1).toString(); + double metricsValue = internalRow.getDouble(2); + String sourceValue = internalRow.getString(3).toString(); + Timestamp timestamp = internalRow.getTimestamp(4, 3); + String createTime = ConvertTimestampToStr(timestamp); + String state = internalRow.getString(5).toString(); + + Metric metric = new Metric(deviceId, metricsName, metricsValue, sourceValue, createTime, null, state); + logger.info("Fetched Metric: {}", metric); + results.add(metric); + }); + + return results; + } + + public static Metric fetchMetricByDeviceIdMetricNameAndCreatedDate(Catalog catalog, Identifier tableId, + String deviceId, String metricsName, String createdDate) throws Exception { + Table table = catalog.getTable(tableId); + RowType rowType = table.rowType(); + PredicateBuilder predicateBuilder = new PredicateBuilder(rowType); + + int[] projection = new int[] {0, 1, 2, 3, 4, 5, 6}; + + Predicate devicePredicate = predicateBuilder.equal(0, BinaryString.fromString(deviceId)); + Predicate metricPredicate = predicateBuilder.equal(1, BinaryString.fromString(metricsName)); + Predicate datePredicate = predicateBuilder.equal(4, convertToTimestamp(createdDate)); + Predicate predicate = PredicateBuilder.and(devicePredicate, metricPredicate, datePredicate); + + ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate).withProjection(projection); + + List splits = readBuilder.newScan().plan().splits(); + + TableRead read = readBuilder.newRead().executeFilter(); + + RecordReader reader = read.createReader(splits); + List results = new ArrayList<>(); + + reader.forEachRemaining( internalRow -> { + String deviceIdValue = internalRow.getString(0).toString(); + String metricsNameValue = internalRow.getString(1).toString(); + double metricsValue = internalRow.getDouble(2); + String sourceValue = internalRow.getString(3).toString(); + Timestamp timestamp = internalRow.getTimestamp(4, 3); + String createTime = ConvertTimestampToStr(timestamp); + String createdBy = internalRow.getString(5).toString(); + String state = internalRow.getString(6).toString(); + Metric metric = new Metric(deviceIdValue, metricsNameValue, metricsValue, sourceValue, createTime, createdBy, state); + results.add(metric); + logger.info("Fetched Metric: {}", metric); + }); + + return results.isEmpty() ? null : results.get(0); + } + + private static String ConvertTimestampToStr(Timestamp timestamp) { + return timestamp.toLocalDateTime() + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + } + + private static Timestamp convertToTimestamp(String createTime) { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + LocalDateTime localDateTime = LocalDateTime.parse(createTime, formatter); + return Timestamp.fromLocalDateTime(localDateTime); + } + + private static GenericRow createGenericRow(Metric metric) { + GenericRow row = GenericRow.of( + BinaryString.fromString(metric.getDeviceId()), + BinaryString.fromString(metric.getMetricsName()), + metric.getMetricsValue(), + BinaryString.fromString(metric.getSource()), + convertToTimestamp(metric.getCreateTime()), + BinaryString.fromString(metric.getCreatedBy()), + BinaryString.fromString(metric.getState()) + ); + return row; + } +} \ No newline at end of file diff --git a/persistence-modules/apache-paimon/src/main/resources/metrics-datamodel.puml b/persistence-modules/apache-paimon/src/main/resources/metrics-datamodel.puml new file mode 100644 index 000000000000..17938a26667f --- /dev/null +++ b/persistence-modules/apache-paimon/src/main/resources/metrics-datamodel.puml @@ -0,0 +1,24 @@ +@startuml metrics-datamodel +!theme crt-amber + +entity "metrics (Paimon Table)" as metrics { + + device_id : STRING <> + + metric_name : STRING <> + + create_time : TIMESTAMP(3) <> + -- + metric_value : DOUBLE + source : STRING + created_by : STRING + state : STRING +} + +note right of metrics +Primary Key: (device_id, metric_name, create_time) + +Optimized for: +- High-ingest time-series data +- Device-level analytics +- Streaming upserts +end note + +@enduml \ No newline at end of file diff --git a/persistence-modules/apache-paimon/src/main/resources/metrics.out b/persistence-modules/apache-paimon/src/main/resources/metrics.out new file mode 100644 index 000000000000..e9944f054339 --- /dev/null +++ b/persistence-modules/apache-paimon/src/main/resources/metrics.out @@ -0,0 +1,91 @@ +device_id,metric_name,create_time,metric_value,source +dev_101,cpu_usage,2026-04-21 18:57:01,72.5,agent +dev_102,cpu_usage,2026-04-21 18:57:02,65.1,agent +dev_103,memory_usage,2026-04-21 18:57:03,58.3,agent +dev_104,disk_io,2026-04-21 18:57:04,120.7,collector +dev_105,cpu_usage,2026-04-21 18:57:05,81.2,agent +dev_101,memory_usage,2026-04-21 18:57:06,63.4,agent +dev_102,disk_io,2026-04-21 18:57:07,95.6,collector +dev_103,cpu_usage,2026-04-21 18:57:08,49.8,agent +dev_104,memory_usage,2026-04-21 18:57:09,70.2,agent +dev_105,disk_io,2026-04-21 18:57:10,110.1,collector +dev_101,cpu_usage,2026-04-21 18:57:11,74.0,agent +dev_102,memory_usage,2026-04-21 18:57:12,60.9,agent +dev_103,disk_io,2026-04-21 18:57:13,88.2,collector +dev_104,cpu_usage,2026-04-21 18:57:14,66.7,agent +dev_105,memory_usage,2026-04-21 18:57:15,75.3,agent +dev_101,disk_io,2026-04-21 18:57:16,130.5,collector +dev_102,cpu_usage,2026-04-21 18:57:17,69.2,agent +dev_103,memory_usage,2026-04-21 18:57:18,55.1,agent +dev_104,disk_io,2026-04-21 18:57:19,102.4,collector +dev_105,cpu_usage,2026-04-21 18:57:20,83.6,agent +dev_106,cpu_usage,2026-04-21 18:57:21,61.3,agent +dev_107,memory_usage,2026-04-21 18:57:22,67.8,agent +dev_108,disk_io,2026-04-21 18:57:23,91.0,collector +dev_109,cpu_usage,2026-04-21 18:57:24,77.5,agent +dev_110,memory_usage,2026-04-21 18:57:25,62.6,agent +dev_106,disk_io,2026-04-21 18:57:26,105.3,collector +dev_107,cpu_usage,2026-04-21 18:57:27,71.4,agent +dev_108,memory_usage,2026-04-21 18:57:28,59.7,agent +dev_109,disk_io,2026-04-21 18:57:29,98.8,collector +dev_110,cpu_usage,2026-04-21 18:57:30,64.9,agent +dev_111,cpu_usage,2026-04-21 18:57:31,73.3,agent +dev_112,memory_usage,2026-04-21 18:57:32,68.1,agent +dev_113,disk_io,2026-04-21 18:57:33,115.9,collector +dev_114,cpu_usage,2026-04-21 18:57:34,79.0,agent +dev_115,memory_usage,2026-04-21 18:57:35,57.6,agent +dev_111,disk_io,2026-04-21 18:57:36,122.0,collector +dev_112,cpu_usage,2026-04-21 18:57:37,69.5,agent +dev_113,memory_usage,2026-04-21 18:57:38,61.2,agent +dev_114,disk_io,2026-04-21 18:57:39,99.1,collector +dev_115,cpu_usage,2026-04-21 18:57:40,82.7,agent +dev_116,cpu_usage,2026-04-21 18:57:41,66.0,agent +dev_117,memory_usage,2026-04-21 18:57:42,72.4,agent +dev_118,disk_io,2026-04-21 18:57:43,87.5,collector +dev_119,cpu_usage,2026-04-21 18:57:44,75.8,agent +dev_120,memory_usage,2026-04-21 18:57:45,63.0,agent +dev_116,disk_io,2026-04-21 18:57:46,108.7,collector +dev_117,cpu_usage,2026-04-21 18:57:47,70.6,agent +dev_118,memory_usage,2026-04-21 18:57:48,58.9,agent +dev_119,disk_io,2026-04-21 18:57:49,93.2,collector +dev_120,cpu_usage,2026-04-21 18:57:50,67.4,agent +dev_121,cpu_usage,2026-04-21 18:57:51,78.2,agent +dev_122,memory_usage,2026-04-21 18:57:52,64.8,agent +dev_123,disk_io,2026-04-21 18:57:53,101.6,collector +dev_124,cpu_usage,2026-04-21 18:57:54,72.9,agent +dev_125,memory_usage,2026-04-21 18:57:55,60.5,agent +dev_121,disk_io,2026-04-21 18:57:56,118.3,collector +dev_122,cpu_usage,2026-04-21 18:57:57,68.7,agent +dev_123,memory_usage,2026-04-21 18:57:58,56.2,agent +dev_124,disk_io,2026-04-21 18:57:59,97.0,collector +dev_125,cpu_usage,2026-04-21 18:58:00,80.1,agent +dev_126,cpu_usage,2026-04-21 18:58:01,69.9,agent +dev_127,memory_usage,2026-04-21 18:58:02,66.3,agent +dev_128,disk_io,2026-04-21 18:58:03,92.7,collector +dev_129,cpu_usage,2026-04-21 18:58:04,76.4,agent +dev_130,memory_usage,2026-04-21 18:58:05,61.7,agent +dev_126,disk_io,2026-04-21 18:58:06,109.9,collector +dev_127,cpu_usage,2026-04-21 18:58:07,73.1,agent +dev_128,memory_usage,2026-04-21 18:58:08,59.4,agent +dev_129,disk_io,2026-04-21 18:58:09,94.6,collector +dev_130,cpu_usage,2026-04-21 18:58:10,68.2,agent +dev_131,cpu_usage,2026-04-21 18:58:11,77.0,agent +dev_132,memory_usage,2026-04-21 18:58:12,65.9,agent +dev_133,disk_io,2026-04-21 18:58:13,103.8,collector +dev_134,cpu_usage,2026-04-21 18:58:14,74.6,agent +dev_135,memory_usage,2026-04-21 18:58:15,62.1,agent +dev_131,disk_io,2026-04-21 18:58:16,117.5,collector +dev_132,cpu_usage,2026-04-21 18:58:17,70.8,agent +dev_133,memory_usage,2026-04-21 18:58:18,57.3,agent +dev_134,disk_io,2026-04-21 18:58:19,98.2,collector +dev_135,cpu_usage,2026-04-21 18:58:20,81.5,agent +dev_136,cpu_usage,2026-04-21 18:58:21,67.6,agent +dev_137,memory_usage,2026-04-21 18:58:22,63.8,agent +dev_138,disk_io,2026-04-21 18:58:23,90.4,collector +dev_139,cpu_usage,2026-04-21 18:58:24,75.2,agent +dev_140,memory_usage,2026-04-21 18:58:25,60.0,agent +dev_136,disk_io,2026-04-21 18:58:26,106.1,collector +dev_137,cpu_usage,2026-04-21 18:58:27,72.0,agent +dev_138,memory_usage,2026-04-21 18:58:28,58.7,agent +dev_139,disk_io,2026-04-21 18:58:29,95.9,collector +dev_140,cpu_usage,2026-04-21 18:58:30,69.3,agent diff --git a/persistence-modules/apache-paimon/src/main/resources/paimon-table-creation.puml b/persistence-modules/apache-paimon/src/main/resources/paimon-table-creation.puml new file mode 100644 index 000000000000..90ea39ca0454 --- /dev/null +++ b/persistence-modules/apache-paimon/src/main/resources/paimon-table-creation.puml @@ -0,0 +1,49 @@ +@startuml + + interface Catalog { + +createDatabase(String name, boolean ignoreIfExists) + +dropDatabase(String name, boolean ignoreIfExists, boolean cascade) + +databaseExists(String name): boolean + +listDatabases(): List + +createTable(Identifier identifier, Schema schema, boolean ignoreIfExists) + +getTable(Identifier identifier): Table + +dropTable(Identifier identifier, boolean ignoreIfExists) + +renameTable(Identifier from, Identifier to, boolean ignoreIfExists) + +alterTable(Identifier identifier, List changes, boolean ignoreIfExists) + } + + interface Table { + +name(): String + +rowType(): RowType + +partitionKeys(): List + +primaryKeys(): List + +options(): Map + +comment(): Optional + +copy(Map dynamicOptions): Table + } + + class Schema { + -fields: List + -partitionKeys: List + -primaryKeys: List + -options: Map + } + + class DataField { + +id(): int + +name(): String + +type(): DataType + +description(): String + } + + interface InternalRow { + +getField(int pos): Object + } + + + Catalog ..> Table : creates/retrieves + Table --> Schema : defined by + Schema "1" *-- "many" DataField : contains + Table ..> InternalRow : represents records as + +@enduml \ No newline at end of file diff --git a/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java b/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java new file mode 100644 index 000000000000..d8908c1e81ed --- /dev/null +++ b/persistence-modules/apache-paimon/src/test/java/com/baeldung/paimon/PaimonLifecycleIntegrationTest.java @@ -0,0 +1,135 @@ +package com.baeldung.paimon; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.nio.file.Path; +import java.util.List; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class PaimonLifecycleIntegrationTest { + private static final Logger logger = + LoggerFactory.getLogger(PaimonLifecycleIntegrationTest.class); + + private Catalog catalog; + private Identifier tableId; + + private String WAREHOUSE_PATH; + + @BeforeAll + void setup(@TempDir Path tempDir) { + WAREHOUSE_PATH = getWarehousePath(tempDir); + logger.info("Warehouse path set to: {}", WAREHOUSE_PATH); + } + + @AfterAll + void cleanup() { + if (catalog != null) { + try { + if (catalog.listTables("metric_db").contains("metrics")) { + catalog.dropTable(tableId, true); + } + if (catalog.listDatabases().contains("metric_db")) { + catalog.dropDatabase("metric_db", true, true); + } + logger.info("Cleanup completed successfully"); + } catch (Exception e) { + logger.error("Error during cleanup", e); + } + } + } + + @Test + @Order(1) + void whenCallCreateTable_thenTableCreated() throws Exception { + logger.info("Warehouse path set to: {}", WAREHOUSE_PATH); + catalog = PaimonDatabaseManager.createCatalog(WAREHOUSE_PATH); + assertNotNull(catalog); + + tableId = PaimonDatabaseManager.createTable(catalog); + assertTrue(catalog.listTables("metric_db").contains("metrics")); + assertNotNull(tableId); + } + + @Test + @Order(2) + void whenCallInsertRecords_thenRecordsInserted() throws Exception { + assertTrue(catalog.listTables("metric_db").contains("metrics")); + PaimonTableDataManager.insert(catalog, tableId, getMetrics()); + } + + @Test + @Order(3) + void whenCallReadRecords_thenRecordsRead() throws Exception { + List metrics = PaimonTableDataManager.fetchMetricsBySourceAndDateRange(catalog, + tableId, "collector", "2026-04-21 18:58:19", "2026-04-21 18:58:30"); + assertFalse(metrics.isEmpty()); + assertTrue(metrics.size() == 4); + } + + @Test + @Order(4) + void whenCallDeleteRecord_thenRecordDeleted() throws Exception { + Metric metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate(catalog, tableId, "dev_136", "disk_io", "2026-04-21 18:58:26"); + assertNotNull(metric); + + PaimonTableDataManager.deleteRecordsByDeviceIdMetricNameAndCreatedDate( + catalog, tableId, "dev_136", + "disk_io", "2026-04-21 18:58:26" + ); + + metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate( + catalog, tableId, "dev_136", + "disk_io", "2026-04-21 18:58:26" + ); + assertTrue(metric == null); + } + + @Test + @Order(5) + void whenCallUpdateRecord_thenRecordUpdated() throws Exception { + Metric metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate( + catalog, tableId, + "dev_137", "cpu_usage", + "2026-04-21 18:58:27" + ); + assertNotNull(metric); + + assertEquals("active", metric.getState()); + + PaimonTableDataManager.updateMetricStateByDeviceIdMetricNameAndCreatedDate( + catalog, tableId, "dev_137", + "cpu_usage", "inactive", + "2026-04-21 18:58:27" + ); + + metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate( + catalog, tableId, "dev_137", + "cpu_usage", "2026-04-21 18:58:27" + ); + assertNotNull(metric); + assertEquals("inactive", metric.getState()); + } + + private List getMetrics() { + return new MetricReader().readMetrics(); + } + + private String getWarehousePath(Path tempDir) { + return "file:" + tempDir.toString(); + } +} diff --git a/persistence-modules/pom.xml b/persistence-modules/pom.xml index b720cebef5a0..78bc22a4374d 100644 --- a/persistence-modules/pom.xml +++ b/persistence-modules/pom.xml @@ -18,6 +18,7 @@ apache-bookkeeper apache-cayenne apache-derby + apache-paimon atomikos blaze-persistence clickhouse