RelationalConstant.java
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.kafka.relational;

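/**
 * Shared configuration for the Kafka-to-IoTDB relational example: the Kafka
 * broker and topic, the IoTDB endpoints and credentials, the table schemas to
 * create, and the sample rows to publish. The broker address and credentials
 * below are example values; adjust them to your own deployment before running.
 */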
public class RelationalConstant {

public static final String KAFKA_SERVICE_URL = "172.20.31.71:9094";
public static final String TOPIC = "Kafka-Relational-Test";
public static final String[] IOTDB_URLS = {
"127.0.0.1:6667"
};
public static final String IOTDB_USERNAME = "root";
public static final String IOTDB_PASSWORD = "root";
public static final int SESSION_SIZE = 3;
public static final int CONSUMER_THREAD_NUM = 5;
public static final String[] DATABASES = {"kafka_db1", "kafka_db2"};
public static final String[][] TABLES = {
// database, tableName, columnNames, columnTypes, columnCategories
{"kafka_db1", "tb1", "time,region,model_id,temperature,status", "TIMESTAMP,STRING,STRING,FLOAT,BOOLEAN", "TIME,TAG,ATTRIBUTE,FIELD,FIELD"},
{"kafka_db2", "tb2", "time,plant_id,humidity,status", "TIMESTAMP,STRING,FLOAT,BOOLEAN", "TIME,TAG,FIELD,FIELD"}
};
public static final String[] ALL_DATA = {
// database;tableName;columnName[,columnName]*;value[,value]*[,value[:value]*]*
"kafka_db1;tb1;time,temperature,status;17,3.26,true;18,3.27,false;19,3.28,true",
"kafka_db1;tb1;time,region,model_id,temperature;20,'rgn1','id1',3.31",
"kafka_db2;tb2;time,plant_id,humidity,status;50,'id1',68.7,true",
"kafka_db2;tb2;time,plant_id,humidity,status;51,'id2',68.5,false",
"kafka_db2;tb2;time,plant_id,humidity,status;52,'id3',68.3,true",
"kafka_db2;tb2;time,plant_id,humidity,status;53,'id4',68.8,true",
"kafka_db2;tb2;time,plant_id,humidity,status;54,'id5',68.9,true"
};
}
RelationalConsumer.java
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.kafka.relational;

import org.apache.iotdb.isession.ITableSession;
import org.apache.iotdb.isession.pool.ITableSessionPool;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.TableSessionPoolBuilder;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

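/**
 * Consuming side of the example: builds one KafkaConsumer per thread, creates
 * the target databases and tables in IoTDB, and then writes every received
 * record through a shared table-session pool.
 */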
public class RelationalConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(RelationalConsumer.class);
private static ITableSessionPool tableSessionPool;
  private final List<KafkaConsumer<String, String>> consumerList;

private RelationalConsumer(List<KafkaConsumer<String, String>> consumerList) {
this.consumerList = consumerList;
initSessionPool();
}

private static void initSessionPool() {
tableSessionPool =
new TableSessionPoolBuilder()
.nodeUrls(Arrays.asList(RelationalConstant.IOTDB_URLS))
.user(RelationalConstant.IOTDB_USERNAME)
.password(RelationalConstant.IOTDB_PASSWORD)
.maxSize(RelationalConstant.SESSION_SIZE)
.build();
}

public static void main(String[] args) {
List<KafkaConsumer<String, String>> consumerList = new ArrayList<>();
for (int i = 0; i < RelationalConstant.CONSUMER_THREAD_NUM; i++) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, RelationalConstant.KAFKA_SERVICE_URL);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
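      // All consumers share the topic name as their group id, so Kafka splits
      // the topic's partitions across the consumer threads.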
props.put(ConsumerConfig.GROUP_ID_CONFIG, RelationalConstant.TOPIC);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumerList.add(consumer);
consumer.subscribe(Collections.singleton(RelationalConstant.TOPIC));
}
RelationalConsumer consumer = new RelationalConsumer(consumerList);
initIoTDB();
consumer.consumeInParallel();
}

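  // Creates the example databases and tables up front, failing fast if either
  // step does not succeed.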
private static void initIoTDB() {
for (String db : RelationalConstant.DATABASES) {
boolean res = createDatabase(db);
if (!res) {
throw new RuntimeException("Create database failed");
}
}
for (String[] tableInfo : RelationalConstant.TABLES) {
boolean res = createTable(tableInfo);
if (!res) {
throw new RuntimeException("Create table failed");
}
}
}

private static boolean createDatabase(String dbName) {
try (ITableSession session = tableSessionPool.getSession()) {
try {
session.executeNonQueryStatement(String.format("CREATE DATABASE %s", dbName));
} catch (IoTDBConnectionException | StatementExecutionException e) {
LOGGER.error("Create Database Error: ", e);
return false;
}
} catch (IoTDBConnectionException e) {
LOGGER.error("Get Table Session Error: ", e);
return false;
}
return true;
}

private static boolean createTable(String[] tableInfo) {
try (ITableSession session = tableSessionPool.getSession()) {
String sql = getCreateTableSQL(tableInfo);
try {
session.executeNonQueryStatement(sql);
} catch (IoTDBConnectionException | StatementExecutionException e) {
LOGGER.error("Create Table Error: ", e);
return false;
}
} catch (IoTDBConnectionException e) {
LOGGER.error("Get Table Session Error: ", e);
return false;
}
return true;
}

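  // Builds a CREATE TABLE statement from a tableInfo tuple, e.g. for the first
  // entry of RelationalConstant.TABLES:
  //   CREATE TABLE "kafka_db1"."tb1" (time TIMESTAMP TIME,region STRING TAG,
  //   model_id STRING ATTRIBUTE,temperature FLOAT FIELD,status BOOLEAN FIELD)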
private static String getCreateTableSQL(String[] tableInfo) {
StringBuilder sql = new StringBuilder();
sql.append("CREATE TABLE \"").append(tableInfo[0]).append("\".\"").append(tableInfo[1]).append("\" (");

String[] columnNames = tableInfo[2].split(",");
String[] columnTypes = tableInfo[3].split(",");
String[] columnCategories = tableInfo[4].split(",");
int columnSize = columnNames.length;

for (int i = 0; i < columnSize; i++) {
sql.append(columnNames[i]).append(" ");
sql.append(columnTypes[i]).append(" ");
sql.append(columnCategories[i]).append(",");
}
sql.deleteCharAt(sql.length() - 1);
sql.append(")");
return sql.toString();
}

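  // Runs one RelationalConsumerThread per KafkaConsumer on a fixed-size pool;
  // each thread polls and writes independently.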
  private void consumeInParallel() {
    ExecutorService executor = Executors.newFixedThreadPool(RelationalConstant.CONSUMER_THREAD_NUM);
    for (KafkaConsumer<String, String> kafkaConsumer : consumerList) {
      executor.submit(new RelationalConsumerThread(kafkaConsumer, tableSessionPool));
    }
    executor.shutdown();
  }
}
RelationalConsumerThread.java
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.kafka.relational;

import org.apache.iotdb.isession.ITableSession;
import org.apache.iotdb.isession.pool.ITableSessionPool;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

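/**
 * Polls a single KafkaConsumer in a loop and writes each received record into
 * IoTDB as a multi-row INSERT statement.
 */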
public class RelationalConsumerThread implements Runnable {

  private static final Logger LOGGER = LoggerFactory.getLogger(RelationalConsumerThread.class);
  private final KafkaConsumer<String, String> consumer;
  private final ITableSessionPool tableSessionPool;

public RelationalConsumerThread(KafkaConsumer<String, String> consumer, ITableSessionPool tableSessionPool) {
this.consumer = consumer;
this.tableSessionPool = tableSessionPool;
}

@Override
public void run() {
    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        LOGGER.info("Received records: {}", records.count());
        List<String> dataList = new ArrayList<>(records.count());
        for (ConsumerRecord<String, String> consumerRecord : records) {
          dataList.add(consumerRecord.value());
        }
        insertDataList(dataList);
      }
    } catch (Exception e) {
      LOGGER.error("Consumer thread failed: ", e);
    } finally {
      consumer.close();
    }
}

private void insertDataList(List<String> dataList) {
for (String s : dataList) {
String sql = getInsertValueSQL(s);

try (ITableSession session = tableSessionPool.getSession()) {
try {
session.executeNonQueryStatement(sql);
} catch (IoTDBConnectionException | StatementExecutionException e) {
LOGGER.error("Insert Values Into Table Error: ", e);
}
} catch (IoTDBConnectionException e) {
LOGGER.error("Get Table Session Error: ", e);
}
}
}

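  // Decodes the "database;table;columns;values[;values]*" record format used in
  // RelationalConstant.ALL_DATA into a multi-row INSERT, e.g. for the first
  // sample row:
  //   INSERT INTO "kafka_db1"."tb1"(time,temperature,status)
  //   VALUES (17,3.26,true),(18,3.27,false),(19,3.28,true)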
private String getInsertValueSQL(String s) {
StringBuilder sql = new StringBuilder();
String[] curDataInfo = s.split(";");
int valueSetSize = curDataInfo.length - 3;
String database = curDataInfo[0];
String tableName = curDataInfo[1];
String columnNames = curDataInfo[2];
sql.append("INSERT INTO \"").append(database).append("\".\"").append(tableName).append("\"(");
sql.append(columnNames).append(") VALUES ");

for (int j = 0; j < valueSetSize; j++) {
String columnValues = curDataInfo[3 + j];
sql.append("(");
sql.append(columnValues);
sql.append("),");
}
sql.deleteCharAt(sql.length() - 1);
return sql.toString();
}
}
RelationalProducer.java
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.kafka.relational;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

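/**
 * Producing side of the example: sends each entry of
 * RelationalConstant.ALL_DATA to the Kafka topic as a plain string record, one
 * record per sample batch.
 */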
public class RelationalProducer {

private final KafkaProducer<String, String> kafkaProducer;
private static final Logger LOGGER = LoggerFactory.getLogger(RelationalProducer.class);

public RelationalProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, RelationalConstant.KAFKA_SERVICE_URL);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaProducer = new KafkaProducer<>(props);
}

public static void main(String[] args) {
RelationalProducer relationalProducer = new RelationalProducer();
relationalProducer.produce();
relationalProducer.close();
}

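  // Sends each record synchronously (send(...).get()) so failures surface
  // immediately and records are published in order.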
private void produce() {
for (int i = 0; i < RelationalConstant.ALL_DATA.length; i++) {
String key = Integer.toString(i);
try {
RecordMetadata metadata = kafkaProducer.send(new ProducerRecord<>(RelationalConstant.TOPIC, key, RelationalConstant.ALL_DATA[i])).get();
LOGGER.info("Sent record(key={} value={}) meta(partition={}, offset={})\n", key, RelationalConstant.ALL_DATA[i], metadata.partition(), metadata.offset());
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
}
}

private void close() {
kafkaProducer.close();
}
}
Constant.java (RabbitMQ example)
@@ -22,6 +22,11 @@
public class Constant {
private Constant() {}

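  // Connection settings for a local RabbitMQ broker; note the stock "guest"
  // account is only permitted to connect from localhost.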
public static final String SERVER_HOST = "localhost";
public static final int SERVER_PORT = 5672;
public static final String RABBITMQ_VHOST = "/";
public static final String RABBITMQ_USERNAME = "guest";
public static final String RABBITMQ_PASSWORD = "guest";
public static final String CONNECTION_NAME = "RabbitMQ-Connection";
public static final String RABBITMQ_CONSUMER_QUEUE = "IoTDB_Topic_Queue";
public static final String RABBITMQ_CONSUMER_TAG = "IoTDB_CONSUMER_TAG";