diff --git a/examples/kafka/readme.md b/examples/kafka/readme.md
index af4c3d25..765a0cba 100644
--- a/examples/kafka/readme.md
+++ b/examples/kafka/readme.md
@@ -25,51 +25,111 @@ The example is to show how to send data from localhost to IoTDB through Kafka.
```
## Usage
### Version usage
-IoTDB: 1.0.0
-Kafka: 2.8.0
+
+| | Version |
+|-------|---------|
+| IoTDB | 2.0.5 |
+| Kafka | 2.8.2 |
+
### Dependencies with Maven
```
-
- org.apache.kafka
- kafka_2.13
- 2.8.0
-
-
- org.apache.iotdb
- iotdb-session
- 1.0.0
-
+
+ org.apache.kafka
+ kafka_2.13
+ 2.8.2
+
+
+ org.apache.iotdb
+ iotdb-session
+ 2.0.5
+
```
-### Launch the servers
+### Prerequisite Steps
-```
- Before you run the program, make sure you have launched the servers of Kafka and IoTDB.
- For details, please refer to http://kafka.apache.org/081/documentation.html#quickstart
-```
+#### 1. Install IoTDB
+please refer to [https://iotdb.apache.org/#/Download](https://iotdb.apache.org/#/Download)
-### Run Producer.java
+#### 2. Install Kafka
+please refer to [https://kafka.apache.org/downloads](https://kafka.apache.org/downloads)
-```
- The class is to send data from localhost to Kafka clusters.
- Firstly, you have to change the parameter of TOPIC in Constant.java to what you create:(for example : "Kafka-Test")
- > public final static String TOPIC = "Kafka-Test";
- The default format of data is "device,timestamp,value ". (for example : "measurement1,2017/10/24 19:30:00,60")
- Then you need to create data in Constat.ALL_DATA
- Finally, run KafkaProducer.java
-```
+#### 3. Startup IoTDB
+please refer to [Quick Start](https://iotdb.apache.org/UserGuide/latest/QuickStart/QuickStart_apache.html)
-### Run Consumer.java
+#### 4. Startup Kafka
+please refer to [https://kafka.apache.org/quickstart](https://kafka.apache.org/quickstart)
+
+
+### Case 1: Send data from localhost to IoTDB-tree
+
+Files related:
+1. `Constant.java` : configuration of IoTDB and Kafka
+2. `Producer.java` : send data from localhost to Kafka cluster
+3. `Consumer.java` : consume data from Kafka cluster through multi-threads
+4. `ConsumerThread.java` : consume operations done by single thread
+
+Step 0: Set parameter in `Constant.java`
+
+> Change the parameters according to your situation.
+
+| Parameter | Data Type | Description |
+|---------------------------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| TOPIC | String | The topic to store data in Kafka |
+| KAFKA_SERVICE_URL | String | The service url of Kafka, e.g. `"127.0.0.1:9092"` |
+| CONSUMER_THREAD_NUM | int | The number of consumer threads |
+| SESSION_SIZE | int | The maximum number of IoTDB sessions |
+| IOTDB_CONNECTION_HOST | String | IoTDB host, e.g. `"localhost"` |
+| IOTDB_CONNECTION_PORT | int | IoTDB port, e.g. `6667` |
+| IOTDB_CONNECTION_USER | String | IoTDB username, e.g. `"root"` |
+| IOTDB_CONNECTION_PASSWORD | String | IoTDB password, e.g. `"root"` |
+| STORAGE_GROUP | Array | The storage groups to create |
+| CREATE_TIMESERIES | Array | The timeseries to create
Format of a single timeseries: {"timeseries", "dataType", "encodingType", "compressionType"}
e.g. `{"root.vehicle.d0.s0", "INT32", "PLAIN", "SNAPPY"}` |
+| ALL_DATA | Array | The data to create
Format of a single data: "device,timestamp,fieldName\[:fieldName\]\*,dataType\[:dataType\]\*,value\[:value\]\*"
e.g. `"root.vehicle.d0,10,s0,INT32,100"`, `"root.vehicle.d0,12,s0:s1,INT32:TEXT,101:'employeeId102'"` |
+
+Step 1: Run `Producer.java`
+
+> This class sends data from localhost to Kafka clusters.
+
+Step 2: Run `Consumer.java`
+
+> This class consumes data from Kafka through multi-threads and sends the data to IoTDB-tree.
+
+### Case 2: Send data from localhost to IoTDB-table
+
+Files related:
+1. `RelationalConstant.java` : configuration of IoTDB and Kafka
+2. `RelationalProducer.java` : send data from localhost to Kafka cluster
+3. `RelationalConsumer.java` : consume data from Kafka cluster through multi-threads
+4. `RelationalConsumerThread.java` : consume operations done by single thread
+
+Step 0: Set parameter in `RelationalConstant.java`
+
+> Change the parameters according to your situation.
+
+| Parameter | Data Type | Description |
+|---------------------|-----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| TOPIC | String | The topic to store data in Kafka |
+| KAFKA_SERVICE_URL | String | The service url of Kafka, e.g. `"127.0.0.1:9092"` |
+| CONSUMER_THREAD_NUM | int | The number of consumer threads |
+| SESSION_SIZE | int | The maximum number of IoTDB sessions |
+| IOTDB_URLS | Array | IoTDB urls, e.g. `{"localhost:6667"}` |
+| IOTDB_USERNAME | String | IoTDB username, e.g. `"root"` |
+| IOTDB_PASSWORD | String | IoTDB password, e.g. `"root"` |
+| DATABASES | Array | The databases to create |
+| TABLES | Array | The tables to create
Format of a single table: {"database", "tableName", "columnNames", "columnTypes", "columnCategories"}
e.g. `{"kafka_db1", "tb1", "time,region,status", "TIMESTAMP,STRING,BOOLEAN", "TIME,TAG,FIELD"}` |
+| ALL_DATA | Array | The data to create
Format of a single data: "database;tableName;columnName\[,columnName\]\*;value\[,value\]\*\[;value\[,value\]\*\]\*"
e.g. `"kafka_db1;tb1;time,status;17,true;18,false;19,true"` |
+
+Step 1: Run `RelationalProducer.java`
+
+> This class sends data from localhost to Kafka clusters.
+
+Step 2: Run `RelationalConsumer.java`
+
+> This class consumes data from Kafka through multi-threads and sends the data to IoTDB-table.
-```
- The class is to show how to consume data from kafka through multi-threads.
- The data is sent by class KafkaProducer.
- You can set the parameter of CONSUMER_THREAD_NUM in Constant.java to make sure the number of consumer threads:(for example: "3")
- > private final static int CONSUMER_THREAD_NUM = 3;
-```
-#### Notice
- If you want to use multiple consumers, please make sure that the number of topic's partition you create is more than 1.
\ No newline at end of file
+### Notice
+If you want to use multiple consumers, please make sure that the number of topic's partition you create is more than 1.
\ No newline at end of file
diff --git a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java
new file mode 100644
index 00000000..aebdef36
--- /dev/null
+++ b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.kafka.relational;
+
+public class RelationalConstant {
+
+ public static final String KAFKA_SERVICE_URL = "172.20.31.71:9094";
+ public static final String TOPIC = "Kafka-Relational-Test";
+ public static final String[] IOTDB_URLS = {
+ "127.0.0.1:6667"
+ };
+ public static final String IOTDB_USERNAME = "root";
+ public static final String IOTDB_PASSWORD = "root";
+ public static final int SESSION_SIZE = 3;
+ public static final int CONSUMER_THREAD_NUM = 5;
+ public static final String[] DATABASES = {"kafka_db1", "kafka_db2"};
+ public static final String[][] TABLES = {
+ // database, tableName, columnNames, columnTypes, columnCategories
+ {"kafka_db1", "tb1", "time,region,model_id,temperature,status", "TIMESTAMP,STRING,STRING,FLOAT,BOOLEAN", "TIME,TAG,ATTRIBUTE,FIELD,FIELD"},
+ {"kafka_db2", "tb2", "time,plant_id,humidity,status", "TIMESTAMP,STRING,FLOAT,BOOLEAN", "TIME,TAG,FIELD,FIELD"}
+ };
+ public static final String[] ALL_DATA = {
+ // database;tableName;columnName[,columnName]*;value[,value]*[,value[:value]*]*
+ "kafka_db1;tb1;time,temperature,status;17,3.26,true;18,3.27,false;19,3.28,true",
+ "kafka_db1;tb1;time,region,model_id,temperature;20,'rgn1','id1',3.31",
+ "kafka_db2;tb2;time,plant_id,humidity,status;50,'id1',68.7,true",
+ "kafka_db2;tb2;time,plant_id,humidity,status;51,'id2',68.5,false",
+ "kafka_db2;tb2;time,plant_id,humidity,status;52,'id3',68.3,true",
+ "kafka_db2;tb2;time,plant_id,humidity,status;53,'id4',68.8,true",
+ "kafka_db2;tb2;time,plant_id,humidity,status;54,'id5',68.9,true"
+ };
+}
diff --git a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java
new file mode 100644
index 00000000..3e9c37c8
--- /dev/null
+++ b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.kafka.relational;
+
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.pool.ITableSessionPool;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.TableSessionPoolBuilder;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class RelationalConsumer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RelationalConsumer.class);
+ private static ITableSessionPool tableSessionPool;
+ private List> consumerList;
+
+ private RelationalConsumer(List> consumerList) {
+ this.consumerList = consumerList;
+ initSessionPool();
+ }
+
+ private static void initSessionPool() {
+ tableSessionPool =
+ new TableSessionPoolBuilder()
+ .nodeUrls(Arrays.asList(RelationalConstant.IOTDB_URLS))
+ .user(RelationalConstant.IOTDB_USERNAME)
+ .password(RelationalConstant.IOTDB_PASSWORD)
+ .maxSize(RelationalConstant.SESSION_SIZE)
+ .build();
+ }
+
+ public static void main(String[] args) {
+ List> consumerList = new ArrayList<>();
+ for (int i = 0; i < RelationalConstant.CONSUMER_THREAD_NUM; i++) {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, RelationalConstant.KAFKA_SERVICE_URL);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, RelationalConstant.TOPIC);
+
+ KafkaConsumer consumer = new KafkaConsumer<>(props);
+ consumerList.add(consumer);
+ consumer.subscribe(Collections.singleton(RelationalConstant.TOPIC));
+ }
+ RelationalConsumer consumer = new RelationalConsumer(consumerList);
+ initIoTDB();
+ consumer.consumeInParallel();
+ }
+
+ private static void initIoTDB() {
+ for (String db : RelationalConstant.DATABASES) {
+ boolean res = createDatabase(db);
+ if (!res) {
+ throw new RuntimeException("Create database failed");
+ }
+ }
+ for (String[] tableInfo : RelationalConstant.TABLES) {
+ boolean res = createTable(tableInfo);
+ if (!res) {
+ throw new RuntimeException("Create table failed");
+ }
+ }
+ }
+
+ private static boolean createDatabase(String dbName) {
+ try (ITableSession session = tableSessionPool.getSession()) {
+ try {
+ session.executeNonQueryStatement(String.format("CREATE DATABASE %s", dbName));
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ LOGGER.error("Create Database Error: ", e);
+ return false;
+ }
+ } catch (IoTDBConnectionException e) {
+ LOGGER.error("Get Table Session Error: ", e);
+ return false;
+ }
+ return true;
+ }
+
+ private static boolean createTable(String[] tableInfo) {
+ try (ITableSession session = tableSessionPool.getSession()) {
+ String sql = getCreateTableSQL(tableInfo);
+ try {
+ session.executeNonQueryStatement(sql);
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ LOGGER.error("Create Table Error: ", e);
+ return false;
+ }
+ } catch (IoTDBConnectionException e) {
+ LOGGER.error("Get Table Session Error: ", e);
+ return false;
+ }
+ return true;
+ }
+
+ private static String getCreateTableSQL(String[] tableInfo) {
+ StringBuilder sql = new StringBuilder();
+ sql.append("CREATE TABLE \"").append(tableInfo[0]).append("\".\"").append(tableInfo[1]).append("\" (");
+
+ String[] columnNames = tableInfo[2].split(",");
+ String[] columnTypes = tableInfo[3].split(",");
+ String[] columnCategories = tableInfo[4].split(",");
+ int columnSize = columnNames.length;
+
+ for (int i = 0; i < columnSize; i++) {
+ sql.append(columnNames[i]).append(" ");
+ sql.append(columnTypes[i]).append(" ");
+ sql.append(columnCategories[i]).append(",");
+ }
+ sql.deleteCharAt(sql.length() - 1);
+ sql.append(")");
+ return sql.toString();
+ }
+
+ private void consumeInParallel() {
+ ExecutorService executor = Executors.newFixedThreadPool(RelationalConstant.CONSUMER_THREAD_NUM);
+ for (int i = 0; i < consumerList.size(); i++) {
+ RelationalConsumerThread consumerThread = new RelationalConsumerThread(consumerList.get(i), tableSessionPool);
+ executor.submit(consumerThread);
+ }
+ }
+}
diff --git a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java
new file mode 100644
index 00000000..073de7dc
--- /dev/null
+++ b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.kafka.relational;
+
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.pool.ITableSessionPool;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.example.ConsumerThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RelationalConsumerThread implements Runnable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerThread.class);
+ private KafkaConsumer consumer;
+ private ITableSessionPool tableSessionPool;
+
+ public RelationalConsumerThread(KafkaConsumer consumer, ITableSessionPool tableSessionPool) {
+ this.consumer = consumer;
+ this.tableSessionPool = tableSessionPool;
+ }
+
+ @Override
+ public void run() {
+ try {
+ do {
+ ConsumerRecords records = consumer.poll(Duration.ofSeconds(1));
+ LOGGER.info("Received records: {}", records.count());
+ List dataList = new ArrayList<>(records.count());
+ for (ConsumerRecord consumerRecord : records) {
+ dataList.add(consumerRecord.value());
+ }
+ insertDataList(dataList);
+ } while (true);
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage());
+ }
+ }
+
+ private void insertDataList(List dataList) {
+ for (String s : dataList) {
+ String sql = getInsertValueSQL(s);
+
+ try (ITableSession session = tableSessionPool.getSession()) {
+ try {
+ session.executeNonQueryStatement(sql);
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ LOGGER.error("Insert Values Into Table Error: ", e);
+ }
+ } catch (IoTDBConnectionException e) {
+ LOGGER.error("Get Table Session Error: ", e);
+ }
+ }
+ }
+
+ private String getInsertValueSQL(String s) {
+ StringBuilder sql = new StringBuilder();
+ String[] curDataInfo = s.split(";");
+ int valueSetSize = curDataInfo.length - 3;
+ String database = curDataInfo[0];
+ String tableName = curDataInfo[1];
+ String columnNames = curDataInfo[2];
+ sql.append("INSERT INTO \"").append(database).append("\".\"").append(tableName).append("\"(");
+ sql.append(columnNames).append(") VALUES ");
+
+ for (int j = 0; j < valueSetSize; j++) {
+ String columnValues = curDataInfo[3 + j];
+ sql.append("(");
+ sql.append(columnValues);
+ sql.append("),");
+ }
+ sql.deleteCharAt(sql.length() - 1);
+ return sql.toString();
+ }
+}
diff --git a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java
new file mode 100644
index 00000000..9bfc85d5
--- /dev/null
+++ b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.kafka.relational;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public class RelationalProducer {
+
+ private final KafkaProducer kafkaProducer;
+ private static final Logger LOGGER = LoggerFactory.getLogger(RelationalProducer.class);
+
+ public RelationalProducer() {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, RelationalConstant.KAFKA_SERVICE_URL);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ kafkaProducer = new KafkaProducer<>(props);
+ }
+
+ public static void main(String[] args) {
+ RelationalProducer relationalProducer = new RelationalProducer();
+ relationalProducer.produce();
+ relationalProducer.close();
+ }
+
+ private void produce() {
+ for (int i = 0; i < RelationalConstant.ALL_DATA.length; i++) {
+ String key = Integer.toString(i);
+ try {
+ RecordMetadata metadata = kafkaProducer.send(new ProducerRecord<>(RelationalConstant.TOPIC, key, RelationalConstant.ALL_DATA[i])).get();
+ LOGGER.info("Sent record(key={} value={}) meta(partition={}, offset={})\n", key, RelationalConstant.ALL_DATA[i], metadata.partition(), metadata.offset());
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage());
+ }
+ }
+ }
+
+ private void close() {
+ kafkaProducer.close();
+ }
+}
diff --git a/examples/rabbitmq/pom.xml b/examples/rabbitmq/pom.xml
index dd10aa88..4f7f1a64 100644
--- a/examples/rabbitmq/pom.xml
+++ b/examples/rabbitmq/pom.xml
@@ -42,6 +42,7 @@
com.rabbitmq
amqp-client
+ 5.26.0
diff --git a/examples/rabbitmq/readme.md b/examples/rabbitmq/readme.md
index 1a983e70..a53ddf68 100644
--- a/examples/rabbitmq/readme.md
+++ b/examples/rabbitmq/readme.md
@@ -25,9 +25,11 @@ The example is to show how to send data from localhost to IoTDB through RabbitMQ
```
## Usage
### Version usage
-IoTDB: 0.12.0
-RabbitMQ server: 3.8.15
-RabbitMQ client jar: 5.12.0
+
+| | Version |
+|----------|---------|
+| IoTDB | 2.0.5 |
+| RabbitMQ | 5.26.0 |
### Dependencies with Maven
@@ -35,29 +37,106 @@ RabbitMQ client jar: 5.12.0
org.apache.iotdb
- iotdb-jdbc
- ${project.version}
+ iotdb-session
+ 2.0.5
com.rabbitmq
amqp-client
- 5.12.0
+ 5.26.0
```
-### 1. Install IoTDB
+### Prerequisite Steps
+
+#### 1. Install IoTDB
please refer to [https://iotdb.apache.org/#/Download](https://iotdb.apache.org/#/Download)
-### 2. Install RabbitMQ
+#### 2. Install RabbitMQ
please refer to [https://www.rabbitmq.com/download.html](https://www.rabbitmq.com/download.html)
-### 3. Startup IoTDB
-please refer to [Quick Start](http://iotdb.apache.org/UserGuide/Master/Get%20Started/QuickStart.html)
+#### 3. Startup IoTDB
+please refer to [Quick Start](https://iotdb.apache.org/UserGuide/latest/QuickStart/QuickStart_apache.html)
-### 4. Startup RocketMQ
+#### 4. Startup RocketMQ
please refer to [https://www.rabbitmq.com/download.html](https://www.rabbitmq.com/download.html)
-### 5. Start the consumer client: RabbitMQConsumer
+### Case 1: Send data from localhost to IoTDB-tree
+
+Files related:
+1. `Constant.java` : configuration of IoTDB and RabbitMQ
+2. `RabbitMQProducer.java` : send data from localhost to RabbitMQ
+3. `RabbitMQConsumer.java` : consume data from RabbitMQ
+4. `RabbitMQChannelUtils.java` : common functions
+
+Step 0: Set parameter in `Constant.java`
+
+> Change the parameters according to your situation.
+
+| Parameter | Data Type | Description |
+|---------------------------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| TOPIC | String | The topic to store data in Kafka |
+| SERVER_HOST | String | The server ip of RabbitMQ, e.g. `"localhost"` |
+| SERVER_PORT | int | The server port of RabbitMQ, e.g. `5672` |
+| RABBITMQ_VHOST | String | The virtual host in RabbitMQ, e.g. `"/"` |
+| RABBITMQ_USERNAME | String | RabbitMQ username, e.g. `"guest"` |
+| RABBITMQ_PASSWORD | String | RabbitMQ password, e.g. `"guest"` |
+| CONNECTION_NAME | String | RabbitMQ connection name |
+| RABBITMQ_CONSUMER_QUEUE | String | The consumer queue name in RabbitMQ |
+| RABBITMQ_CONSUMER_TAG | String | The consumer tag in RabbitMQ |
+| IOTDB_CONNECTION_HOST | String | IoTDB host, e.g. `"localhost"` |
+| IOTDB_CONNECTION_PORT | int | IoTDB port, e.g. `6667` |
+| IOTDB_CONNECTION_USER | String | IoTDB username, e.g. `"root"` |
+| IOTDB_CONNECTION_PASSWORD | String | IoTDB password, e.g. `"root"` |
+| STORAGE_GROUP | Array | The storage groups to create |
+| TIMESERIESLIST | Array | The timeseries to create
Format of a single timeseries: {"timeseries", "dataType", "encodingType", "compressionType"}
e.g. `{"root.vehicle.d0.s0", "INT32", "PLAIN", "SNAPPY"}` |
+| ALL_DATA | Array | The data to create
Format of a single data: "device,timestamp,fieldName\[:fieldName\]\*,dataType\[:dataType\]\*,value\[:value\]\*"
e.g. `"root.vehicle.d0,10,s0,INT32,100"`, `"root.vehicle.d0,12,s0:s1,INT32:TEXT,101:'employeeId102'"` |
+
+
+Step 1: Run `RabbitMQProducer.java`
+
+> This class sends data from localhost to RabbitMQ.
+
+Step 2: Run `RabbitMQConsumer.java`
+
+> This class consumes data from RabbitMQ and sends the data to IoTDB-tree.
+
+### Case 2: Send data from localhost to IoTDB-table
+
+Files related:
+1. `RelationalConstant.java` : configuration of IoTDB and RabbitMQ
+2. `RelationalRabbitMQProducer.java` : send data from localhost to RabbitMQ
+3. `RelationalRabbitMQConsumer.java` : consume data from RabbitMQ
+4. `RabbitMQChannelUtils.java` : common functions
+
+Step 0: Set parameter in `RelationalConstant.java`
+
+> Change the parameters according to your situation.
+
+| Parameter | Data Type | Description |
+|-------------------------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| TOPIC | String | The topic to store data in Kafka |
+| SERVER_HOST | String | The server ip of RabbitMQ, e.g. `"localhost"` |
+| SERVER_PORT | int | The server port of RabbitMQ, e.g. `5672` |
+| RABBITMQ_VHOST | String | The virtual host in RabbitMQ, e.g. `"/"` |
+| RABBITMQ_USERNAME | String | RabbitMQ username, e.g. `"guest"` |
+| RABBITMQ_PASSWORD | String | RabbitMQ password, e.g. `"guest"` |
+| CONNECTION_NAME | String | RabbitMQ connection name |
+| RABBITMQ_CONSUMER_QUEUE | String | The consumer queue name in RabbitMQ |
+| RABBITMQ_CONSUMER_TAG | String | The consumer tag in RabbitMQ |
+| IOTDB_URLS | Array | IoTDB urls, e.g. `{"localhost:6667"}` |
+| IOTDB_USERNAME | String | IoTDB username, e.g. `"root"` |
+| IOTDB_PASSWORD | String | IoTDB password, e.g. `"root"` |
+| DATABASES | Array | The databases to create |
+| TABLES | Array | The tables to create
Format of a single table: {"database", "tableName", "columnNames", "columnTypes", "columnCategories"}
e.g. `{"rabbitmq_db1", "tb1", "time,region,status", "TIMESTAMP,STRING,BOOLEAN", "TIME,TAG,FIELD"}` |
+| ALL_DATA | Array | The data to create
Format of a single data: "database;tableName;columnName\[,columnName\]\*;value\[,value\]\*\[;value\[,value\]\*\]\*"
e.g. `"rabbitmq_db1;tb1;time,status;17,true;18,false;19,true"` |
+
+Step 1: Run `RabbitMQProducer.java`
+
+> This class sends data from localhost to RabbitMQ.
+
+Step 2: Run `RabbitMQConsumer.java`
+
+> This class consumes data from RabbitMQ and sends the data to IoTDB-table.
-### 6. Start the producer client: RabbitMQProducer
\ No newline at end of file
diff --git a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/Constant.java b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/Constant.java
index 54df3233..fdc9bf86 100644
--- a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/Constant.java
+++ b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/Constant.java
@@ -22,6 +22,11 @@
public class Constant {
private Constant() {}
+ public static final String SERVER_HOST = "localhost";
+ public static final int SERVER_PORT = 5672;
+ public static final String RABBITMQ_VHOST = "/";
+ public static final String RABBITMQ_USERNAME = "guest";
+ public static final String RABBITMQ_PASSWORD = "guest";
public static final String CONNECTION_NAME = "RabbitMQ-Connection";
public static final String RABBITMQ_CONSUMER_QUEUE = "IoTDB_Topic_Queue";
public static final String RABBITMQ_CONSUMER_TAG = "IoTDB_CONSUMER_TAG";
diff --git a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQChannelUtils.java b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQChannelUtils.java
index b2ff48c8..bc4c8b95 100644
--- a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQChannelUtils.java
+++ b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQChannelUtils.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.rabbitmq;
-import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
+import org.apache.iotdb.rabbitmq.relational.RelationalConstant;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@@ -30,23 +30,29 @@ public class RabbitMQChannelUtils {
private RabbitMQChannelUtils() {}
- @SuppressWarnings("squid:S2095")
- public static Channel getChannelInstance(String connectionDescription)
+ public static Connection getConnection()
throws IOException, TimeoutException {
- ConnectionFactory connectionFactory = getConnectionFactory();
- Connection connection = connectionFactory.newConnection(connectionDescription);
- return connection.createChannel();
+ ConnectionFactory connectionFactory = new ConnectionFactory();
+ connectionFactory.setHost(Constant.SERVER_HOST);
+ connectionFactory.setPort(Constant.SERVER_PORT);
+ connectionFactory.setVirtualHost(Constant.RABBITMQ_VHOST);
+ connectionFactory.setUsername(Constant.RABBITMQ_USERNAME);
+ connectionFactory.setPassword(Constant.RABBITMQ_PASSWORD);
+ connectionFactory.setAutomaticRecoveryEnabled(true);
+ connectionFactory.setNetworkRecoveryInterval(10000);
+ return connectionFactory.newConnection(Constant.CONNECTION_NAME);
}
- private static ConnectionFactory getConnectionFactory() {
+ public static Connection getRelationalConnection()
+ throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- connectionFactory.setUsername("guest");
- connectionFactory.setPassword("guest");
+ connectionFactory.setHost(RelationalConstant.SERVER_HOST);
+ connectionFactory.setPort(RelationalConstant.SERVER_PORT);
+ connectionFactory.setVirtualHost(RelationalConstant.RABBITMQ_VHOST);
+ connectionFactory.setUsername(RelationalConstant.RABBITMQ_USERNAME);
+ connectionFactory.setPassword(RelationalConstant.RABBITMQ_PASSWORD);
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(10000);
- return connectionFactory;
+ return connectionFactory.newConnection(RelationalConstant.CONNECTION_NAME);
}
}
diff --git a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQConsumer.java b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQConsumer.java
index f8f93c30..273fa8a7 100644
--- a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQConsumer.java
+++ b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQConsumer.java
@@ -99,6 +99,11 @@ private static void createTimeseries(Session session, String[] timeseries)
private void insert(Session session, String data)
throws IoTDBConnectionException, StatementExecutionException {
+ try {
+ session.open();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage());
+ }
String[] dataArray = data.split(",");
String device = dataArray[0];
long time = Long.parseLong(dataArray[1]);
diff --git a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQProducer.java b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQProducer.java
index 0d383966..41b66cc0 100644
--- a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQProducer.java
+++ b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQProducer.java
@@ -35,15 +35,26 @@ public class RabbitMQProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQProducer.class);
public static void main(String[] args) throws IOException, TimeoutException {
- Channel channel = RabbitMQChannelUtils.getChannelInstance(Constant.CONNECTION_NAME);
+ Connection connection = RabbitMQChannelUtils.getConnection();
+ Channel channel = connection.createChannel();
channel.exchangeDeclare(Constant.TOPIC, BuiltinExchangeType.TOPIC);
+ channel.confirmSelect();
AMQP.BasicProperties basicProperties =
new AMQP.BasicProperties().builder().deliveryMode(2).contentType("UTF-8").build();
for (int i = 0; i < Constant.ALL_DATA.length; i++) {
String key = String.format("%s.%s", "IoTDB", Objects.toString(i));
+ channel.queueDeclare(key, true, false, false, null);
channel.basicPublish(
Constant.TOPIC, key, false, basicProperties, Constant.ALL_DATA[i].getBytes());
- LOGGER.info(Constant.ALL_DATA[i]);
+ try {
+ if (channel.waitForConfirms()) {
+ LOGGER.info(" [x] Sent : {}", Constant.ALL_DATA[i]);
+ } else {
+ LOGGER.error(" [x] Timed out waiting for confirmation");
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error(" [x] Interrupted while waiting for confirmation");
+ }
}
}
}
diff --git a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalConstant.java b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalConstant.java
new file mode 100644
index 00000000..e3ce8f4d
--- /dev/null
+++ b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalConstant.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rabbitmq.relational;
+
+public class RelationalConstant {
+
+ public static final String SERVER_HOST = "localhost";
+ public static final int SERVER_PORT = 5672;
+ public static final String RABBITMQ_VHOST = "/";
+ public static final String RABBITMQ_USERNAME = "guest";
+ public static final String RABBITMQ_PASSWORD = "guest";
+ public static final String CONNECTION_NAME = "RabbitMQ-Relational-Connection";
+ public static final String RABBITMQ_CONSUMER_QUEUE = "IoTDB_Relational_Topic_Queue";
+ public static final String RABBITMQ_CONSUMER_TAG = "IoTDB_Relational_CONSUMER_TAG";
+ public static final String TOPIC = "RabbitMQ-Relational-Test";
+ public static final String[] IOTDB_URLS = {
+ "127.0.0.1:6667"
+ };
+ public static final String IOTDB_USERNAME = "root";
+ public static final String IOTDB_PASSWORD = "root";
+ public static final String[] DATABASES = {"rabbitmq_db1", "rabbitmq_db2"};
+ public static final String[][] TABLES = {
+ // database, tableName, columnNames, columnTypes, columnCategories
+ {"rabbitmq_db1", "tb1", "time,region,model_id,temperature,status", "TIMESTAMP,STRING,STRING,FLOAT,BOOLEAN", "TIME,TAG,ATTRIBUTE,FIELD,FIELD"},
+ {"rabbitmq_db2", "tb2", "time,plant_id,humidity,status", "TIMESTAMP,STRING,FLOAT,BOOLEAN", "TIME,TAG,FIELD,FIELD"}
+ };
+ public static final String[] ALL_DATA = {
+ // database;tableName;columnName[,columnName]*;value[,value]*[,value[:value]*]*
+ "rabbitmq_db1;tb1;time,temperature,status;17,3.26,true;18,3.27,false;19,3.28,true",
+ "rabbitmq_db1;tb1;time,region,model_id,temperature;20,'rgn1','id1',3.31",
+ "rabbitmq_db2;tb2;time,plant_id,humidity,status;50,'id1',68.7,true",
+ "rabbitmq_db2;tb2;time,plant_id,humidity,status;51,'id2',68.5,false",
+ "rabbitmq_db2;tb2;time,plant_id,humidity,status;52,'id3',68.3,true",
+ "rabbitmq_db2;tb2;time,plant_id,humidity,status;53,'id4',68.8,true",
+ "rabbitmq_db2;tb2;time,plant_id,humidity,status;54,'id5',68.9,true"
+ };
+}
diff --git a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQConsumer.java b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQConsumer.java
new file mode 100644
index 00000000..5ae0705d
--- /dev/null
+++ b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQConsumer.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rabbitmq.relational;
+
+import com.rabbitmq.client.*;
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.TableSessionBuilder;
+import org.example.RabbitMQChannelUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.concurrent.TimeoutException;
+
+public class RelationalRabbitMQConsumer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RelationalRabbitMQConsumer.class);
+ private static ITableSession tableSession;
+
+ public RelationalRabbitMQConsumer() throws IoTDBConnectionException {
+ initIoTDB();
+ }
+
+ public static void main(String[] args) throws IOException, TimeoutException, IoTDBConnectionException {
+ RelationalRabbitMQConsumer consumer = new RelationalRabbitMQConsumer();
+ Connection connection = RabbitMQChannelUtils.getRelationalConnection();
+ Channel channel = connection.createChannel();
+ AMQP.Queue.DeclareOk declareOk =
+ channel.queueDeclare(
+ RelationalConstant.RABBITMQ_CONSUMER_QUEUE, true, false, false, new HashMap<>());
+ channel.exchangeDeclare(RelationalConstant.TOPIC, BuiltinExchangeType.TOPIC);
+ channel.queueBind(declareOk.getQueue(), RelationalConstant.TOPIC, "IoTDB.#", new HashMap<>());
+ DefaultConsumer defaultConsumer =
+ new DefaultConsumer(channel) {
+ @Override
+ public void handleDelivery(
+ String consumerTag,
+ Envelope envelope,
+ AMQP.BasicProperties properties,
+ byte[] body) {
+ String param =
+ consumerTag + ", " + envelope.toString() + ", " + properties.toString();
+ LOGGER.info(param);
+ try {
+ consumer.insert(new String(body));
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage());
+ }
+ }
+ };
+ channel.basicConsume(
+ declareOk.getQueue(), true, RelationalConstant.RABBITMQ_CONSUMER_TAG, defaultConsumer);
+ }
+
+ private void initIoTDB() throws IoTDBConnectionException {
+ tableSession =
+ new TableSessionBuilder()
+ .nodeUrls(Arrays.asList(RelationalConstant.IOTDB_URLS))
+ .username(RelationalConstant.IOTDB_USERNAME)
+ .password(RelationalConstant.IOTDB_PASSWORD)
+ .build();
+ for (String db : RelationalConstant.DATABASES) {
+ boolean res = createDatabase(db);
+ if (!res) {
+ throw new RuntimeException("Create database failed");
+ }
+ }
+ for (String[] tableInfo : RelationalConstant.TABLES) {
+ boolean res = createTable(tableInfo);
+ if (!res) {
+ throw new RuntimeException("Create table failed");
+ }
+ }
+ }
+
+ private boolean createDatabase(String dbName) {
+ try {
+ tableSession.executeNonQueryStatement(String.format("CREATE DATABASE %s", dbName));
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ LOGGER.error("Create Database Error: ", e);
+ return false;
+ }
+ return true;
+ }
+
+ private boolean createTable(String[] tableInfo) {
+ String sql = getCreateTableSQL(tableInfo);
+ try {
+ tableSession.executeNonQueryStatement(sql);
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ LOGGER.error("Create Table Error: ", e);
+ return false;
+ }
+ return true;
+ }
+
+ private static String getCreateTableSQL(String[] tableInfo) {
+ StringBuilder sql = new StringBuilder();
+ sql.append("CREATE TABLE \"").append(tableInfo[0]).append("\".\"").append(tableInfo[1]).append("\" (");
+
+ String[] columnNames = tableInfo[2].split(",");
+ String[] columnTypes = tableInfo[3].split(",");
+ String[] columnCategories = tableInfo[4].split(",");
+ int columnSize = columnNames.length;
+
+ for (int i = 0; i < columnSize; i++) {
+ sql.append(columnNames[i]).append(" ");
+ sql.append(columnTypes[i]).append(" ");
+ sql.append(columnCategories[i]).append(",");
+ }
+ sql.deleteCharAt(sql.length() - 1);
+ sql.append(")");
+ return sql.toString();
+ }
+
+ private void insert(String data) {
+ String sql = getInsertValueSQL(data);
+ try {
+ tableSession.executeNonQueryStatement(sql);
+ LOGGER.info("Insert Success: {}", sql);
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ LOGGER.error("Insert Error: ", e);
+ }
+ }
+
+ private String getInsertValueSQL(String s) {
+ StringBuilder sql = new StringBuilder();
+ String[] curDataInfo = s.split(";");
+ int valueSetSize = curDataInfo.length - 3;
+ String database = curDataInfo[0];
+ String tableName = curDataInfo[1];
+ String columnNames = curDataInfo[2];
+ sql.append("INSERT INTO \"").append(database).append("\".\"").append(tableName).append("\"(");
+ sql.append(columnNames).append(") VALUES ");
+
+ for (int j = 0; j < valueSetSize; j++) {
+ String columnValues = curDataInfo[3 + j];
+ sql.append("(");
+ sql.append(columnValues);
+ sql.append("),");
+ }
+ sql.deleteCharAt(sql.length() - 1);
+ return sql.toString();
+ }
+}
diff --git a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQProducer.java b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQProducer.java
new file mode 100644
index 00000000..95f0581f
--- /dev/null
+++ b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQProducer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rabbitmq.relational;
+
+import com.rabbitmq.client.*;
+import org.example.RabbitMQChannelUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+public class RelationalRabbitMQProducer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RelationalRabbitMQProducer.class);
+
+ public static void main(String[] args) {
+ try (Connection connection = RabbitMQChannelUtils.getRelationalConnection()) {
+ Channel channel = connection.createChannel();
+ channel.exchangeDeclare(RelationalConstant.TOPIC, BuiltinExchangeType.TOPIC);
+ channel.confirmSelect();
+ AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().deliveryMode(2).contentType("UTF-8").build();
+ for (int i = 0; i < RelationalConstant.ALL_DATA.length; i++) {
+ String key = String.format("%s.%s", "IoTDB", Objects.toString(i));
+ channel.queueDeclare(key, true, false, false, null);
+ channel.basicPublish(RelationalConstant.TOPIC, key, false, basicProperties, RelationalConstant.ALL_DATA[i].getBytes());
+ try {
+ if (channel.waitForConfirms()) {
+ LOGGER.info(" [x] Sent : {}", RelationalConstant.ALL_DATA[i]);
+ } else {
+ LOGGER.error(" [x] Timed out waiting for confirmation");
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error(" [x] Interrupted while waiting for confirmation");
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage());
+ }
+ }
+}
diff --git a/examples/rocketmq/pom.xml b/examples/rocketmq/pom.xml
index b880681c..fc7d25fc 100644
--- a/examples/rocketmq/pom.xml
+++ b/examples/rocketmq/pom.xml
@@ -37,6 +37,7 @@
org.apache.rocketmq
rocketmq-client
+ 5.3.3
com.alibaba
diff --git a/examples/rocketmq/readme.md b/examples/rocketmq/readme.md
index 4c95c6f3..b9a8bb8e 100644
--- a/examples/rocketmq/readme.md
+++ b/examples/rocketmq/readme.md
@@ -46,38 +46,108 @@ Producers insert IoTDB insert statements into partitions according to devices, e
## Usage
### Version usage
-IoTDB: 1.0.0
-RocketMQ: 4.4.0
+
+| | Version |
+|----------|---------|
+| IoTDB | 2.0.5 |
+| RocketMQ | 5.3.3 |
+
### Dependencies with Maven
```
- org.apache.iotdb
- iotdb-session
- 1.0.0
+ org.apache.iotdb
+ iotdb-session
+ 2.0.5
- org.apache.rocketmq
- rocketmq-client
- 4.4.0
+ org.apache.rocketmq
+ rocketmq-client
+ 5.3.3
```
Note: The maven dependencies of io.netty in IoTDB are in conflicts with those dependencies in RocketMQ-Client.
-### 1. Install IoTDB
+### Prerequisite Steps
+
+#### 1. Install IoTDB
please refer to [https://iotdb.apache.org/#/Download](https://iotdb.apache.org/#/Download)
-### 2. Install RocketMQ
+#### 2. Install RocketMQ
please refer to [http://rocketmq.apache.org/docs/quick-start/](http://rocketmq.apache.org/docs/quick-start/)
-### 3. Startup IoTDB
-please refer to [Quick Start](http://iotdb.apache.org/UserGuide/Master/Get%20Started/QuickStart.html)
+#### 3. Startup IoTDB
+please refer to [Quick Start](https://iotdb.apache.org/UserGuide/latest/QuickStart/QuickStart_apache.html)
-### 4. Startup RocketMQ
+#### 4. Startup RocketMQ
please refer to [http://rocketmq.apache.org/docs/quick-start/](http://rocketmq.apache.org/docs/quick-start/)
-### 5. Start the consumer client:RocketMQConsumer
+### Case 1: Send data from localhost to IoTDB-tree
+
+Files related:
+1. `Constant.java` : configuration of IoTDB and RocketMQ
+2. `RocketMQProducer.java` : send data from localhost to RocketMQ
+3. `RocketMQConsumer.java` : consume data from RocketMQ
+4. `Utils.java` : utils
+
+Step 0: Set parameter in `Constant.java`
+
+> Change the parameters according to your situation.
+
+| Parameter | Data Type | Description |
+|---------------------------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| TOPIC | String | The topic to store data in RocketMQ |
+| SERVER_ADDRESS | String | The server address of RocketMQ, e.g. `"127.0.0.1:9876"` |
+| PRODUCER_GROUP | String | The producer group name in RocketMQ |
+| CONSUMER_GROUP | String | The consumer group name in RocketMQ |
+| IOTDB_CONNECTION_HOST | String | IoTDB host, e.g. `"localhost"` |
+| IOTDB_CONNECTION_PORT | int | IoTDB port, e.g. `6667` |
+| IOTDB_CONNECTION_USER | String | IoTDB username, e.g. `"root"` |
+| IOTDB_CONNECTION_PASSWORD | String | IoTDB password, e.g. `"root"` |
+| STORAGE_GROUP | Array | The storage groups to create |
+| CREATE_TIMESERIES | Array | The timeseries to create
Format of a single timeseries: {"timeseries", "dataType", "encodingType", "compressionType"}
e.g. `{"root.vehicle.d0.s0", "INT32", "PLAIN", "SNAPPY"}` |
+| ALL_DATA | Array | The data to create
Format of a single data: "device,timestamp,fieldName\[:fieldName\]\*,dataType\[:dataType\]\*,value\[:value\]\*"
e.g. `"root.vehicle.d0,10,s0,INT32,100"`, `"root.vehicle.d0,12,s0:s1,INT32:TEXT,101:'employeeId102'"` |
+
+Step 1: Run `RocketMQProducer.java`
+
+> This class sends data from localhost to RocketMQ.
+
+Step 2: Run `RocketMQConsumer.java`
+
+> This class consumes data from RocketMQ and sends the data to IoTDB-tree.
+
+### Case 2: Send data from localhost to IoTDB-table
+
+Files related:
+1. `RelationalConstant.java` : configuration of IoTDB and RocketMQ
+2. `RelationalRocketMQProducer.java` : send data from localhost to RocketMQ
+3. `RelationalRocketMQConsumer.java` : consume data from RocketMQ
+4. `RelationalUtils.java` : utils
+
+Step 0: Set parameter in `RelationalConstant.java`
+
+> Change the parameters according to your situation.
+
+| Parameter | Data Type | Description |
+|----------------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| TOPIC | String | The topic to store data in RocketMQ |
+| SERVER_ADDRESS | String | The server address of RocketMQ, e.g. `"127.0.0.1:9876"` |
+| PRODUCER_GROUP | String | The producer group name in RocketMQ |
+| CONSUMER_GROUP | String | The consumer group name in RocketMQ |
+| IOTDB_URLS | Array | IoTDB urls, e.g. `{"localhost:6667"}` |
+| IOTDB_USERNAME | String | IoTDB username, e.g. `"root"` |
+| IOTDB_PASSWORD | String | IoTDB password, e.g. `"root"` |
+| DATABASES | Array | The databases to create |
+| TABLES | Array | The tables to create
Format of a single table: {"database", "tableName", "columnNames", "columnTypes", "columnCategories"}
e.g. `{"rocketmq_db1", "tb1", "time,region,status", "TIMESTAMP,STRING,BOOLEAN", "TIME,TAG,FIELD"}` |
+| ALL_DATA | Array | The data to create
Format of a single data: "database;tableName;columnName\[,columnName\]\*;value\[,value\]\*\[;value\[,value\]\*\]\*"
e.g. `"rocketmq_db1;tb1;time,status;17,true;18,false;19,true"` |
+
+
+Step 1: Run `RelationalRocketMQProducer.java`
+
+> This class sends data from localhost to RocketMQ.
+
+Step 2: Run `RelationalRocketMQConsumer.java`
-### 6. Start the producer client:RocketMQProducer
+> This class consumes data from RocketMQ and sends the data to IoTDB-table.
diff --git a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalConstant.java b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalConstant.java
new file mode 100644
index 00000000..e492fba5
--- /dev/null
+++ b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalConstant.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rocketmq.relational;
+
+public class RelationalConstant {
+
+ public static final String SERVER_ADDRESS = "localhost:9876";
+ public static final String PRODUCER_GROUP = "IoTDBRelationalConsumer";
+ public static final String CONSUMER_GROUP = "IoTDBRelationalProducer";
+ public static final String TOPIC = "RocketMQ-Relational-Test";
+ public static final String[] IOTDB_URLS = {
+ "127.0.0.1:6667"
+ };
+ public static final String IOTDB_USERNAME = "root";
+ public static final String IOTDB_PASSWORD = "root";
+ public static final String[] DATABASES = {"rocketmq_db1", "rocketmq_db2"};
+ public static final String[][] TABLES = {
+ // database, tableName, columnNames, columnTypes, columnCategories
+ {"rocketmq_db1", "tb1", "time,region,model_id,temperature,status", "TIMESTAMP,STRING,STRING,FLOAT,BOOLEAN", "TIME,TAG,ATTRIBUTE,FIELD,FIELD"},
+ {"rocketmq_db2", "tb2", "time,plant_id,humidity,status", "TIMESTAMP,STRING,FLOAT,BOOLEAN", "TIME,TAG,FIELD,FIELD"}
+ };
+ public static final String[] ALL_DATA = {
+ // database;tableName;columnName[,columnName]*;value[,value]*[,value[:value]*]*
+ "rocketmq_db1;tb1;time,temperature,status;17,3.26,true;18,3.27,false;19,3.28,true",
+ "rocketmq_db1;tb1;time,region,model_id,temperature;20,'rgn1','id1',3.31",
+ "rocketmq_db2;tb2;time,plant_id,humidity,status;50,'id1',68.7,true",
+ "rocketmq_db2;tb2;time,plant_id,humidity,status;51,'id2',68.5,false",
+ "rocketmq_db2;tb2;time,plant_id,humidity,status;52,'id3',68.3,true",
+ "rocketmq_db2;tb2;time,plant_id,humidity,status;53,'id4',68.8,true",
+ "rocketmq_db2;tb2;time,plant_id,humidity,status;54,'id5',68.9,true"
+ };
+}
diff --git a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQConsumer.java b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQConsumer.java
new file mode 100644
index 00000000..acbae725
--- /dev/null
+++ b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQConsumer.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rocketmq.relational;
+
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.TableSessionBuilder;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+public class RelationalRocketMQConsumer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RelationalRocketMQConsumer.class);
+ private ITableSession tableSession;
+ private DefaultMQPushConsumer consumer;
+ private String producerGroup;
+ private String serverAddresses;
+
+ public RelationalRocketMQConsumer(
+ String producerGroup,
+ String serverAddresses) throws IoTDBConnectionException {
+ this.producerGroup = producerGroup;
+ this.serverAddresses = serverAddresses;
+ this.consumer = new DefaultMQPushConsumer(producerGroup);
+ this.consumer.setNamesrvAddr(serverAddresses);
+ initIoTDB();
+ }
+
+ private void initIoTDB() throws IoTDBConnectionException {
+ tableSession =
+ new TableSessionBuilder()
+ .nodeUrls(Arrays.asList(RelationalConstant.IOTDB_URLS))
+ .username(RelationalConstant.IOTDB_USERNAME)
+ .password(RelationalConstant.IOTDB_PASSWORD)
+ .build();
+ for (String db : RelationalConstant.DATABASES) {
+ boolean res = createDatabase(db);
+ if (!res) {
+ throw new RuntimeException("Create database failed");
+ }
+ }
+ for (String[] tableInfo : RelationalConstant.TABLES) {
+ boolean res = createTable(tableInfo);
+ if (!res) {
+ throw new RuntimeException("Create table failed");
+ }
+ }
+ }
+
+ private boolean createDatabase(String dbName) {
+ try {
+ tableSession.executeNonQueryStatement(String.format("CREATE DATABASE %s", dbName));
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ LOGGER.error("Create Database Error: ", e);
+ return false;
+ }
+ return true;
+ }
+
+ private boolean createTable(String[] tableInfo) {
+ String sql = getCreateTableSQL(tableInfo);
+ try {
+ tableSession.executeNonQueryStatement(sql);
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ LOGGER.error("Create Table Error: ", e);
+ return false;
+ }
+ return true;
+ }
+
+ private static String getCreateTableSQL(String[] tableInfo) {
+ StringBuilder sql = new StringBuilder();
+ sql.append("CREATE TABLE \"").append(tableInfo[0]).append("\".\"").append(tableInfo[1]).append("\" (");
+
+ String[] columnNames = tableInfo[2].split(",");
+ String[] columnTypes = tableInfo[3].split(",");
+ String[] columnCategories = tableInfo[4].split(",");
+ int columnSize = columnNames.length;
+
+ for (int i = 0; i < columnSize; i++) {
+ sql.append(columnNames[i]).append(" ");
+ sql.append(columnTypes[i]).append(" ");
+ sql.append(columnCategories[i]).append(",");
+ }
+ sql.deleteCharAt(sql.length() - 1);
+ sql.append(")");
+ return sql.toString();
+ }
+
+ public static void main(String[] args)
+ throws MQClientException, IoTDBConnectionException {
+ RelationalRocketMQConsumer consumer =
+ new RelationalRocketMQConsumer(RelationalConstant.CONSUMER_GROUP, RelationalConstant.SERVER_ADDRESS);
+ consumer.prepareConsume();
+ consumer.start();
+ }
+
+ public void prepareConsume() throws MQClientException {
+ consumer.subscribe(RelationalConstant.TOPIC, "*");
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ consumer.registerMessageListener(
+ (MessageListenerOrderly)
+ (messages, context) -> {
+ for (MessageExt msg : messages) {
+ LOGGER.info(
+ String.format(
+ "%s Receive New Messages: %s %n",
+ Thread.currentThread().getName(), new String(msg.getBody())));
+ try {
+ insert(new String(msg.getBody()));
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage());
+ }
+ }
+ return ConsumeOrderlyStatus.SUCCESS;
+ });
+ }
+
+ private void insert(String data) {
+ String sql = getInsertValueSQL(data);
+ try {
+ tableSession.executeNonQueryStatement(sql);
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ LOGGER.error("Insert Values Into Table Error: ", e);
+ }
+ }
+
+ private String getInsertValueSQL(String s) {
+ StringBuilder sql = new StringBuilder();
+ String[] curDataInfo = s.split(";");
+ int valueSetSize = curDataInfo.length - 3;
+ String database = curDataInfo[0];
+ String tableName = curDataInfo[1];
+ String columnNames = curDataInfo[2];
+ sql.append("INSERT INTO \"").append(database).append("\".\"").append(tableName).append("\"(");
+ sql.append(columnNames).append(") VALUES ");
+
+ for (int j = 0; j < valueSetSize; j++) {
+ String columnValues = curDataInfo[3 + j];
+ sql.append("(");
+ sql.append(columnValues);
+ sql.append("),");
+ }
+ sql.deleteCharAt(sql.length() - 1);
+ return sql.toString();
+ }
+
+ public void start() throws MQClientException {
+ consumer.start();
+ }
+}
diff --git a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQProducer.java b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQProducer.java
new file mode 100644
index 00000000..f96b5bd3
--- /dev/null
+++ b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQProducer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rocketmq.relational;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+
+public class RelationalRocketMQProducer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RelationalRocketMQProducer.class);
+ private DefaultMQProducer producer;
+ private String producerGroup;
+ private String serverAddresses;
+
+ public RelationalRocketMQProducer(String producerGroup, String serverAddresses) {
+ this.producerGroup = producerGroup;
+ this.serverAddresses = serverAddresses;
+ producer = new DefaultMQProducer(producerGroup);
+ producer.setNamesrvAddr(serverAddresses);
+ }
+
+ public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
+ RelationalRocketMQProducer producer = new RelationalRocketMQProducer(RelationalConstant.PRODUCER_GROUP, RelationalConstant.SERVER_ADDRESS);
+ producer.start();
+ producer.sendMessage();
+ producer.shutdown();
+ }
+
+ public void start() throws MQClientException {
+ producer.start();
+ }
+
+ public void sendMessage() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
+ for (String data : RelationalConstant.ALL_DATA) {
+ Message msg = new Message(RelationalConstant.TOPIC, null, null, (data).getBytes(StandardCharsets.UTF_8));
+ SendResult sendResult =
+ producer.send(
+ msg,
+ (mqs, msg1, arg) -> {
+ Integer id = (Integer) arg;
+ int index = id % mqs.size();
+ return mqs.get(index);
+ },
+ RelationalUtils.convertStringToInteger(RelationalUtils.getDatabaseNTable(data)));
+ String result = sendResult.toString();
+ LOGGER.info(result);
+ }
+ }
+
+ public void shutdown() {
+ producer.shutdown();
+ }
+}
diff --git a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalUtils.java b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalUtils.java
new file mode 100644
index 00000000..2bbd3c05
--- /dev/null
+++ b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rocketmq.relational;
+
+public class RelationalUtils {
+
+ private RelationalUtils() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ public static int convertStringToInteger(String fullTable) {
+ int sum = 0;
+ for (char c : fullTable.toCharArray()) {
+ sum += c;
+ }
+ return sum;
+ }
+
+ public static String getDatabaseNTable(String data) {
+ String[] info = data.split(";");
+ return info[0] + "." + info[1];
+ }
+}
diff --git a/pom.xml b/pom.xml
index 0bcd47a9..9ce4d43b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,7 +90,7 @@
https://github.com/apache/iotdb-bin-resources/tree/main/iotdb-tools-thrift
-->
0.14.1.0
- 2.0.3
+ 2.0.5
2.16.2
4.0.4