This example shows how to send data from localhost to IoTDB through Kafka.
| Component | Version |
|---|---|
| IoTDB | 2.0.5 |
| Kafka | 2.8.2 |

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iotdb</groupId>
            <artifactId>iotdb-session</artifactId>
            <version>2.0.5</version>
        </dependency>
    </dependencies>

- To install IoTDB, please refer to https://iotdb.apache.org/#/Download
- To install Kafka, please refer to https://kafka.apache.org/downloads
- To start IoTDB, please refer to the IoTDB Quick Start
- To start Kafka, please refer to https://kafka.apache.org/quickstart
Files related:

- Constant.java : configuration of IoTDB and Kafka
- Producer.java : sends data from localhost to the Kafka cluster
- Consumer.java : consumes data from the Kafka cluster using multiple threads
- ConsumerThread.java : the consume operations performed by a single thread

Step 0: Set parameters in Constant.java
Change the parameters according to your situation.
| Parameter | Data Type | Description |
|---|---|---|
| TOPIC | String | The topic to store data in Kafka |
| KAFKA_SERVICE_URL | String | The service url of Kafka, e.g. "127.0.0.1:9092" |
| CONSUMER_THREAD_NUM | int | The number of consumer threads |
| SESSION_SIZE | int | The maximum number of IoTDB sessions |
| IOTDB_CONNECTION_HOST | String | IoTDB host, e.g. "localhost" |
| IOTDB_CONNECTION_PORT | int | IoTDB port, e.g. 6667 |
| IOTDB_CONNECTION_USER | String | IoTDB username, e.g. "root" |
| IOTDB_CONNECTION_PASSWORD | String | IoTDB password, e.g. "root" |
| STORAGE_GROUP | Array | The storage groups to create |
| CREATE_TIMESERIES | Array | The timeseries to create. Format of a single timeseries: {"timeseries", "dataType", "encodingType", "compressionType"}, e.g. {"root.vehicle.d0", "INT32", "PLAIN", "SNAPPY"} |
| ALL_DATA | Array | The data to send. Format of a single record: "device,timestamp,fieldName[:fieldName]*,dataType[:dataType]*,value[:value]*", e.g. "root.vehicle.d0,10,s0,INT32,100", "root.vehicle.d0,12,s0:s1,INT32:TEXT,101:'employeeId102'" |
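
The ALL_DATA record format can be illustrated with a small parser. This is a minimal sketch using only the JDK; the class `TreeRecord` and its `parse` method are hypothetical names introduced here and are not taken from the example project, whose ConsumerThread.java may handle records differently.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of the example project) for one ALL_DATA entry. */
class TreeRecord {
    final String device;
    final long timestamp;
    final List<String> measurements;
    final List<String> dataTypes;
    final List<String> values;

    private TreeRecord(String device, long timestamp, List<String> measurements,
                       List<String> dataTypes, List<String> values) {
        this.device = device;
        this.timestamp = timestamp;
        this.measurements = measurements;
        this.dataTypes = dataTypes;
        this.values = values;
    }

    /**
     * Parses "device,timestamp,fieldName[:fieldName]*,dataType[:dataType]*,value[:value]*".
     * Limitation of this sketch: a TEXT value that itself contains ':' is not supported.
     */
    static TreeRecord parse(String line) {
        // Limit 5 keeps any further commas inside the trailing value part.
        String[] parts = line.split(",", 5);
        if (parts.length != 5) {
            throw new IllegalArgumentException("Expected 5 comma-separated parts: " + line);
        }
        List<String> measurements = Arrays.asList(parts[2].split(":"));
        List<String> dataTypes = Arrays.asList(parts[3].split(":"));
        List<String> values = Arrays.asList(parts[4].split(":"));
        if (measurements.size() != dataTypes.size() || measurements.size() != values.size()) {
            throw new IllegalArgumentException("Field, type and value counts differ: " + line);
        }
        return new TreeRecord(parts[0], Long.parseLong(parts[1]), measurements, dataTypes, values);
    }

    public static void main(String[] args) {
        TreeRecord r = parse("root.vehicle.d0,12,s0:s1,INT32:TEXT,101:'employeeId102'");
        System.out.println(r.device + " @ " + r.timestamp + " "
                + r.measurements + " " + r.dataTypes + " " + r.values);
    }
}
```

Each parsed record carries everything needed for a single-row insert: the device path, the timestamp, and parallel lists of measurement names, data types, and raw values.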
Step 1: Run Producer.java
This class sends data from localhost to the Kafka cluster.
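
As a rough illustration of what a producer needs before it can send, the sketch below builds the client configuration with the JDK only. It assumes records are sent as plain strings (hence `StringSerializer`); the class name `ProducerConfigSketch` is hypothetical, and the settings used by Producer.java itself may differ.

```java
import java.util.Properties;

/** Hypothetical sketch (not part of the example project) of a producer configuration. */
class ProducerConfigSketch {
    // Mirrors the KAFKA_SERVICE_URL parameter described in the table above.
    static final String KAFKA_SERVICE_URL = "127.0.0.1:9092";

    /** Builds the minimal set of properties a Kafka producer requires. */
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Assumption: keys and values are plain strings.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProperties(KAFKA_SERVICE_URL);
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + " = " + props.getProperty(name));
        }
    }
}
```

With the Kafka client on the classpath, properties like these would typically be passed to `new KafkaProducer<String, String>(props)`, and each ALL_DATA entry sent as a `ProducerRecord` on the configured TOPIC.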
Step 2: Run Consumer.java
This class consumes data from Kafka using multiple threads and writes it to IoTDB (tree model).
Files related:

- RelationalConstant.java : configuration of IoTDB and Kafka
- RelationalProducer.java : sends data from localhost to the Kafka cluster
- RelationalConsumer.java : consumes data from the Kafka cluster using multiple threads
- RelationalConsumerThread.java : the consume operations performed by a single thread

Step 0: Set parameters in RelationalConstant.java
Change the parameters according to your situation.
| Parameter | Data Type | Description |
|---|---|---|
| TOPIC | String | The topic to store data in Kafka |
| KAFKA_SERVICE_URL | String | The service url of Kafka, e.g. "127.0.0.1:9092" |
| CONSUMER_THREAD_NUM | int | The number of consumer threads |
| SESSION_SIZE | int | The maximum number of IoTDB sessions |
| IOTDB_URLS | Array | IoTDB urls, e.g. {"localhost:6667"} |
| IOTDB_USERNAME | String | IoTDB username, e.g. "root" |
| IOTDB_PASSWORD | String | IoTDB password, e.g. "root" |
| DATABASES | Array | The databases to create |
| TABLES | Array | The tables to create. Format of a single table: {"database", "tableName", "columnNames", "columnTypes", "columnCategories"}, e.g. {"kafka_db1", "tb1", "time,region,status", "TIMESTAMP,STRING,BOOLEAN", "TIME,TAG,FIELD"} |
| ALL_DATA | Array | The data to send. Format of a single record: "database;tableName;columnName[,columnName]*;value[,value]*[;value[,value]*]*", e.g. "kafka_db1;tb1;time,status;17,true;18,false;19,true" |
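
The relational ALL_DATA format packs several rows into one record. The following minimal sketch, using only the JDK, shows one way to split it; `TableRecord` and `parse` are hypothetical names introduced here, and RelationalConsumerThread.java may process records differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of the example project) for one relational ALL_DATA entry. */
class TableRecord {
    final String database;
    final String table;
    final List<String> columns;
    final List<List<String>> rows;

    private TableRecord(String database, String table,
                        List<String> columns, List<List<String>> rows) {
        this.database = database;
        this.table = table;
        this.columns = columns;
        this.rows = rows;
    }

    /**
     * Parses "database;tableName;columnName[,columnName]*;value[,value]*[;value[,value]*]*".
     * Limitation of this sketch: values containing ';' or ',' are not supported.
     */
    static TableRecord parse(String line) {
        String[] parts = line.split(";");
        if (parts.length < 4) {
            throw new IllegalArgumentException("Expected database;table;columns;row[;row]*: " + line);
        }
        List<String> columns = Arrays.asList(parts[2].split(","));
        List<List<String>> rows = new ArrayList<>();
        // Every part after the column list is one row of comma-separated values.
        for (int i = 3; i < parts.length; i++) {
            List<String> row = Arrays.asList(parts[i].split(","));
            if (row.size() != columns.size()) {
                throw new IllegalArgumentException("Row does not match column count: " + parts[i]);
            }
            rows.add(row);
        }
        return new TableRecord(parts[0], parts[1], columns, rows);
    }

    public static void main(String[] args) {
        TableRecord r = parse("kafka_db1;tb1;time,status;17,true;18,false;19,true");
        System.out.println(r.database + "." + r.table + " " + r.columns + " " + r.rows);
    }
}
```

Unlike the tree-model format, a single record here can carry any number of rows, all sharing the same column list.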
Step 1: Run RelationalProducer.java
This class sends data from localhost to the Kafka cluster.
Step 2: Run RelationalConsumer.java
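This class consumes data from Kafka using multiple threads and writes it to IoTDB (table model).
If you want to use multiple consumers, make sure the topic is created with more than one partition. Within a consumer group, each partition is consumed by at most one consumer, so any consumers beyond the partition count stay idle.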
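
The effect of the partition count on multiple consumers can be seen in a simplified model. The sketch below distributes partitions round-robin; it is not Kafka's actual assignor, whose strategies differ in detail, but it keeps the relevant invariant that each partition is owned by a single consumer in the group. The class name is hypothetical, and it assumes each consumer thread runs its own consumer in the same group.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of partition ownership in one consumer group. */
class PartitionAssignmentSketch {

    /** Returns, for each consumer index, the partition numbers it would own. */
    static List<List<Integer>> assign(int partitions, int consumers) {
        if (consumers <= 0) {
            throw new IllegalArgumentException("consumers must be positive");
        }
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            owned.add(new ArrayList<Integer>());
        }
        // Each partition goes to exactly one consumer, in round-robin order.
        for (int p = 0; p < partitions; p++) {
            owned.get(p % consumers).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // One partition, three consumers: two consumers receive nothing.
        System.out.println(assign(1, 3));
        // Three partitions, three consumers: every consumer has work.
        System.out.println(assign(3, 3));
    }
}
```

In this model, setting CONSUMER_THREAD_NUM above the topic's partition count adds threads that never receive data.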