Apache SeaTunnel

1. Overview

SeaTunnel is a distributed data integration platform built for large data volumes, with high throughput and elastic scaling. It links heterogeneous data sources through standardized Connectors, each made up of a Source and a Sink. A Source abstracts its data source into the uniform SeaTunnelRow format; after dynamic resource scheduling and batch-processing optimization, a Sink writes the rows to the target storage system. The IoTDB Connector integrates IoTDB with SeaTunnel to address core challenges of time-series workloads, such as high-throughput writes, multi-source data governance, and complex analysis. Together with SeaTunnel's out-of-the-box connector ecosystem and automated operation and maintenance capabilities, it helps enterprises in areas such as the Internet of Things and the industrial internet quickly build low-cost, highly reliable, and easily scalable data infrastructure.

2. Usage Steps

2.1 Environment Preparation

2.1.1 Software Requirements

| Software | Version | Installation Reference |
|---|---|---|
| IoTDB | >= 2.0.5 | Quick Start |
| SeaTunnel | 2.3.12 | Official Website |
  • Thrift Version Conflict Resolution (Only required for Spark engine):
# Remove older Thrift from Spark
rm -f $SPARK_HOME/jars/libthrift*
# Copy IoTDB's Thrift library to Spark classpath
cp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/

2.1.2 Dependency Configuration

  1. JDBC
  • Spark/Flink Engine: Place the JDBC driver JAR into the ${SEATUNNEL_HOME}/plugins/ directory.
  • SeaTunnel Zeta Engine: Place the JDBC driver JAR into the ${SEATUNNEL_HOME}/lib/ directory.
  2. Connector

Place the corresponding version of the SeaTunnel Connector into the ${SEATUNNEL_HOME}/plugins/ directory.

2.2 Reading Data (IoTDB Source Connector)

2.2.1 Configuration Parameters

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| node_urls | string | yes | - | IoTDB cluster address, format: "host1:port" or "host1:port,host2:port" |
| username | string | yes | - | IoTDB username |
| password | string | yes | - | IoTDB password |
| sql_dialect | string | no | tree | IoTDB model: tree for the tree model; table for the table model |
| sql | string | yes | - | SQL query statement to execute |
| database | string | no | - | Database name; only effective in the table model |
| schema | config | yes | - | Data schema definition |
| fetch_size | int | no | - | Number of rows fetched from IoTDB per request during query execution |
| lower_bound | long | no | - | Lower bound of the time range (used when partitioning data by the time column) |
| upper_bound | long | no | - | Upper bound of the time range (used when partitioning data by the time column) |
| num_partitions | int | no | - | Number of partitions (used when partitioning by the time column). With 1 partition, the full time range is used. If (upper_bound - lower_bound) < num_partitions, (upper_bound - lower_bound) is used as the actual number of partitions |
| thrift_default_buffer_size | int | no | - | Thrift protocol buffer size |
| thrift_max_frame_size | int | no | - | Thrift maximum frame size |
| enable_cache_leader | boolean | no | - | Whether to enable leader node caching |
| version | string | no | - | Client SQL semantic version (V_0_12 / V_0_13) |
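
As a hedged illustration of the time-partitioning parameters above, the fragment below sketches a tree-model source that reads a one-hour range in four partitions. The series path root.test_group.*, the schema field names, and the epoch-millisecond bounds are assumptions chosen for illustration, and the bounds are assumed to use the same unit as the IoTDB time column.

```hocon
source {
    IoTDB {
        node_urls = "localhost:6667"
        username = "root"
        password = "root"
        sql_dialect = "tree"                 # default; shown for clarity
        # Hypothetical series path and measurements
        sql = "SELECT temperature, moisture FROM root.test_group.* align by device"
        lower_bound = 1700000000000          # assumed start of range (epoch ms)
        upper_bound = 1700003600000          # assumed end of range (one hour later)
        num_partitions = 4                   # range (3600000) >= 4, so 4 partitions are used
        fetch_size = 1000                    # illustrative value, not a documented default
        schema {
            fields {
                ts = timestamp
                device_name = string
                temperature = float
                moisture = bigint
            }
        }
    }
}
```

With these assumed values the range is far larger than num_partitions, so the requested partition count applies unchanged; a range narrower than num_partitions would reduce the partition count to the width of the range.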

2.2.2 Configuration Example

  1. Create a new file iotdb_source_example.conf in the ${SEATUNNEL_HOME}/config/ directory:
env {
    parallelism = 2       # Parallelism set to 2
    job.mode = "BATCH"    # Batch mode
}

source {
    IoTDB {
        node_urls = "localhost:6667"
        username = "root"
        password = "root"
        sql_dialect = "table"
        sql = "SELECT time,device_id,city,s1,s2,s3,s4 FROM tcollector.table1"
        schema {
            fields {
                time = timestamp
                device_id = string
                city = string
                s1 = int
                s2 = bigint
                s3 = float
                s4 = double
            }
        }
    }
}

sink {
    Console {
    }  # Output to console
}
  2. Run SeaTunnel with the following command:
./bin/seatunnel.sh --config config/iotdb_source_example.conf -e local
  3. For more details, refer to the official Apache SeaTunnel documentation on the IoTDB Source Connector.

2.3 Writing Data (IoTDB Sink Connector)

2.3.1 Configuration Parameters

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| node_urls | Array | yes | - | IoTDB cluster address, format: ["host1:port"] or ["host1:port","host2:port"] |
| username | String | yes | - | IoTDB username |
| password | String | yes | - | IoTDB password |
| sql_dialect | String | no | tree | IoTDB model: tree for the tree model; table for the table model |
| storage_group | String | yes | - | Tree model: the storage group (path prefix) for devices, e.g., deviceId = ${storage_group} + "." + ${key_device}. Table model: the database |
| key_device | String | yes | - | Tree model: field name in SeaTunnelRow that specifies the IoTDB device ID. Table model: field name in SeaTunnelRow that specifies the IoTDB table name |
| key_timestamp | String | no | processing time | Tree model: field name in SeaTunnelRow that specifies the IoTDB timestamp. Table model: field name in SeaTunnelRow that specifies the IoTDB time column. In both models, processing time is used as the timestamp if this is not specified |
| key_measurement_fields | Array | no | See description | Tree model: field names in SeaTunnelRow that specify the list of IoTDB measurements (if not specified, all fields except key_device and key_timestamp). Table model: field names in SeaTunnelRow that specify the IoTDB field columns (if not specified, all fields except key_device, key_timestamp, key_tag_fields, and key_attribute_fields) |
| key_tag_fields | Array | no | - | Tree model: not applicable. Table model: field names in SeaTunnelRow that specify the IoTDB tag columns |
| key_attribute_fields | Array | no | - | Tree model: not applicable. Table model: field names in SeaTunnelRow that specify the IoTDB attribute columns |
| batch_size | Integer | no | 1024 | For batch writing, data is flushed to IoTDB when the buffer reaches batch_size or when the elapsed time reaches batch_interval_ms |
| max_retries | Integer | no | - | Number of retries on a failed flush |
| retry_backoff_multiplier_ms | Integer | no | - | Multiplier used to generate the next backoff delay |
| max_retry_backoff_ms | Integer | no | - | Maximum wait time before retrying a request to IoTDB |
| default_thrift_buffer_size | Integer | no | - | Initial buffer size for the Thrift client in IoTDB |
| max_thrift_frame_size | Integer | no | - | Maximum frame size for the Thrift client in IoTDB |
| zone_id | String | no | - | java.time.ZoneId used by the IoTDB client |
| enable_rpc_compression | Boolean | no | - | Whether to enable RPC compression in the IoTDB client |
| connection_timeout_in_ms | Integer | no | - | Maximum time (in milliseconds) to wait when connecting to IoTDB |
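
To make the tree-model mapping in the table more concrete, the fragment below is a minimal sketch of a sink that names the device, timestamp, and measurement fields explicitly. The upstream field names (device_name, ts, temperature, moisture), the storage group root.test_group, and the retry and timeout values are assumptions for illustration only.

```hocon
sink {
    IoTDB {
        node_urls = ["localhost:6667"]
        username = "root"
        password = "root"
        sql_dialect = "tree"
        storage_group = "root.test_group"    # device ID becomes root.test_group.<device_name value>
        key_device = "device_name"           # hypothetical upstream field holding the device name
        key_timestamp = "ts"                 # hypothetical upstream field holding the timestamp
        key_measurement_fields = ["temperature", "moisture"]
        batch_size = 1024                    # default per the table above
        max_retries = 3                      # illustrative value
        retry_backoff_multiplier_ms = 1000   # illustrative value
        max_retry_backoff_ms = 10000         # illustrative value
        connection_timeout_in_ms = 5000      # illustrative value
    }
}
```

Under these assumptions, a row whose device_name is "d1" would be written to the device root.test_group.d1, following the deviceId rule given for storage_group.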

2.3.2 Configuration Example

  1. Create a new file iotdb_sink_example.conf in the ${SEATUNNEL_HOME}/config/ directory:
# Define runtime environment
env {
  parallelism = 4
  job.mode = "BATCH"
}

source {
    Jdbc {
        url = "jdbc:mysql://localhost:3306/demo_db?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "root"
        password = "IoTDB@2024"
        query = "select * from device"
    }
}
sink {
    IoTDB {
        node_urls = ["localhost:6667"]
        username = "root"
        password = "root"
        sql_dialect = "table"
        storage_group = "seatunnel"
        key_device = "id"
        key_timestamp = "intime"
    }
}
  2. Run SeaTunnel with the following command:
./bin/seatunnel.sh --config config/iotdb_sink_example.conf -e local
  3. For more configuration parameters and examples, refer to the official Apache SeaTunnel documentation on the IoTDB Sink Connector.
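
The example above relies on the defaults for the field columns. As a hedged variant, the fragment below sketches a table-model sink that also assigns tag and attribute columns. The upstream field names (table_name, ts, region, model, temperature) are hypothetical; only the parameter names come from the sink parameter table.

```hocon
sink {
    IoTDB {
        node_urls = ["localhost:6667"]
        username = "root"
        password = "root"
        sql_dialect = "table"
        storage_group = "seatunnel"            # database in the table model
        key_device = "table_name"              # hypothetical field holding the target table name
        key_timestamp = "ts"                   # hypothetical field mapped to the time column
        key_tag_fields = ["region"]            # hypothetical tag column
        key_attribute_fields = ["model"]       # hypothetical attribute column
        key_measurement_fields = ["temperature"]  # hypothetical field column
    }
}
```

Listing key_measurement_fields explicitly is optional here: per the parameter table, omitting it would select every field not already used as the table name, time, tag, or attribute column.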