From c3ac1f4f57a837c2fe0750690512cc72be46cac3 Mon Sep 17 00:00:00 2001 From: LimJiaWenBrenda Date: Mon, 15 Sep 2025 16:25:20 +0800 Subject: [PATCH 1/8] Added Kafka examples for IoTDB-table --- .../kafka/relational/RelationalConstant.java | 30 ++++ .../kafka/relational/RelationalConsumer.java | 130 ++++++++++++++++++ .../relational/RelationalConsumerThread.java | 81 +++++++++++ .../kafka/relational/RelationalProducer.java | 47 +++++++ 4 files changed, 288 insertions(+) create mode 100644 examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java create mode 100644 examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java create mode 100644 examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java create mode 100644 examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java diff --git a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java new file mode 100644 index 00000000..10ff5a96 --- /dev/null +++ b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java @@ -0,0 +1,30 @@ +package org.example.relational; + +public class RelationalConstant { + + public static final String KAFKA_SERVICE_URL = "172.20.31.71:9094"; + public static final String TOPIC = "Kafka-Relational-Test"; + public static final String[] IOTDB_URLS = { + "127.0.0.1:6667" + }; + public static final String IOTDB_USERNAME = "root"; + public static final String IOTDB_PASSWORD = "root"; + public static final int SESSION_SIZE = 3; + public static final int CONSUMER_THREAD_NUM = 5; + public static final String[] DATABASES = {"kafka_db1", "kafka_db2"}; + public static final String[][] TABLES = { + // database, tableName, columnNames, columnTypes, columnCategories + {"kafka_db1", "tb1", "time,region,model_id,temperature,status", "TIMESTAMP,STRING,STRING,FLOAT,BOOLEAN", "TIME,TAG,ATTRIBUTE,FIELD,FIELD"}, + {"kafka_db2", "tb2", "time,plant_id,humidity,status", "TIMESTAMP,STRING,FLOAT,BOOLEAN", "TIME,TAG,FIELD,FIELD"} + }; + public static final String[] ALL_DATA = { + // database;tableName;columnName[,columnName]*;value[,value]*[,value[:value]*]* + "kafka_db1;tb1;time,temperature,status;17,3.26,true;18,3.27,false;19,3.28,true", + "kafka_db1;tb1;time,region,model_id,temperature;20,'rgn1','id1',3.31", + "kafka_db2;tb2;time,plant_id,humidity,status;50,'id1',68.7,true", + "kafka_db2;tb2;time,plant_id,humidity,status;51,'id2',68.5,false", + "kafka_db2;tb2;time,plant_id,humidity,status;52,'id3',68.3,true", + "kafka_db2;tb2;time,plant_id,humidity,status;53,'id4',68.8,true", + "kafka_db2;tb2;time,plant_id,humidity,status;54,'id5',68.9,true" + }; +} diff --git a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java new file mode 100644 index 00000000..8c1df136 --- /dev/null +++ b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java @@ -0,0 +1,130 @@ +package org.example.relational; + +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.isession.pool.ITableSessionPool; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.TableSessionPoolBuilder; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class RelationalConsumer { + + private static final Logger LOGGER = LoggerFactory.getLogger(RelationalConsumer.class); + private static ITableSessionPool tableSessionPool; + private List> consumerList; + + private RelationalConsumer(List> consumerList) { + this.consumerList = consumerList; + initSessionPool(); + } + + private static void initSessionPool() { + tableSessionPool = + new TableSessionPoolBuilder() + .nodeUrls(Arrays.asList(RelationalConstant.IOTDB_URLS)) + .user(RelationalConstant.IOTDB_USERNAME) + .password(RelationalConstant.IOTDB_PASSWORD) + .maxSize(RelationalConstant.SESSION_SIZE) + .build(); + } + + public static void main(String[] args) { + List> consumerList = new ArrayList<>(); + for (int i = 0; i < RelationalConstant.CONSUMER_THREAD_NUM; i++) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, RelationalConstant.KAFKA_SERVICE_URL); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, RelationalConstant.TOPIC); + + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumerList.add(consumer); + consumer.subscribe(Collections.singleton(RelationalConstant.TOPIC)); + } + RelationalConsumer consumer = new RelationalConsumer(consumerList); + initIoTDB(); + consumer.consumeInParallel(); + } + + private static void initIoTDB() { + for (String db : RelationalConstant.DATABASES) { + boolean res = createDatabase(db); + if (!res) { + throw new RuntimeException("Create database failed"); + } + } + for (String[] tableInfo : RelationalConstant.TABLES) { + boolean res = createTable(tableInfo); + if (!res) { + throw new RuntimeException("Create table failed"); + } + } + } + + private static boolean createDatabase(String dbName) { + try (ITableSession session = tableSessionPool.getSession()) { + try { + session.executeNonQueryStatement(String.format("CREATE DATABASE %s", dbName)); + } catch (IoTDBConnectionException | StatementExecutionException e) { + LOGGER.error("Create Database Error: ", e); + return false; + } + } catch (IoTDBConnectionException e) { + LOGGER.error("Get Table Session Error: ", e); + return false; + } + return true; + } + + private static boolean createTable(String[] tableInfo) { + try (ITableSession session = tableSessionPool.getSession()) { + String sql = getCreateTableSQL(tableInfo); + try { + session.executeNonQueryStatement(sql); + } catch (IoTDBConnectionException | StatementExecutionException e) { + LOGGER.error("Create Table Error: ", e); + return false; + } + } catch (IoTDBConnectionException e) { + LOGGER.error("Get Table Session Error: ", e); + return false; + } + return true; + } + + private static String getCreateTableSQL(String[] tableInfo) { + StringBuilder sql = new StringBuilder(); + sql.append("CREATE TABLE \"").append(tableInfo[0]).append("\".\"").append(tableInfo[1]).append("\" ("); + + String[] columnNames = tableInfo[2].split(","); + String[] columnTypes = tableInfo[3].split(","); + String[] columnCategories = tableInfo[4].split(","); + int columnSize = columnNames.length; + + for (int i = 0; i < columnSize; i++) { + sql.append(columnNames[i]).append(" "); + sql.append(columnTypes[i]).append(" "); + sql.append(columnCategories[i]).append(","); + } + sql.deleteCharAt(sql.length() - 1); + sql.append(")"); + return sql.toString(); + } + + private void consumeInParallel() { + ExecutorService executor = Executors.newFixedThreadPool(RelationalConstant.CONSUMER_THREAD_NUM); + for (int i = 0; i < consumerList.size(); i++) { + RelationalConsumerThread consumerThread = new RelationalConsumerThread(consumerList.get(i), tableSessionPool); + executor.submit(consumerThread); + } + } +} diff --git a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java new file mode 100644 index 00000000..e514d494 --- /dev/null +++ b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java @@ -0,0 +1,81 @@ +package org.example.relational; + +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.isession.pool.ITableSessionPool; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.example.ConsumerThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +public class RelationalConsumerThread implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerThread.class); + private KafkaConsumer consumer; + private ITableSessionPool tableSessionPool; + + public RelationalConsumerThread(KafkaConsumer consumer, ITableSessionPool tableSessionPool) { + this.consumer = consumer; + this.tableSessionPool = tableSessionPool; + } + + @Override + public void run() { + try { + do { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + LOGGER.info("Received records: {}", records.count()); + List dataList = new ArrayList<>(records.count()); + for (ConsumerRecord consumerRecord : records) { + dataList.add(consumerRecord.value()); + } + insertDataList(dataList); + } while (true); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } + + private void insertDataList(List dataList) { + for (String s : dataList) { + String sql = getInsertValueSQL(s); + + try (ITableSession session = tableSessionPool.getSession()) { + try { + session.executeNonQueryStatement(sql); + } catch (IoTDBConnectionException | StatementExecutionException e) { + LOGGER.error("Insert Values Into Table Error: ", e); + } + } catch (IoTDBConnectionException e) { + LOGGER.error("Get Table Session Error: ", e); + } + } + } + + private String getInsertValueSQL(String s) { + StringBuilder sql = new StringBuilder(); + String[] curDataInfo = s.split(";"); + int valueSetSize = curDataInfo.length - 3; + String database = curDataInfo[0]; + String tableName = curDataInfo[1]; + String columnNames = curDataInfo[2]; + sql.append("INSERT INTO \"").append(database).append("\".\"").append(tableName).append("\"("); + sql.append(columnNames).append(") VALUES "); + + for (int j = 0; j < valueSetSize; j++) { + String columnValues = curDataInfo[3 + j]; + sql.append("("); + sql.append(columnValues); + sql.append("),"); + } + sql.deleteCharAt(sql.length() - 1); + return sql.toString(); + } +} diff --git a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java new file mode 100644 index 00000000..52b59734 --- /dev/null +++ b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java @@ -0,0 +1,47 @@ +package org.example.relational; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public class RelationalProducer { + + private final KafkaProducer kafkaProducer; + private static final Logger LOGGER = LoggerFactory.getLogger(RelationalProducer.class); + + public RelationalProducer() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, RelationalConstant.KAFKA_SERVICE_URL); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + kafkaProducer = new KafkaProducer<>(props); + } + + public static void main(String[] args) { + RelationalProducer relationalProducer = new RelationalProducer(); + relationalProducer.produce(); + relationalProducer.close(); + } + + private void produce() { + for (int i = 0; i < RelationalConstant.ALL_DATA.length; i++) { + String key = Integer.toString(i); + try { + RecordMetadata metadata = kafkaProducer.send(new ProducerRecord<>(RelationalConstant.TOPIC, key, RelationalConstant.ALL_DATA[i])).get(); + LOGGER.info("Sent record(key={} value={}) meta(partition={}, offset={})\n", key, RelationalConstant.ALL_DATA[i], metadata.partition(), metadata.offset()); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } + } + + private void close() { + kafkaProducer.close(); + } +} From 223cb32a0a144e8c61eeac4fa25549c13685a71c Mon Sep 17 00:00:00 2001 From: LimJiaWenBrenda Date: Tue, 16 Sep 2025 09:37:02 +0800 Subject: [PATCH 2/8] Added license header --- .../kafka/relational/RelationalConstant.java | 19 +++++++++++++++++++ .../kafka/relational/RelationalConsumer.java | 19 +++++++++++++++++++ .../relational/RelationalConsumerThread.java | 19 +++++++++++++++++++ .../kafka/relational/RelationalProducer.java | 19 +++++++++++++++++++ 4 files changed, 76 insertions(+) diff --git a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java index 10ff5a96..fb752ce8 100644 --- a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java +++ b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.example.relational; public class RelationalConstant { diff --git a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java index 8c1df136..6ae9f752 100644 --- a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java +++ b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.example.relational; import org.apache.iotdb.isession.ITableSession; diff --git a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java index e514d494..ee344c1d 100644 --- a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java +++ b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.example.relational; import org.apache.iotdb.isession.ITableSession; diff --git a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java index 52b59734..3e871946 100644 --- a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java +++ b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.example.relational; import org.apache.kafka.clients.producer.KafkaProducer; From 6029c171b438cb4ccb3bbec83782e4ff8de51952 Mon Sep 17 00:00:00 2001 From: LimJiaWenBrenda Date: Tue, 16 Sep 2025 14:22:24 +0800 Subject: [PATCH 3/8] Added RocketMQ examples for IoTDB-table --- .../relational/RelationalConstant.java | 49 +++++ .../RelationalRocketMQConsumer.java | 177 ++++++++++++++++++ .../RelationalRocketMQProducer.java | 78 ++++++++ .../rocketmq/relational/RelationalUtils.java | 40 ++++ 4 files changed, 344 insertions(+) create mode 100644 examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalConstant.java create mode 100644 examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQConsumer.java create mode 100644 examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQProducer.java create mode 100644 examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalUtils.java diff --git a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalConstant.java b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalConstant.java new file mode 100644 index 00000000..a8f265fd --- /dev/null +++ b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalConstant.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.example.relational; + +public class RelationalConstant { + + public static final String SERVER_ADDRESS = "localhost:9876"; + public static final String PRODUCER_GROUP = "IoTDBRelationalConsumer"; + public static final String CONSUMER_GROUP = "IoTDBRelationalProducer"; + public static final String TOPIC = "RocketMQ-Relational-Test"; + public static final String[] IOTDB_URLS = { + "127.0.0.1:6667" + }; + public static final String IOTDB_USERNAME = "root"; + public static final String IOTDB_PASSWORD = "root"; + public static final String[] DATABASES = {"rocketmq_db1", "rocketmq_db2"}; + public static final String[][] TABLES = { + // database, tableName, columnNames, columnTypes, columnCategories + {"rocketmq_db1", "tb1", "time,region,model_id,temperature,status", "TIMESTAMP,STRING,STRING,FLOAT,BOOLEAN", "TIME,TAG,ATTRIBUTE,FIELD,FIELD"}, + {"rocketmq_db2", "tb2", "time,plant_id,humidity,status", "TIMESTAMP,STRING,FLOAT,BOOLEAN", "TIME,TAG,FIELD,FIELD"} + }; + public static final String[] ALL_DATA = { + // database;tableName;columnName[,columnName]*;value[,value]*[,value[:value]*]* + "rocketmq_db1;tb1;time,temperature,status;17,3.26,true;18,3.27,false;19,3.28,true", + "rocketmq_db1;tb1;time,region,model_id,temperature;20,'rgn1','id1',3.31", + "rocketmq_db2;tb2;time,plant_id,humidity,status;50,'id1',68.7,true", + "rocketmq_db2;tb2;time,plant_id,humidity,status;51,'id2',68.5,false", + "rocketmq_db2;tb2;time,plant_id,humidity,status;52,'id3',68.3,true", + "rocketmq_db2;tb2;time,plant_id,humidity,status;53,'id4',68.8,true", + "rocketmq_db2;tb2;time,plant_id,humidity,status;54,'id5',68.9,true" + }; +} diff --git a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQConsumer.java b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQConsumer.java new file mode 100644 index 00000000..9a166b6b --- /dev/null +++ b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQConsumer.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.example.relational; + +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.TableSessionBuilder; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; + +public class RelationalRocketMQConsumer { + + private static final Logger LOGGER = LoggerFactory.getLogger(RelationalRocketMQConsumer.class); + private ITableSession tableSession; + private DefaultMQPushConsumer consumer; + private String producerGroup; + private String serverAddresses; + + public RelationalRocketMQConsumer( + String producerGroup, + String serverAddresses) throws IoTDBConnectionException { + this.producerGroup = producerGroup; + this.serverAddresses = serverAddresses; + this.consumer = new DefaultMQPushConsumer(producerGroup); + this.consumer.setNamesrvAddr(serverAddresses); + initIoTDB(); + } + + private void initIoTDB() throws IoTDBConnectionException { + tableSession = + new TableSessionBuilder() + .nodeUrls(Arrays.asList(RelationalConstant.IOTDB_URLS)) + .username(RelationalConstant.IOTDB_USERNAME) + .password(RelationalConstant.IOTDB_PASSWORD) + .build(); + for (String db : RelationalConstant.DATABASES) { + boolean res = createDatabase(db); + if (!res) { + throw new RuntimeException("Create database failed"); + } + } + for (String[] tableInfo : RelationalConstant.TABLES) { + boolean res = createTable(tableInfo); + if (!res) { + throw new RuntimeException("Create table failed"); + } + } + } + + private boolean createDatabase(String dbName) { + try { + tableSession.executeNonQueryStatement(String.format("CREATE DATABASE %s", dbName)); + } catch (IoTDBConnectionException | StatementExecutionException e) { + LOGGER.error("Create Database Error: ", e); + return false; + } + return true; + } + + private boolean createTable(String[] tableInfo) { + String sql = getCreateTableSQL(tableInfo); + try { + tableSession.executeNonQueryStatement(sql); + } catch (IoTDBConnectionException | StatementExecutionException e) { + LOGGER.error("Create Table Error: ", e); + return false; + } + return true; + } + + private static String getCreateTableSQL(String[] tableInfo) { + StringBuilder sql = new StringBuilder(); + sql.append("CREATE TABLE \"").append(tableInfo[0]).append("\".\"").append(tableInfo[1]).append("\" ("); + + String[] columnNames = tableInfo[2].split(","); + String[] columnTypes = tableInfo[3].split(","); + String[] columnCategories = tableInfo[4].split(","); + int columnSize = columnNames.length; + + for (int i = 0; i < columnSize; i++) { + sql.append(columnNames[i]).append(" "); + sql.append(columnTypes[i]).append(" "); + sql.append(columnCategories[i]).append(","); + } + sql.deleteCharAt(sql.length() - 1); + sql.append(")"); + return sql.toString(); + } + + public static void main(String[] args) + throws MQClientException, IoTDBConnectionException { + RelationalRocketMQConsumer consumer = + new RelationalRocketMQConsumer(RelationalConstant.CONSUMER_GROUP, RelationalConstant.SERVER_ADDRESS); + consumer.prepareConsume(); + consumer.start(); + } + + public void prepareConsume() throws MQClientException { + consumer.subscribe(RelationalConstant.TOPIC, "*"); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + consumer.registerMessageListener( + (MessageListenerOrderly) + (messages, context) -> { + for (MessageExt msg : messages) { + LOGGER.info( + String.format( + "%s Receive New Messages: %s %n", + Thread.currentThread().getName(), new String(msg.getBody()))); + try { + insert(new String(msg.getBody())); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } + return ConsumeOrderlyStatus.SUCCESS; + }); + } + + private void insert(String data) { + String sql = getInsertValueSQL(data); + try { + tableSession.executeNonQueryStatement(sql); + } catch (IoTDBConnectionException | StatementExecutionException e) { + LOGGER.error("Insert Values Into Table Error: ", e); + } + } + + private String getInsertValueSQL(String s) { + StringBuilder sql = new StringBuilder(); + String[] curDataInfo = s.split(";"); + int valueSetSize = curDataInfo.length - 3; + String database = curDataInfo[0]; + String tableName = curDataInfo[1]; + String columnNames = curDataInfo[2]; + sql.append("INSERT INTO \"").append(database).append("\".\"").append(tableName).append("\"("); + sql.append(columnNames).append(") VALUES "); + + for (int j = 0; j < valueSetSize; j++) { + String columnValues = curDataInfo[3 + j]; + sql.append("("); + sql.append(columnValues); + sql.append("),"); + } + sql.deleteCharAt(sql.length() - 1); + return sql.toString(); + } + + public void start() throws MQClientException { + consumer.start(); + } +} diff --git a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQProducer.java b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQProducer.java new file mode 100644 index 00000000..e59a4d72 --- /dev/null +++ b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQProducer.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.example.relational; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; + +public class RelationalRocketMQProducer { + + private static final Logger LOGGER = LoggerFactory.getLogger(RelationalRocketMQProducer.class); + private DefaultMQProducer producer; + private String producerGroup; + private String serverAddresses; + + public RelationalRocketMQProducer(String producerGroup, String serverAddresses) { + this.producerGroup = producerGroup; + this.serverAddresses = serverAddresses; + producer = new DefaultMQProducer(producerGroup); + producer.setNamesrvAddr(serverAddresses); + } + + public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException { + RelationalRocketMQProducer producer = new RelationalRocketMQProducer(RelationalConstant.PRODUCER_GROUP, RelationalConstant.SERVER_ADDRESS); + producer.start(); + producer.sendMessage(); + producer.shutdown(); + } + + public void start() throws MQClientException { + producer.start(); + } + + public void sendMessage() throws MQBrokerException, RemotingException, InterruptedException, MQClientException { + for (String data : RelationalConstant.ALL_DATA) { + Message msg = new Message(RelationalConstant.TOPIC, null, null, (data).getBytes(StandardCharsets.UTF_8)); + SendResult sendResult = + producer.send( + msg, + (mqs, msg1, arg) -> { + Integer id = (Integer) arg; + int index = id % mqs.size(); + return mqs.get(index); + }, + RelationalUtils.convertStringToInteger(RelationalUtils.getDatabaseNTable(data))); + String result = sendResult.toString(); + LOGGER.info(result); + } + } + + public void shutdown() { + producer.shutdown(); + } +} diff --git a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalUtils.java b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalUtils.java new file mode 100644 index 00000000..3ef0e512 --- /dev/null +++ b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalUtils.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.example.relational; + +public class RelationalUtils { + + private RelationalUtils() { + throw new IllegalStateException("Utility class"); + } + + public static int convertStringToInteger(String fullTable) { + int sum = 0; + for (char c : fullTable.toCharArray()) { + sum += c; + } + return sum; + } + + public static String getDatabaseNTable(String data) { + String[] info = data.split(";"); + return info[0] + "." + info[1]; + } +} From 46fef94a6e86edc5cea030dd6698a4221165eb35 Mon Sep 17 00:00:00 2001 From: LimJiaWenBrenda Date: Wed, 17 Sep 2025 12:12:17 +0800 Subject: [PATCH 4/8] Added RabbitMQ examples for IoTDB-table + Fixed RabbitMQ examples for IoTDB-tree --- .../iotdb/rabbitmq/RabbitMQChannelUtils.java | 32 ++-- .../iotdb/rabbitmq/RabbitMQConsumer.java | 5 + .../iotdb/rabbitmq/RabbitMQProducer.java | 15 +- .../relational/RelationalConstant.java | 54 ++++++ .../RelationalRabbitMQConsumer.java | 166 ++++++++++++++++++ .../RelationalRabbitMQProducer.java | 57 ++++++ 6 files changed, 314 insertions(+), 15 deletions(-) create mode 100644 examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalConstant.java create mode 100644 examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQConsumer.java create mode 100644 examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQProducer.java diff --git a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQChannelUtils.java b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQChannelUtils.java index b2ff48c8..bc4c8b95 100644 --- a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQChannelUtils.java +++ b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQChannelUtils.java @@ -19,9 +19,9 @@ package org.apache.iotdb.rabbitmq; -import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import org.apache.iotdb.rabbitmq.relational.RelationalConstant; import java.io.IOException; import java.util.concurrent.TimeoutException; @@ -30,23 +30,29 @@ public class RabbitMQChannelUtils { private RabbitMQChannelUtils() {} - @SuppressWarnings("squid:S2095") - public static Channel getChannelInstance(String connectionDescription) + public static Connection getConnection() throws IOException, TimeoutException { - ConnectionFactory connectionFactory = getConnectionFactory(); - Connection connection = connectionFactory.newConnection(connectionDescription); - return connection.createChannel(); + ConnectionFactory connectionFactory = new ConnectionFactory(); + connectionFactory.setHost(Constant.SERVER_HOST); + connectionFactory.setPort(Constant.SERVER_PORT); + connectionFactory.setVirtualHost(Constant.RABBITMQ_VHOST); + connectionFactory.setUsername(Constant.RABBITMQ_USERNAME); + connectionFactory.setPassword(Constant.RABBITMQ_PASSWORD); + connectionFactory.setAutomaticRecoveryEnabled(true); + connectionFactory.setNetworkRecoveryInterval(10000); + return connectionFactory.newConnection(Constant.CONNECTION_NAME); } - private static ConnectionFactory getConnectionFactory() { + public static Connection getRelationalConnection() + throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.setHost("127.0.0.1"); - connectionFactory.setPort(5672); - connectionFactory.setVirtualHost("/"); - connectionFactory.setUsername("guest"); - connectionFactory.setPassword("guest"); + connectionFactory.setHost(RelationalConstant.SERVER_HOST); + connectionFactory.setPort(RelationalConstant.SERVER_PORT); + connectionFactory.setVirtualHost(RelationalConstant.RABBITMQ_VHOST); + connectionFactory.setUsername(RelationalConstant.RABBITMQ_USERNAME); + connectionFactory.setPassword(RelationalConstant.RABBITMQ_PASSWORD); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(10000); - return connectionFactory; + return connectionFactory.newConnection(RelationalConstant.CONNECTION_NAME); } } diff --git a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQConsumer.java b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQConsumer.java index f8f93c30..273fa8a7 100644 --- a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQConsumer.java +++ b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQConsumer.java @@ -99,6 +99,11 @@ private static void createTimeseries(Session session, String[] timeseries) private void insert(Session session, String data) throws IoTDBConnectionException, StatementExecutionException { + try { + session.open(); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } String[] dataArray = data.split(","); String device = dataArray[0]; long time = Long.parseLong(dataArray[1]); diff --git a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQProducer.java b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQProducer.java index 0d383966..41b66cc0 100644 --- a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQProducer.java +++ b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQProducer.java @@ -35,15 +35,26 @@ public class RabbitMQProducer { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQProducer.class); public static void main(String[] args) throws IOException, TimeoutException { - Channel channel = RabbitMQChannelUtils.getChannelInstance(Constant.CONNECTION_NAME); + Connection connection = RabbitMQChannelUtils.getConnection(); + Channel channel = connection.createChannel(); channel.exchangeDeclare(Constant.TOPIC, BuiltinExchangeType.TOPIC); + channel.confirmSelect(); AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().deliveryMode(2).contentType("UTF-8").build(); for (int i = 0; i < Constant.ALL_DATA.length; i++) { String key = String.format("%s.%s", "IoTDB", Objects.toString(i)); + channel.queueDeclare(key, true, false, false, null); channel.basicPublish( Constant.TOPIC, key, false, basicProperties, Constant.ALL_DATA[i].getBytes()); - LOGGER.info(Constant.ALL_DATA[i]); + try { + if (channel.waitForConfirms()) { + LOGGER.info(" [x] Sent : {}", Constant.ALL_DATA[i]); + } else { + LOGGER.error(" [x] Timed out waiting for confirmation"); + } + } catch (InterruptedException e) { + LOGGER.error(" [x] Interrupted while waiting for confirmation"); + } } } } diff --git a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalConstant.java b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalConstant.java new file mode 100644 index 00000000..1fde0bd8 --- /dev/null +++ b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalConstant.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.example.relational; + +public class RelationalConstant { + + public static final String SERVER_HOST = "localhost"; + public static final int SERVER_PORT = 5672; + public static final String RABBITMQ_VHOST = "/"; + public static final String RABBITMQ_USERNAME = "guest"; + public static final String RABBITMQ_PASSWORD = "guest"; + public static final String CONNECTION_NAME = "RabbitMQ-Relational-Connection"; + public static final String RABBITMQ_CONSUMER_QUEUE = "IoTDB_Relational_Topic_Queue"; + public static final String RABBITMQ_CONSUMER_TAG = "IoTDB_Relational_CONSUMER_TAG"; + public static final String TOPIC = "RabbitMQ-Relational-Test"; + public static final String[] IOTDB_URLS = { + "127.0.0.1:6667" + }; + public static final String IOTDB_USERNAME = "root"; + public static final String IOTDB_PASSWORD = "root"; + public static final String[] DATABASES = {"rabbitmq_db1", "rabbitmq_db2"}; + public static final String[][] TABLES = { + // database, tableName, columnNames, columnTypes, columnCategories + {"rabbitmq_db1", "tb1", "time,region,model_id,temperature,status", "TIMESTAMP,STRING,STRING,FLOAT,BOOLEAN", "TIME,TAG,ATTRIBUTE,FIELD,FIELD"}, + {"rabbitmq_db2", "tb2", "time,plant_id,humidity,status", "TIMESTAMP,STRING,FLOAT,BOOLEAN", "TIME,TAG,FIELD,FIELD"} + }; + public static final String[] ALL_DATA = { + // database;tableName;columnName[,columnName]*;value[,value]*[,value[:value]*]* + "rabbitmq_db1;tb1;time,temperature,status;17,3.26,true;18,3.27,false;19,3.28,true", + "rabbitmq_db1;tb1;time,region,model_id,temperature;20,'rgn1','id1',3.31", + "rabbitmq_db2;tb2;time,plant_id,humidity,status;50,'id1',68.7,true", + "rabbitmq_db2;tb2;time,plant_id,humidity,status;51,'id2',68.5,false", + "rabbitmq_db2;tb2;time,plant_id,humidity,status;52,'id3',68.3,true", + "rabbitmq_db2;tb2;time,plant_id,humidity,status;53,'id4',68.8,true", + "rabbitmq_db2;tb2;time,plant_id,humidity,status;54,'id5',68.9,true" + }; +} diff --git a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQConsumer.java b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQConsumer.java new file mode 100644 index 00000000..d4fae039 --- /dev/null +++ b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQConsumer.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.example.relational; + +import com.rabbitmq.client.*; +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.TableSessionBuilder; +import org.example.RabbitMQChannelUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.concurrent.TimeoutException; + +public class RelationalRabbitMQConsumer { + + private static final Logger LOGGER = LoggerFactory.getLogger(RelationalRabbitMQConsumer.class); + private static ITableSession tableSession; + + public RelationalRabbitMQConsumer() throws IoTDBConnectionException { + initIoTDB(); + } + + public static void main(String[] args) throws IOException, TimeoutException, IoTDBConnectionException { + RelationalRabbitMQConsumer consumer = new RelationalRabbitMQConsumer(); + Connection connection = RabbitMQChannelUtils.getRelationalConnection(); + Channel channel = connection.createChannel(); + AMQP.Queue.DeclareOk declareOk = + channel.queueDeclare( + RelationalConstant.RABBITMQ_CONSUMER_QUEUE, true, false, false, new HashMap<>()); + channel.exchangeDeclare(RelationalConstant.TOPIC, BuiltinExchangeType.TOPIC); + channel.queueBind(declareOk.getQueue(), RelationalConstant.TOPIC, "IoTDB.#", new HashMap<>()); + DefaultConsumer defaultConsumer = + new DefaultConsumer(channel) { + @Override + public void handleDelivery( + String consumerTag, + Envelope envelope, + AMQP.BasicProperties properties, + byte[] body) { + String param = + consumerTag + ", " + envelope.toString() + ", " + properties.toString(); + LOGGER.info(param); + try { + consumer.insert(new String(body)); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } + }; + channel.basicConsume( + declareOk.getQueue(), true, RelationalConstant.RABBITMQ_CONSUMER_TAG, defaultConsumer); + } + + private void initIoTDB() throws IoTDBConnectionException { + tableSession = + new TableSessionBuilder() + .nodeUrls(Arrays.asList(RelationalConstant.IOTDB_URLS)) + .username(RelationalConstant.IOTDB_USERNAME) + .password(RelationalConstant.IOTDB_PASSWORD) + .build(); + for (String db : RelationalConstant.DATABASES) { + boolean res = createDatabase(db); + if (!res) { + throw new RuntimeException("Create database failed"); + } + } + for (String[] tableInfo : RelationalConstant.TABLES) { + boolean res = createTable(tableInfo); + if (!res) { + throw new RuntimeException("Create table failed"); + } + } + } + + private boolean createDatabase(String dbName) { + try { + tableSession.executeNonQueryStatement(String.format("CREATE DATABASE %s", dbName)); + } catch (IoTDBConnectionException | StatementExecutionException e) { + LOGGER.error("Create Database Error: ", e); + return false; + } + return true; + } + + private boolean createTable(String[] tableInfo) { + String sql = getCreateTableSQL(tableInfo); + try { + tableSession.executeNonQueryStatement(sql); + } catch (IoTDBConnectionException | StatementExecutionException e) { + LOGGER.error("Create Table Error: ", e); + return false; + } + return true; + } + + private static String getCreateTableSQL(String[] tableInfo) { + StringBuilder sql = new StringBuilder(); + sql.append("CREATE TABLE \"").append(tableInfo[0]).append("\".\"").append(tableInfo[1]).append("\" ("); + + String[] columnNames = tableInfo[2].split(","); + String[] columnTypes = tableInfo[3].split(","); + String[] columnCategories = tableInfo[4].split(","); + int columnSize = columnNames.length; + + for (int i = 0; i < columnSize; i++) { + sql.append(columnNames[i]).append(" "); + sql.append(columnTypes[i]).append(" "); + sql.append(columnCategories[i]).append(","); + } + sql.deleteCharAt(sql.length() - 1); + sql.append(")"); + return sql.toString(); + } + + private void insert(String data) { + String sql = getInsertValueSQL(data); + try { + tableSession.executeNonQueryStatement(sql); + LOGGER.info("Insert Success: {}", sql); + } catch (IoTDBConnectionException | StatementExecutionException e) { + LOGGER.error("Insert Error: ", e); + } + } + + private String getInsertValueSQL(String s) { + StringBuilder sql = new StringBuilder(); + String[] curDataInfo = s.split(";"); + int valueSetSize = curDataInfo.length - 3; + String database = curDataInfo[0]; + String tableName = curDataInfo[1]; + String columnNames = curDataInfo[2]; + sql.append("INSERT INTO \"").append(database).append("\".\"").append(tableName).append("\"("); + sql.append(columnNames).append(") VALUES "); + + for (int j = 0; j < valueSetSize; j++) { + String columnValues = curDataInfo[3 + j]; + sql.append("("); + sql.append(columnValues); + sql.append("),"); + } + sql.deleteCharAt(sql.length() - 1); + return sql.toString(); + } +} diff --git a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQProducer.java b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQProducer.java new file mode 100644 index 00000000..2bd42a06 --- /dev/null +++ b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQProducer.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.example.relational; + +import com.rabbitmq.client.*; +import org.example.RabbitMQChannelUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Objects; + +public class RelationalRabbitMQProducer { + + private static final Logger LOGGER = LoggerFactory.getLogger(RelationalRabbitMQProducer.class); + + public static void main(String[] args) { + try (Connection connection = RabbitMQChannelUtils.getRelationalConnection()) { + Channel channel = connection.createChannel(); + channel.exchangeDeclare(RelationalConstant.TOPIC, BuiltinExchangeType.TOPIC); + channel.confirmSelect(); + AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().deliveryMode(2).contentType("UTF-8").build(); + for (int i = 0; i < RelationalConstant.ALL_DATA.length; i++) { + String key = String.format("%s.%s", "IoTDB", Objects.toString(i)); + channel.queueDeclare(key, true, false, false, null); + channel.basicPublish(RelationalConstant.TOPIC, key, false, basicProperties, RelationalConstant.ALL_DATA[i].getBytes()); + try { + if (channel.waitForConfirms()) { + LOGGER.info(" [x] Sent : {}", RelationalConstant.ALL_DATA[i]); + } else { + LOGGER.error(" [x] Timed out waiting for confirmation"); + } + } catch (InterruptedException e) { + LOGGER.error(" [x] Interrupted while waiting for confirmation"); + } + } + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } +} From af4712286380746a0ae063d4167efe00be3508c5 Mon Sep 17 00:00:00 2001 From: LimJiaWenBrenda Date: Wed, 17 Sep 2025 12:31:14 +0800 Subject: [PATCH 5/8] Added constants in RabbitMQ Constant.java --- .../src/main/java/org/apache/iotdb/rabbitmq/Constant.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/Constant.java b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/Constant.java index 54df3233..fdc9bf86 100644 --- a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/Constant.java +++ b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/Constant.java @@ -22,6 +22,11 @@ public class Constant { private Constant() {} + public static final String SERVER_HOST = "localhost"; + public static final int SERVER_PORT = 5672; + public static final String RABBITMQ_VHOST = "/"; + public static final String RABBITMQ_USERNAME = "guest"; + public static final String RABBITMQ_PASSWORD = "guest"; public static final String CONNECTION_NAME = "RabbitMQ-Connection"; public static final String RABBITMQ_CONSUMER_QUEUE = "IoTDB_Topic_Queue"; public static final String RABBITMQ_CONSUMER_TAG = "IoTDB_CONSUMER_TAG"; From f3b757fbb4ca71b437426baadb984150f74f98bd Mon Sep 17 00:00:00 2001 From: LimJiaWenBrenda Date: Wed, 17 Sep 2025 17:00:30 +0800 Subject: [PATCH 6/8] Update package name --- .../org/apache/iotdb/kafka/relational/RelationalConstant.java | 2 +- .../org/apache/iotdb/kafka/relational/RelationalConsumer.java | 2 +- .../apache/iotdb/kafka/relational/RelationalConsumerThread.java | 2 +- .../org/apache/iotdb/kafka/relational/RelationalProducer.java | 2 +- .../apache/iotdb/rabbitmq/relational/RelationalConstant.java | 2 +- .../iotdb/rabbitmq/relational/RelationalRabbitMQConsumer.java | 2 +- .../iotdb/rabbitmq/relational/RelationalRabbitMQProducer.java | 2 +- .../apache/iotdb/rocketmq/relational/RelationalConstant.java | 2 +- .../iotdb/rocketmq/relational/RelationalRocketMQConsumer.java | 2 +- .../iotdb/rocketmq/relational/RelationalRocketMQProducer.java | 2 +- .../org/apache/iotdb/rocketmq/relational/RelationalUtils.java | 2 +- 11 files changed, 11 insertions(+), 11 deletions(-) diff --git a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java index fb752ce8..aebdef36 100644 --- a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java +++ b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java @@ -17,7 +17,7 @@ * under the License. */ -package org.example.relational; +package org.apache.iotdb.kafka.relational; public class RelationalConstant { diff --git a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java index 6ae9f752..3e9c37c8 100644 --- a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java +++ b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java @@ -17,7 +17,7 @@ * under the License. */ -package org.example.relational; +package org.apache.iotdb.kafka.relational; import org.apache.iotdb.isession.ITableSession; import org.apache.iotdb.isession.pool.ITableSessionPool; diff --git a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java index ee344c1d..073de7dc 100644 --- a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java +++ b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java @@ -17,7 +17,7 @@ * under the License. */ -package org.example.relational; +package org.apache.iotdb.kafka.relational; import org.apache.iotdb.isession.ITableSession; import org.apache.iotdb.isession.pool.ITableSessionPool; diff --git a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java index 3e871946..9bfc85d5 100644 --- a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java +++ b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java @@ -17,7 +17,7 @@ * under the License. */ -package org.example.relational; +package org.apache.iotdb.kafka.relational; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; diff --git a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalConstant.java b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalConstant.java index 1fde0bd8..e3ce8f4d 100644 --- a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalConstant.java +++ b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalConstant.java @@ -17,7 +17,7 @@ * under the License. */ -package org.example.relational; +package org.apache.iotdb.rabbitmq.relational; public class RelationalConstant { diff --git a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQConsumer.java b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQConsumer.java index d4fae039..5ae0705d 100644 --- a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQConsumer.java +++ b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQConsumer.java @@ -17,7 +17,7 @@ * under the License. */ -package org.example.relational; +package org.apache.iotdb.rabbitmq.relational; import com.rabbitmq.client.*; import org.apache.iotdb.isession.ITableSession; diff --git a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQProducer.java b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQProducer.java index 2bd42a06..95f0581f 100644 --- a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQProducer.java +++ b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQProducer.java @@ -17,7 +17,7 @@ * under the License. */ -package org.example.relational; +package org.apache.iotdb.rabbitmq.relational; import com.rabbitmq.client.*; import org.example.RabbitMQChannelUtils; diff --git a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalConstant.java b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalConstant.java index a8f265fd..e492fba5 100644 --- a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalConstant.java +++ b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalConstant.java @@ -17,7 +17,7 @@ * under the License. */ -package org.example.relational; +package org.apache.iotdb.rocketmq.relational; public class RelationalConstant { diff --git a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQConsumer.java b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQConsumer.java index 9a166b6b..acbae725 100644 --- a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQConsumer.java +++ b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQConsumer.java @@ -17,7 +17,7 @@ * under the License. */ -package org.example.relational; +package org.apache.iotdb.rocketmq.relational; import org.apache.iotdb.isession.ITableSession; import org.apache.iotdb.rpc.IoTDBConnectionException; diff --git a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQProducer.java b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQProducer.java index e59a4d72..f96b5bd3 100644 --- a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQProducer.java +++ b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQProducer.java @@ -17,7 +17,7 @@ * under the License. */ -package org.example.relational; +package org.apache.iotdb.rocketmq.relational; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; diff --git a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalUtils.java b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalUtils.java index 3ef0e512..2bbd3c05 100644 --- a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalUtils.java +++ b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalUtils.java @@ -17,7 +17,7 @@ * under the License. */ -package org.example.relational; +package org.apache.iotdb.rocketmq.relational; public class RelationalUtils { From ae8da3008a697752893f6644684ad994c60d9c39 Mon Sep 17 00:00:00 2001 From: LimJiaWenBrenda Date: Thu, 18 Sep 2025 10:04:29 +0800 Subject: [PATCH 7/8] Updated readme documents for Kafka, RocketMQ and RabbitMQ + Upgraded iotdb-session to 2.0.5 --- examples/kafka/readme.md | 130 ++++++++++++++++++++++++++---------- examples/rabbitmq/pom.xml | 1 + examples/rabbitmq/readme.md | 103 ++++++++++++++++++++++++---- examples/rocketmq/pom.xml | 1 + examples/rocketmq/readme.md | 98 +++++++++++++++++++++++---- pom.xml | 2 +- 6 files changed, 273 insertions(+), 62 deletions(-) diff --git a/examples/kafka/readme.md b/examples/kafka/readme.md index af4c3d25..a2042b5b 100644 --- a/examples/kafka/readme.md +++ b/examples/kafka/readme.md @@ -25,51 +25,111 @@ The example is to show how to send data from localhost to IoTDB through Kafka. ``` ## Usage ### Version usage -IoTDB: 1.0.0 -Kafka: 2.8.0 + +| | Version | +|-------|---------| +| IoTDB | 2.0.5 | +| Kafka | 2.8.2 | + ### Dependencies with Maven ``` - - org.apache.kafka - kafka_2.13 - 2.8.0 - - - org.apache.iotdb - iotdb-session - 1.0.0 - + + org.apache.kafka + kafka_2.13 + 2.8.2 + + + org.apache.iotdb + iotdb-session + 2.0.5 + ``` -### Launch the servers +### Prerequisite Steps -``` -  Before you run the program, make sure you have launched the servers of Kafka and IoTDB. -  For details, please refer to http://kafka.apache.org/081/documentation.html#quickstart -``` +#### 1. Install IoTDB +please refer to [https://iotdb.apache.org/#/Download](https://iotdb.apache.org/#/Download) -### Run Producer.java +#### 2. Install Kafka +please refer to [https://kafka.apache.org/downloads](https://kafka.apache.org/downloads) -``` - The class is to send data from localhost to Kafka clusters. -  Firstly, you have to change the parameter of TOPIC in Constant.java to what you create:(for example : "Kafka-Test") -  > public final static String TOPIC = "Kafka-Test"; -  The default format of data is "device,timestamp,value ". (for example : "measurement1,2017/10/24 19:30:00,60") - Then you need to create data in Constat.ALL_DATA -  Finally, run KafkaProducer.java -``` +#### 3. Startup IoTDB +please refer to [Quick Start](http://iotdb.apache.org/UserGuide/Master/Get%20Started/QuickStart.html) -### Run Consumer.java +#### 4. Startup Kafka +please refer to [https://kafka.apache.org/quickstart](https://kafka.apache.org/quickstart) + + +### Case 1: Send data from localhost to IoTDB-tree + +Files related: +1. `Constant.java` : configuration of IoTDB and Kafka +2. `Producer.java` : send data from localhost to Kafka cluster +3. `Consumer.java` : consume data from Kafka cluster through multi-threads +4. `ConsumerThread.java` : consume operations done by single thread + +Step 0: Set parameter in `Constant.java` + +> Change the parameters according to your situation. + +| Parameter | Data Type | Description | +|---------------------------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| TOPIC | String | The topic to store data in Kafka | +| KAFKA_SERVICE_URL | String | The service url of Kafka, e.g. `"127.0.0.1:9092"` | +| CONSUMER_THREAD_NUM | int | The number of consumer threads | +| SESSION_SIZE | int | The maximum number of IoTDB sessions | +| IOTDB_CONNECTION_HOST | String | IoTDB host, e.g. `"localhost"` | +| IOTDB_CONNECTION_PORT | int | IoTDB port, e.g. `6667` | +| IOTDB_CONNECTION_USER | String | IoTDB username, e.g. `"root"` | +| IOTDB_CONNECTION_PASSWORD | String | IoTDB password, e.g. `"root"` | +| STORAGE_GROUP | Array | The storage groups to create | +| CREATE_TIMESERIES | Array | The timeseries to create
Format of a single timeseries: {"timeseries", "dataType", "encodingType", "compressionType"}
e.g. `{"root.vehicle.d0.s0", "INT32", "PLAIN", "SNAPPY"}` | +| ALL_DATA | Array | The data to create
Format of a single data: "device,timestamp,fieldName\[:fieldName\]\*,dataType\[:dataType\]\*,value\[:value\]\*"
e.g. `"root.vehicle.d0,10,s0,INT32,100"`, `"root.vehicle.d0,12,s0:s1,INT32:TEXT,101:'employeeId102'"` | + +Step 1: Run `Producer.java` + +> This class sends data from localhost to Kafka clusters.
+ +Step 2: Run `Consumer.java` + +> This class consumes data from Kafka through multi-threads and sends the data to IoTDB-tree. + +### Case 2: Send data from localhost to IoTDB-table + +Files related: +1. `RelationalConstant.java` : configuration of IoTDB and Kafka +2. `RelationalProducer.java` : send data from localhost to Kafka cluster +3. `RelationalConsumer.java` : consume data from Kafka cluster through multi-threads +4. `RelationalConsumerThread.java` : consume operations done by single thread + +Step 0: Set parameter in `RelationalConstant.java` + +> Change the parameters according to your situation. + +| Parameter | Data Type | Description | +|---------------------|-----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| TOPIC | String | The topic to store data in Kafka | +| KAFKA_SERVICE_URL | String | The service url of Kafka, e.g. `"127.0.0.1:9092"` | +| CONSUMER_THREAD_NUM | int | The number of consumer threads | +| SESSION_SIZE | int | The maximum number of IoTDB sessions | +| IOTDB_URLS | Array | IoTDB urls, e.g. `{"localhost:6667"}` | +| IOTDB_USERNAME | String | IoTDB username, e.g. `"root"` | +| IOTDB_PASSWORD | String | IoTDB password, e.g. `"root"` | +| DATABASES | Array | The databases to create | +| TABLES | Array | The tables to create
Format of a single table: {"database", "tableName", "columnNames", "columnTypes", "columnCategories"}
e.g. `{"kafka_db1", "tb1", "time,region,status", "TIMESTAMP,STRING,BOOLEAN", "TIME,TAG,FIELD"}` | +| ALL_DATA | Array | The data to create
Format of a single data: "database;tableName;columnName\[,columnName\]\*;value\[,value\]\*\[;value\[,value\]\*\]\*"
e.g. `"kafka_db1;tb1;time,status;17,true;18,false;19,true"` | + +Step 1: Run `RelationalProducer.java` + +> This class sends data from localhost to Kafka clusters. + +Step 2: Run `RelationalConsumer.java` + +> This class consumes data from Kafka through multi-threads and sends the data to IoTDB-table. -``` - The class is to show how to consume data from kafka through multi-threads. - The data is sent by class KafkaProducer. -  You can set the parameter of CONSUMER_THREAD_NUM in Constant.java to make sure the number of consumer threads:(for example: "3") - > private final static int CONSUMER_THREAD_NUM = 3; -``` -#### Notice -  If you want to use multiple consumers, please make sure that the number of topic's partition you create is more than 1. \ No newline at end of file +### Notice +If you want to use multiple consumers, please make sure that the number of topic's partition you create is more than 1. \ No newline at end of file diff --git a/examples/rabbitmq/pom.xml b/examples/rabbitmq/pom.xml index dd10aa88..4f7f1a64 100644 --- a/examples/rabbitmq/pom.xml +++ b/examples/rabbitmq/pom.xml @@ -42,6 +42,7 @@ com.rabbitmq amqp-client + 5.26.0 diff --git a/examples/rabbitmq/readme.md b/examples/rabbitmq/readme.md index 1a983e70..df6e28fd 100644 --- a/examples/rabbitmq/readme.md +++ b/examples/rabbitmq/readme.md @@ -25,9 +25,11 @@ The example is to show how to send data from localhost to IoTDB through RabbitMQ ``` ## Usage ### Version usage -IoTDB: 0.12.0 -RabbitMQ server: 3.8.15 -RabbitMQ client jar: 5.12.0 + +| | Version | +|----------|---------| +| IoTDB | 2.0.5 | +| RabbitMQ | 5.26.0 | ### Dependencies with Maven @@ -35,29 +37,106 @@ RabbitMQ client jar: 5.12.0 org.apache.iotdb - iotdb-jdbc - ${project.version} + iotdb-session + 2.0.5 com.rabbitmq amqp-client - 5.12.0 + 5.26.0 ``` -### 1. Install IoTDB +### Prerequisite Steps + +#### 1. Install IoTDB please refer to [https://iotdb.apache.org/#/Download](https://iotdb.apache.org/#/Download) -### 2. Install RabbitMQ +#### 2. Install RabbitMQ please refer to [https://www.rabbitmq.com/download.html](https://www.rabbitmq.com/download.html) -### 3. Startup IoTDB +#### 3. Startup IoTDB please refer to [Quick Start](http://iotdb.apache.org/UserGuide/Master/Get%20Started/QuickStart.html) -### 4. Startup RocketMQ +#### 4. Startup RocketMQ please refer to [https://www.rabbitmq.com/download.html](https://www.rabbitmq.com/download.html) -### 5. Start the consumer client: RabbitMQConsumer +### Case 1: Send data from localhost to IoTDB-tree + +Files related: +1. `Constant.java` : configuration of IoTDB and RabbitMQ +2. `RabbitMQProducer.java` : send data from localhost to RabbitMQ +3. `RabbitMQConsumer.java` : consume data from RabbitMQ +4. `RabbitMQChannelUtils.java` : common functions + +Step 0: Set parameter in `Constant.java` + +> Change the parameters according to your situation. + +| Parameter | Data Type | Description | +|---------------------------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| TOPIC | String | The topic to store data in Kafka | +| SERVER_HOST | String | The server ip of RabbitMQ, e.g. `"localhost"` | +| SERVER_PORT | int | The server port of RabbitMQ, e.g. `5672` | +| RABBITMQ_VHOST | String | The virtual host in RabbitMQ, e.g. `"/"` | +| RABBITMQ_USERNAME | String | RabbitMQ username, e.g. `"guest"` | +| RABBITMQ_PASSWORD | String | RabbitMQ password, e.g. `"guest"` | +| CONNECTION_NAME | String | RabbitMQ connection name | +| RABBITMQ_CONSUMER_QUEUE | String | The consumer queue name in RabbitMQ | +| RABBITMQ_CONSUMER_TAG | String | The consumer tag in RabbitMQ | +| IOTDB_CONNECTION_HOST | String | IoTDB host, e.g. `"localhost"` | +| IOTDB_CONNECTION_PORT | int | IoTDB port, e.g. `6667` | +| IOTDB_CONNECTION_USER | String | IoTDB username, e.g. `"root"` | +| IOTDB_CONNECTION_PASSWORD | String | IoTDB password, e.g. `"root"` | +| STORAGE_GROUP | Array | The storage groups to create | +| TIMESERIESLIST | Array | The timeseries to create
Format of a single timeseries: {"timeseries", "dataType", "encodingType", "compressionType"}
e.g. `{"root.vehicle.d0.s0", "INT32", "PLAIN", "SNAPPY"}` | +| ALL_DATA | Array | The data to create
Format of a single data: "device,timestamp,fieldName\[:fieldName\]\*,dataType\[:dataType\]\*,value\[:value\]\*"
e.g. `"root.vehicle.d0,10,s0,INT32,100"`, `"root.vehicle.d0,12,s0:s1,INT32:TEXT,101:'employeeId102'"` | + + +Step 1: Run `RabbitMQProducer.java` + +> This class sends data from localhost to RabbitMQ. + +Step 2: Run `RabbitMQConsumer.java` + +> This class consumes data from RabbitMQ and sends the data to IoTDB-tree. + +### Case 2: Send data from localhost to IoTDB-table + +Files related: +1. `RelationalConstant.java` : configuration of IoTDB and RabbitMQ +2. `RelationalRabbitMQProducer.java` : send data from localhost to RabbitMQ +3. `RelationalRabbitMQConsumer.java` : consume data from RabbitMQ +4. `RabbitMQChannelUtils.java` : common functions + +Step 0: Set parameter in `RelationalConstant.java` + +> Change the parameters according to your situation. + +| Parameter | Data Type | Description | +|-------------------------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| TOPIC | String | The topic to store data in Kafka | +| SERVER_HOST | String | The server ip of RabbitMQ, e.g. `"localhost"` | +| SERVER_PORT | int | The server port of RabbitMQ, e.g. `5672` | +| RABBITMQ_VHOST | String | The virtual host in RabbitMQ, e.g. `"/"` | +| RABBITMQ_USERNAME | String | RabbitMQ username, e.g. `"guest"` | +| RABBITMQ_PASSWORD | String | RabbitMQ password, e.g. `"guest"` | +| CONNECTION_NAME | String | RabbitMQ connection name | +| RABBITMQ_CONSUMER_QUEUE | String | The consumer queue name in RabbitMQ | +| RABBITMQ_CONSUMER_TAG | String | The consumer tag in RabbitMQ | +| IOTDB_URLS | Array | IoTDB urls, e.g. `{"localhost:6667"}` | +| IOTDB_USERNAME | String | IoTDB username, e.g. `"root"` | +| IOTDB_PASSWORD | String | IoTDB password, e.g. `"root"` | +| DATABASES | Array | The databases to create | +| TABLES | Array | The tables to create
Format of a single table: {"database", "tableName", "columnNames", "columnTypes", "columnCategories"}
e.g. `{"rabbitmq_db1", "tb1", "time,region,status", "TIMESTAMP,STRING,BOOLEAN", "TIME,TAG,FIELD"}` | +| ALL_DATA | Array | The data to create
Format of a single data: "database;tableName;columnName\[,columnName\]\*;value\[,value\]\*\[;value\[,value\]\*\]\*"
e.g. `"rabbitmq_db1;tb1;time,status;17,true;18,false;19,true"` | + +Step 1: Run `RabbitMQProducer.java` + +> This class sends data from localhost to RabbitMQ. + +Step 2: Run `RabbitMQConsumer.java` + +> This class consumes data from RabbitMQ and sends the data to IoTDB-table. -### 6. Start the producer client: RabbitMQProducer \ No newline at end of file diff --git a/examples/rocketmq/pom.xml b/examples/rocketmq/pom.xml index b880681c..fc7d25fc 100644 --- a/examples/rocketmq/pom.xml +++ b/examples/rocketmq/pom.xml @@ -37,6 +37,7 @@ org.apache.rocketmq rocketmq-client + 5.3.3 com.alibaba diff --git a/examples/rocketmq/readme.md b/examples/rocketmq/readme.md index 4c95c6f3..a7e6e213 100644 --- a/examples/rocketmq/readme.md +++ b/examples/rocketmq/readme.md @@ -46,38 +46,108 @@ Producers insert IoTDB insert statements into partitions according to devices, e ## Usage ### Version usage -IoTDB: 1.0.0 -RocketMQ: 4.4.0 + +| | Version | +|----------|---------| +| IoTDB | 2.0.5 | +| RocketMQ | 5.3.3 | + ### Dependencies with Maven ``` - org.apache.iotdb - iotdb-session - 1.0.0 + org.apache.iotdb + iotdb-session + 2.0.5 - org.apache.rocketmq - rocketmq-client - 4.4.0 + org.apache.rocketmq + rocketmq-client + 5.3.3 ``` Note: The maven dependencies of io.netty in IoTDB are in conflicts with those dependencies in RocketMQ-Client. -### 1. Install IoTDB +### Prerequisite Steps + +#### 1. Install IoTDB please refer to [https://iotdb.apache.org/#/Download](https://iotdb.apache.org/#/Download) -### 2. Install RocketMQ +#### 2. Install RocketMQ please refer to [http://rocketmq.apache.org/docs/quick-start/](http://rocketmq.apache.org/docs/quick-start/) -### 3. Startup IoTDB +#### 3. Startup IoTDB please refer to [Quick Start](http://iotdb.apache.org/UserGuide/Master/Get%20Started/QuickStart.html) -### 4. Startup RocketMQ +#### 4. Startup RocketMQ please refer to [http://rocketmq.apache.org/docs/quick-start/](http://rocketmq.apache.org/docs/quick-start/) -### 5. Start the consumer client:RocketMQConsumer +### Case 1: Send data from localhost to IoTDB-tree + +Files related: +1. `Constant.java` : configuration of IoTDB and RocketMQ +2. `RocketMQProducer.java` : send data from localhost to RocketMQ +3. `RocketMQConsumer.java` : consume data from RocketMQ +4. `Utils.java` : utils + +Step 0: Set parameter in `Constant.java` + +> Change the parameters according to your situation. + +| Parameter | Data Type | Description | +|---------------------------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| TOPIC | String | The topic to store data in RocketMQ | +| SERVER_ADDRESS | String | The server address of RocketMQ, e.g. `"127.0.0.1:9876"` | +| PRODUCER_GROUP | String | The producer group name in RocketMQ | +| CONSUMER_GROUP | String | The consumer group name in RocketMQ | +| IOTDB_CONNECTION_HOST | String | IoTDB host, e.g. `"localhost"` | +| IOTDB_CONNECTION_PORT | int | IoTDB port, e.g. `6667` | +| IOTDB_CONNECTION_USER | String | IoTDB username, e.g. `"root"` | +| IOTDB_CONNECTION_PASSWORD | String | IoTDB password, e.g. `"root"` | +| STORAGE_GROUP | Array | The storage groups to create | +| CREATE_TIMESERIES | Array | The timeseries to create
Format of a single timeseries: {"timeseries", "dataType", "encodingType", "compressionType"}
e.g. `{"root.vehicle.d0.s0", "INT32", "PLAIN", "SNAPPY"}` | +| ALL_DATA | Array | The data to create
Format of a single data: "device,timestamp,fieldName\[:fieldName\]\*,dataType\[:dataType\]\*,value\[:value\]\*"
e.g. `"root.vehicle.d0,10,s0,INT32,100"`, `"root.vehicle.d0,12,s0:s1,INT32:TEXT,101:'employeeId102'"` | + +Step 1: Run `RocketMQProducer.java` + +> This class sends data from localhost to RocketMQ.
+ +Step 2: Run `RocketMQConsumer.java` + +> This class consumes data from RocketMQ and sends the data to IoTDB-tree. + +### Case 2: Send data from localhost to IoTDB-table + +Files related: +1. `RelationalConstant.java` : configuration of IoTDB and RocketMQ +2. `RelationalRocketMQProducer.java` : send data from localhost to RocketMQ +3. `RelationalRocketMQConsumer.java` : consume data from RocketMQ +4. `RelationalUtils.java` : utils + +Step 0: Set parameter in `RelationalConstant.java` + +> Change the parameters according to your situation. + +| Parameter | Data Type | Description | +|----------------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| TOPIC | String | The topic to store data in RocketMQ | +| SERVER_ADDRESS | String | The server address of RocketMQ, e.g. `"127.0.0.1:9876"` | +| PRODUCER_GROUP | String | The producer group name in RocketMQ | +| CONSUMER_GROUP | String | The consumer group name in RocketMQ | +| IOTDB_URLS | Array | IoTDB urls, e.g. `{"localhost:6667"}` | +| IOTDB_USERNAME | String | IoTDB username, e.g. `"root"` | +| IOTDB_PASSWORD | String | IoTDB password, e.g. `"root"` | +| DATABASES | Array | The databases to create | +| TABLES | Array | The tables to create
Format of a single table: {"database", "tableName", "columnNames", "columnTypes", "columnCategories"}
e.g. `{"rocketmq_db1", "tb1", "time,region,status", "TIMESTAMP,STRING,BOOLEAN", "TIME,TAG,FIELD"}` | +| ALL_DATA | Array | The data to create
Format of a single data: "database;tableName;columnName\[,columnName\]\*;value\[,value\]\*\[;value\[,value\]\*\]\*"
e.g. `"rocketmq_db1;tb1;time,status;17,true;18,false;19,true"` | + + +Step 1: Run `RelationalRocketMQProducer.java` + +> This class sends data from localhost to RocketMQ. + +Step 2: Run `RelationalRocketMQConsumer.java` -### 6. Start the producer client:RocketMQProducer +> This class consumes data from RocketMQ and sends the data to IoTDB-table. diff --git a/pom.xml b/pom.xml index 0bcd47a9..9ce4d43b 100644 --- a/pom.xml +++ b/pom.xml @@ -90,7 +90,7 @@ https://github.com/apache/iotdb-bin-resources/tree/main/iotdb-tools-thrift --> 0.14.1.0 - 2.0.3 + 2.0.5 2.16.2 4.0.4 From e92aed2879ad5b6e77793d95690a3e7dae6e1d3a Mon Sep 17 00:00:00 2001 From: LimJiaWenBrenda Date: Thu, 18 Sep 2025 12:10:35 +0800 Subject: [PATCH 8/8] Renew IoTDB quick start link --- examples/kafka/readme.md | 2 +- examples/rabbitmq/readme.md | 2 +- examples/rocketmq/readme.md | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/kafka/readme.md b/examples/kafka/readme.md index a2042b5b..765a0cba 100644 --- a/examples/kafka/readme.md +++ b/examples/kafka/readme.md @@ -57,7 +57,7 @@ please refer to [https://iotdb.apache.org/#/Download](https://iotdb.apache.org/# please refer to [https://kafka.apache.org/downloads](https://kafka.apache.org/downloads) #### 3. Startup IoTDB -please refer to [Quick Start](http://iotdb.apache.org/UserGuide/Master/Get%20Started/QuickStart.html) +please refer to [Quick Start](https://iotdb.apache.org/UserGuide/latest/QuickStart/QuickStart_apache.html) #### 4. Startup Kafka please refer to [https://kafka.apache.org/quickstart](https://kafka.apache.org/quickstart) diff --git a/examples/rabbitmq/readme.md b/examples/rabbitmq/readme.md index df6e28fd..a53ddf68 100644 --- a/examples/rabbitmq/readme.md +++ b/examples/rabbitmq/readme.md @@ -57,7 +57,7 @@ please refer to [https://iotdb.apache.org/#/Download](https://iotdb.apache.org/# please refer to [https://www.rabbitmq.com/download.html](https://www.rabbitmq.com/download.html) #### 3. Startup IoTDB -please refer to [Quick Start](http://iotdb.apache.org/UserGuide/Master/Get%20Started/QuickStart.html) +please refer to [Quick Start](https://iotdb.apache.org/UserGuide/latest/QuickStart/QuickStart_apache.html) #### 4. Startup RocketMQ please refer to [https://www.rabbitmq.com/download.html](https://www.rabbitmq.com/download.html) diff --git a/examples/rocketmq/readme.md b/examples/rocketmq/readme.md index a7e6e213..b9a8bb8e 100644 --- a/examples/rocketmq/readme.md +++ b/examples/rocketmq/readme.md @@ -79,7 +79,7 @@ please refer to [https://iotdb.apache.org/#/Download](https://iotdb.apache.org/# please refer to [http://rocketmq.apache.org/docs/quick-start/](http://rocketmq.apache.org/docs/quick-start/) #### 3. Startup IoTDB -please refer to [Quick Start](http://iotdb.apache.org/UserGuide/Master/Get%20Started/QuickStart.html) +please refer to [Quick Start](https://iotdb.apache.org/UserGuide/latest/QuickStart/QuickStart_apache.html) #### 4. Startup RocketMQ please refer to [http://rocketmq.apache.org/docs/quick-start/](http://rocketmq.apache.org/docs/quick-start/)