IoTDB provides the Spark-IoTDB-Table-Connector to integrate IoTDB's table model with Spark, enabling data read/write operations in Spark environments through both DataFrame and Spark SQL interfaces.
DataFrame is a structured data abstraction in Spark, containing schema metadata (column names, data types, etc.) and serving as the primary data carrier between Spark operators. DataFrame transformations follow a lazy execution mechanism, where operations are only physically executed upon triggering an action (e.g., writing results or invoking collect()), thereby optimizing resource utilization by avoiding redundant computations.
The Spark-IoTDB-Table-Connector allows:
• Write: Processed DataFrames from upstream tasks can be directly written into IoTDB tables.
• Read: Data from IoTDB tables can be loaded as DataFrames for downstream analytical tasks.
Spark clusters can be accessed via the Spark-SQL Shell for interactive SQL execution. The Spark-IoTDB-Table-Connector maps IoTDB tables to temporary external views in Spark, enabling direct read/write operations using Spark SQL.
| Software | Version |
|---|---|
| Spark-IoTDB-Table-Connector | 2.0.3 |
| Spark | 3.3 - 3.5 |
| IoTDB | 2.0.1+ |
| Scala | 2.12 |
| JDK | 8, 11 |
Add the following dependency to your project’s pom.xml:
```xml
<dependency>
    <groupId>org.apache.iotdb</groupId>
    <artifactId>spark-iotdb-table-connector-3.5</artifactId>
    <version>2.0.3</version>
</dependency>
```
For Spark-SQL Shell usage, download the Spark-IoTDB-Table-Connector JAR from the official repository and place it in the `${SPARK_HOME}/jars` directory.

To read an IoTDB table as a DataFrame:

```scala
val df = spark.read
  .format("org.apache.iotdb.spark.table.db.IoTDBTableProvider")
  .option("iotdb.database", "$YOUR_IOTDB_DATABASE_NAME")
  .option("iotdb.table", "$YOUR_IOTDB_TABLE_NAME")
  .option("iotdb.username", "$YOUR_IOTDB_USERNAME")
  .option("iotdb.password", "$YOUR_IOTDB_PASSWORD")
  .option("iotdb.urls", "$YOUR_IOTDB_URL")
  .load()
```
To read through Spark SQL, map the IoTDB table to a temporary view and query it directly:

```sql
CREATE TEMPORARY VIEW spark_iotdb
USING org.apache.iotdb.spark.table.db.IoTDBTableProvider
OPTIONS(
  "iotdb.database"="$YOUR_IOTDB_DATABASE_NAME",
  "iotdb.table"="$YOUR_IOTDB_TABLE_NAME",
  "iotdb.username"="$YOUR_IOTDB_USERNAME",
  "iotdb.password"="$YOUR_IOTDB_PASSWORD",
  "iotdb.urls"="$YOUR_IOTDB_URL"
);

SELECT * FROM spark_iotdb;
```
| Parameter | Default | Description | Mandatory |
|---|---|---|---|
| iotdb.database | — | IoTDB database name (must pre-exist in IoTDB) | Yes |
| iotdb.table | — | IoTDB table name (must pre-exist in IoTDB) | Yes |
| iotdb.username | root | IoTDB username | No |
| iotdb.password | root | IoTDB password | No |
| iotdb.urls | 127.0.0.1:6667 | IoTDB DataNode RPC endpoints (comma-separated for multiple nodes) | No |
The connector supports pushing down several filter conditions, column pruning, and OFFSET/LIMIT clauses into IoTDB. The filters that can be pushed down are:
| Name | IoTDB SQL |
|---|---|
| IS_NULL | expr IS NULL |
| IS_NOT_NULL | expr IS NOT NULL |
| STARTS_WITH | starts_with(expr1, expr2) |
| ENDS_WITH | ends_with(expr1, expr2) |
| CONTAINS | expr1 LIKE '%expr2%' |
| IN | expr IN (expr1, expr2, ...) |
| = | expr1 = expr2 |
| <> | expr1 <> expr2 |
| < | expr1 < expr2 |
| <= | expr1 <= expr2 |
| > | expr1 > expr2 |
| >= | expr1 >= expr2 |
| AND | expr1 AND expr2 |
| OR | expr1 OR expr2 |
| NOT | NOT expr |
| ALWAYS_TRUE | TRUE |
| ALWAYS_FALSE | FALSE |
Constraints:
• CONTAINS requires expr2 to be a constant value.
• If any child expression cannot be pushed down, the entire conjunctive clause is not pushed down.
Column pruning is supported: only the required column names are included when constructing the IoTDB SQL query, so unnecessary column data is never transferred.
OFFSET and LIMIT pushdown is supported: pagination parameters provided by Spark are integrated directly into the IoTDB query.
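For illustration, a query like the following against the `spark_iotdb` view defined earlier exercises all of these pushdowns; the column names are placeholders for your own schema, and the OFFSET clause assumes a Spark version that supports it:

```sql
-- Placeholders: tag1 and s1 stand in for your table's columns.
-- The filters, the column list, and LIMIT/OFFSET can all be pushed
-- down into the IoTDB query instead of being evaluated in Spark.
SELECT time, tag1, s1
FROM spark_iotdb
WHERE tag1 = 'tag1_value1' AND s1 IS NOT NULL
LIMIT 100 OFFSET 50;
```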
```scala
val df = spark.createDataFrame(List(
    (1L, "tag1_value1", "tag2_value1", "attribute1_value1", 1, true),
    (2L, "tag1_value1", "tag2_value2", "attribute1_value1", 2, false)))
  .toDF("time", "tag1", "tag2", "attribute1", "s1", "s2")

df.write
  .format("org.apache.iotdb.spark.table.db.IoTDBTableProvider")
  .option("iotdb.database", "$YOUR_IOTDB_DATABASE_NAME")
  .option("iotdb.table", "$YOUR_IOTDB_TABLE_NAME")
  .option("iotdb.username", "$YOUR_IOTDB_USERNAME")
  .option("iotdb.password", "$YOUR_IOTDB_PASSWORD")
  .option("iotdb.urls", "$YOUR_IOTDB_URL")
  .save()
```
```sql
CREATE TEMPORARY VIEW spark_iotdb
USING org.apache.iotdb.spark.table.db.IoTDBTableProvider
OPTIONS(
  "iotdb.database"="$YOUR_IOTDB_DATABASE_NAME",
  "iotdb.table"="$YOUR_IOTDB_TABLE_NAME",
  "iotdb.username"="$YOUR_IOTDB_USERNAME",
  "iotdb.password"="$YOUR_IOTDB_PASSWORD",
  "iotdb.urls"="$YOUR_IOTDB_URL"
);

INSERT INTO spark_iotdb VALUES ("VALUE1", "VALUE2", ...);

INSERT INTO spark_iotdb SELECT * FROM YOUR_TABLE;
```
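As a sketch of the explicit-column form (the column names and `YOUR_TABLE` are placeholders), an INSERT INTO ... SELECT with a column list need not follow the target schema's order:

```sql
-- Placeholders: tag1, time, s1 and YOUR_TABLE are illustrative.
-- With an explicit column list, the listed order may differ from the
-- order reported by DESC TABLE, as long as each column exists in the
-- target table.
INSERT INTO spark_iotdb (tag1, time, s1)
SELECT tag1, time, s1 FROM YOUR_TABLE;
```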
• INSERT INTO ... VALUES: the values must follow the IoTDB table's schema order (as shown by DESC TABLE).
• INSERT INTO ... SELECT: every written column must exist in the target table; a mismatched column count triggers an IllegalArgumentException.
• INSERT INTO ... SELECT with an explicit column list allows the columns to appear in any order.

| IoTDB Type | Spark Type |
|---|---|
| TsDataType.BOOLEAN | BooleanType |
| TsDataType.INT32 | IntegerType |
| TsDataType.DATE | DateType |
| TsDataType.INT64 | LongType |
| TsDataType.TIMESTAMP | LongType |
| TsDataType.FLOAT | FloatType |
| TsDataType.DOUBLE | DoubleType |
| TsDataType.STRING | StringType |
| TsDataType.TEXT | StringType |
| TsDataType.BLOB | BinaryType |
When writing, the connector uses the mapping below to convert DataFrame rows into IoTDB's Tablet format. During Tablet ingestion into IoTDB, a secondary type conversion is performed automatically if the written type does not match the target column's type.
| Spark Type | IoTDB Type |
|---|---|
| BooleanType | TsDataType.BOOLEAN |
| ByteType | TsDataType.INT32 |
| ShortType | TsDataType.INT32 |
| IntegerType | TsDataType.INT32 |
| LongType | TsDataType.INT64 |
| FloatType | TsDataType.FLOAT |
| DoubleType | TsDataType.DOUBLE |
| StringType | TsDataType.STRING |
| BinaryType | TsDataType.BLOB |
| DateType | TsDataType.DATE |
| Others | TsDataType.STRING |
• Writing requires the INSERT privilege on the target table/database.
• Reading requires the SELECT privilege on the target table/database.
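For example, assuming IoTDB's table-model GRANT syntax and a hypothetical user `spark_user`, the required privileges might be granted as follows (verify the exact syntax against your IoTDB version's privilege documentation):

```sql
-- Hypothetical user and object names; run in the IoTDB CLI.
GRANT SELECT ON database1.table1 TO USER spark_user;
GRANT INSERT ON database1.table1 TO USER spark_user;
```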