Kafka-IoTDB Demo

Function

This example shows how to send data from localhost to IoTDB through Kafka.

Usage

Version usage

| Component | Version |
|-----------|---------|
| IoTDB     | 2.0.5   |
| Kafka     | 2.8.2   |

Dependencies with Maven

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.13</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iotdb</groupId>
        <artifactId>iotdb-session</artifactId>
        <version>2.0.5</version>
    </dependency>
</dependencies>

Prerequisite Steps

1. Install IoTDB

Please refer to https://iotdb.apache.org/#/Download

2. Install Kafka

Please refer to https://kafka.apache.org/downloads

3. Start IoTDB

Please refer to the IoTDB Quick Start.

4. Start Kafka

Please refer to https://kafka.apache.org/quickstart

Case 1: Send data from localhost to IoTDB-tree

Files related:

  1. Constant.java : configuration of IoTDB and Kafka
  2. Producer.java : sends data from localhost to the Kafka cluster
  3. Consumer.java : consumes data from the Kafka cluster using multiple threads
  4. ConsumerThread.java : the consume operations performed by a single thread

Step 0: Set parameters in Constant.java

Change the parameters according to your situation.

| Parameter | Data Type | Description |
|-----------|-----------|-------------|
| TOPIC | String | The topic to store data in Kafka |
| KAFKA_SERVICE_URL | String | The service url of Kafka, e.g. "127.0.0.1:9092" |
| CONSUMER_THREAD_NUM | int | The number of consumer threads |
| SESSION_SIZE | int | The maximum number of IoTDB sessions |
| IOTDB_CONNECTION_HOST | String | IoTDB host, e.g. "localhost" |
| IOTDB_CONNECTION_PORT | int | IoTDB port, e.g. 6667 |
| IOTDB_CONNECTION_USER | String | IoTDB username, e.g. "root" |
| IOTDB_CONNECTION_PASSWORD | String | IoTDB password, e.g. "root" |
| STORAGE_GROUP | Array | The storage groups to create |
| CREATE_TIMESERIES | Array | The timeseries to create. Format of a single timeseries: {"timeseries", "dataType", "encodingType", "compressionType"}, e.g. {"root.vehicle.d0", "INT32", "PLAIN", "SNAPPY"} |
| ALL_DATA | Array | The data to create. Format of a single data: "device,timestamp,fieldName[:fieldName]*,dataType[:dataType]*,value[:value]*", e.g. "root.vehicle.d0,10,s0,INT32,100", "root.vehicle.d0,12,s0:s1,INT32:TEXT,101:'employeeId102'" |
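
To make the ALL_DATA record format concrete, the following is a minimal sketch of how one such string could be split into its parts. The class and method names (`TreeRecordSketch`, `TreeRecord`, `parse`) are hypothetical and are not taken from the demo sources; the sketch also assumes that individual values contain no `:` character.

```java
import java.util.Arrays;
import java.util.List;

public class TreeRecordSketch {

    /** One parsed record: device, timestamp, and parallel lists of fields, types and values. */
    static final class TreeRecord {
        final String device;
        final long timestamp;
        final List<String> fields;
        final List<String> dataTypes;
        final List<String> values;

        TreeRecord(String device, long timestamp, List<String> fields,
                   List<String> dataTypes, List<String> values) {
            this.device = device;
            this.timestamp = timestamp;
            this.fields = fields;
            this.dataTypes = dataTypes;
            this.values = values;
        }
    }

    /** Parses "device,timestamp,field[:field]*,type[:type]*,value[:value]*". */
    static TreeRecord parse(String line) {
        // Limit the split to 5 parts so commas inside the value section are kept intact.
        String[] parts = line.split(",", 5);
        if (parts.length != 5) {
            throw new IllegalArgumentException("Expected 5 comma-separated parts: " + line);
        }
        List<String> fields = Arrays.asList(parts[2].split(":"));
        List<String> types = Arrays.asList(parts[3].split(":"));
        List<String> values = Arrays.asList(parts[4].split(":"));
        // Fields, types and values are parallel lists and must have the same length.
        if (fields.size() != types.size() || fields.size() != values.size()) {
            throw new IllegalArgumentException("Field, type and value counts differ: " + line);
        }
        return new TreeRecord(parts[0], Long.parseLong(parts[1]), fields, types, values);
    }

    public static void main(String[] args) {
        TreeRecord r = parse("root.vehicle.d0,12,s0:s1,INT32:TEXT,101:'employeeId102'");
        System.out.println(r.device + " @ " + r.timestamp + " -> "
                + r.fields + " " + r.dataTypes + " " + r.values);
    }
}
```

A consumer could then convert each value according to its declared data type before writing it to IoTDB; that conversion is left out here.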

Step 1: Run Producer.java

This class sends data from localhost to the Kafka cluster.
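
As a rough illustration of what a producer needs before it can send anything, the sketch below builds the client configuration from the Kafka service URL. The class and method names are hypothetical, and the choice of string serializers is an assumption made for this sketch; the settings actually used by Producer.java may differ.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    /** Builds a minimal producer configuration for string keys and values. */
    static Properties producerProperties(String kafkaServiceUrl) {
        Properties props = new Properties();
        // Address of the Kafka broker(s), e.g. the value of KAFKA_SERVICE_URL.
        props.put("bootstrap.servers", kafkaServiceUrl);
        // Assumption: records are sent as plain strings.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProperties("127.0.0.1:9092");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

With the Kafka client library on the classpath, these properties can be passed to `new KafkaProducer<String, String>(props)`, and each entry of ALL_DATA can be sent as a `ProducerRecord` on the configured topic.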

Step 2: Run Consumer.java

This class consumes data from Kafka using multiple threads and writes the data to IoTDB-tree.

Case 2: Send data from localhost to IoTDB-table

Files related:

  1. RelationalConstant.java : configuration of IoTDB and Kafka
  2. RelationalProducer.java : sends data from localhost to the Kafka cluster
  3. RelationalConsumer.java : consumes data from the Kafka cluster using multiple threads
  4. RelationalConsumerThread.java : the consume operations performed by a single thread

Step 0: Set parameters in RelationalConstant.java

Change the parameters according to your situation.

| Parameter | Data Type | Description |
|-----------|-----------|-------------|
| TOPIC | String | The topic to store data in Kafka |
| KAFKA_SERVICE_URL | String | The service url of Kafka, e.g. "127.0.0.1:9092" |
| CONSUMER_THREAD_NUM | int | The number of consumer threads |
| SESSION_SIZE | int | The maximum number of IoTDB sessions |
| IOTDB_URLS | Array | IoTDB urls, e.g. {"localhost:6667"} |
| IOTDB_USERNAME | String | IoTDB username, e.g. "root" |
| IOTDB_PASSWORD | String | IoTDB password, e.g. "root" |
| DATABASES | Array | The databases to create |
| TABLES | Array | The tables to create. Format of a single table: {"database", "tableName", "columnNames", "columnTypes", "columnCategories"}, e.g. {"kafka_db1", "tb1", "time,region,status", "TIMESTAMP,STRING,BOOLEAN", "TIME,TAG,FIELD"} |
| ALL_DATA | Array | The data to create. Format of a single data: "database;tableName;columnName[,columnName]*;value[,value]*[;value[,value]*]*", e.g. "kafka_db1;tb1;time,status;17,true;18,false;19,true" |
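
The table-model ALL_DATA format packs several rows into one string. The sketch below shows one way such a string could be split into a database, a table, a column list, and rows. The names `TableRecordSketch`, `TableBatch`, and `parse` are hypothetical, and the sketch assumes that values contain neither `;` nor `,`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TableRecordSketch {

    /** One parsed message: target database and table, column names, and the rows to insert. */
    static final class TableBatch {
        final String database;
        final String table;
        final List<String> columns;
        final List<List<String>> rows;

        TableBatch(String database, String table,
                   List<String> columns, List<List<String>> rows) {
            this.database = database;
            this.table = table;
            this.columns = columns;
            this.rows = rows;
        }
    }

    /** Parses "database;tableName;col[,col]*;val[,val]*[;val[,val]*]*". */
    static TableBatch parse(String line) {
        String[] parts = line.split(";");
        // At least database, table, column list and one row are required.
        if (parts.length < 4) {
            throw new IllegalArgumentException("Expected at least 4 ';'-separated parts: " + line);
        }
        List<String> columns = Arrays.asList(parts[2].split(","));
        List<List<String>> rows = new ArrayList<>();
        // Every part after the column list is one row of comma-separated values.
        for (int i = 3; i < parts.length; i++) {
            List<String> row = Arrays.asList(parts[i].split(","));
            if (row.size() != columns.size()) {
                throw new IllegalArgumentException("Row does not match column count: " + parts[i]);
            }
            rows.add(row);
        }
        return new TableBatch(parts[0], parts[1], columns, rows);
    }

    public static void main(String[] args) {
        TableBatch b = parse("kafka_db1;tb1;time,status;17,true;18,false;19,true");
        System.out.println(b.database + "." + b.table + " " + b.columns + " " + b.rows);
    }
}
```

For the example string, this yields three rows for the columns `time` and `status` in table `tb1` of database `kafka_db1`.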

Step 1: Run RelationalProducer.java

This class sends data from localhost to the Kafka cluster.

Step 2: Run RelationalConsumer.java

This class consumes data from Kafka using multiple threads and writes the data to IoTDB-table.

Notice

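If you want to use multiple consumers, make sure that the topic you create has more than one partition. Within a consumer group, each partition is consumed by at most one consumer, so consumers beyond the partition count stay idle.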
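
The effect of the partition count can be illustrated with a small simulation. This is not Kafka's actual assignment code; it is a simplified round-robin model, with hypothetical names, of the rule that each partition goes to exactly one consumer in a group.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    /** Distributes partition ids 0..partitions-1 over the consumers in round-robin order. */
    static List<List<Integer>> assign(int partitions, int consumers) {
        if (consumers <= 0) {
            throw new IllegalArgumentException("At least one consumer is required");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            assignment.add(new ArrayList<>());
        }
        // Each partition is owned by exactly one consumer.
        for (int p = 0; p < partitions; p++) {
            assignment.get(p % consumers).add(p);
        }
        return assignment;
    }

    /** Counts consumers that received no partition and would therefore stay idle. */
    static int idleConsumers(List<List<Integer>> assignment) {
        int idle = 0;
        for (List<Integer> owned : assignment) {
            if (owned.isEmpty()) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        System.out.println(assign(1, 3)); // prints [[0], [], []]
        System.out.println(assign(4, 3)); // prints [[0, 3], [1], [2]]
    }
}
```

With one partition and three consumer threads, two threads receive nothing, which is why the topic should have at least as many partitions as the consumers you expect to be active.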