<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
## Spark-IoTDB
### Versions

The required versions of Spark and Java are as follows:

| Spark Version | Scala Version | Java Version | TsFile |
| ------------- | ------------- | ------------ | -------- |
| `2.4.0-3.2.0` | `2.12` | `1.8` | `0.13.0` |
### Notes

1. The Spark IoTDB Connector only supports `Spark 2.4.0` to `Spark 3.2.0` with `Scala 2.12`.
   If you need support for other versions, modify the Scala version in the pom file of the `spark-iotdb-connector` module in the source code and recompile it.
2. Because the thrift versions used by IoTDB and Spark conflict, you need to resolve the conflict by running the two commands `rm -f $SPARK_HOME/jars/libthrift*` and `cp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/`.
   Otherwise, you can only debug the code in an IDE. In addition, if you want to submit jobs with `spark-submit`, you must package the connector together with its dependencies when building the jar.
### Installation

`mvn clean scala:compile compile install`

#### Maven Dependency

```xml
<dependency>
    <groupId>org.apache.iotdb</groupId>
    <artifactId>spark-iotdb-connector</artifactId>
    <version>0.13.0</version>
</dependency>
```
#### Spark-shell User Guide
```shell
spark-shell --jars spark-iotdb-connector-0.13.0.jar,iotdb-jdbc-0.13.0-jar-with-dependencies.jar
import org.apache.iotdb.spark.db._
val df = spark.read.format("org.apache.iotdb.spark.db").option("url","jdbc:iotdb://127.0.0.1:6667/").option("sql","select * from root").load
df.printSchema()
df.show()
```
If you want to partition the RDD, you can do the following:
```shell
spark-shell --jars spark-iotdb-connector-0.13.0.jar,iotdb-jdbc-0.13.0-jar-with-dependencies.jar
import org.apache.iotdb.spark.db._
val df = spark.read.format("org.apache.iotdb.spark.db").option("url","jdbc:iotdb://127.0.0.1:6667/").option("sql","select * from root").
option("lowerBound", [lower bound of time that you want query(include)]).option("upperBound", [upper bound of time that you want query(include)]).
option("numPartition", [the partition number you want]).load
df.printSchema()
df.show()
```
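For instance, here is a minimal sketch with the placeholders filled in; the time bounds and partition count below are purely illustrative values:

```scala
import org.apache.iotdb.spark.db._

// Illustrative values: query timestamps in [0, 100000000] (both bounds inclusive)
// and split the result into 10 partitions
val df = spark.read.format("org.apache.iotdb.spark.db").
  option("url", "jdbc:iotdb://127.0.0.1:6667/").
  option("sql", "select * from root").
  option("lowerBound", "0").
  option("upperBound", "100000000").
  option("numPartition", "10").
  load

df.printSchema()
df.show()
```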
#### Schema Inference

Take the following TsFile structure as an example: the TsFile schema contains three measurements: status, temperature, and hardware. The basic information of these three measurements is as follows:

| Name        | Type    | Encoding |
| ----------- | ------- | -------- |
| status      | Boolean | PLAIN    |
| temperature | Float   | RLE      |
| hardware    | Text    | PLAIN    |

The existing data in the TsFile is as follows:

* d1: root.ln.wf01.wt01
* d2: root.ln.wf02.wt02

| time | d1.status | time | d1.temperature | time | d2.hardware | time | d2.status |
| ---- | --------- | ---- | -------------- | ---- | ----------- | ---- | --------- |
| 1 | True | 1 | 2.2 | 2 | "aaa" | 1 | True |
| 3 | True | 2 | 2.2 | 4 | "bbb" | 2 | False |
| 5 | False | 3 | 2.1 | 6 | "ccc" | 4 | True |
The wide (default) table form is as follows:

| time | root.ln.wf02.wt02.temperature | root.ln.wf02.wt02.status | root.ln.wf02.wt02.hardware | root.ln.wf01.wt01.temperature | root.ln.wf01.wt01.status | root.ln.wf01.wt01.hardware |
| ---- | ----------------------------- | ------------------------ | -------------------------- | ----------------------------- | ------------------------ | -------------------------- |
| 1 | null | true | null | 2.2 | true | null |
| 2 | null | false | aaa | 2.2 | null | null |
| 3 | null | null | null | 2.1 | true | null |
| 4 | null | true | bbb | null | null | null |
| 5 | null | null | null | null | false | null |
| 6 | null | null | ccc | null | null | null |
You can also use the narrow table form, as shown below (see the "Transform Between Wide and Narrow Tables" section below for how to use the narrow table form):

| time | device_name       | status | hardware | temperature |
| ---- | ----------------- | ------ | -------- | ----------- |
| 1    | root.ln.wf01.wt01 | true   | null     | 2.2         |
| 1    | root.ln.wf02.wt02 | true   | null     | null        |
| 2    | root.ln.wf01.wt01 | null   | null     | 2.2         |
| 2    | root.ln.wf02.wt02 | false  | aaa      | null        |
| 3    | root.ln.wf01.wt01 | true   | null     | 2.1         |
| 4    | root.ln.wf02.wt02 | true   | bbb      | null        |
| 5    | root.ln.wf01.wt01 | false  | null     | null        |
| 6    | root.ln.wf02.wt02 | null   | ccc      | null        |
#### Transform Between Wide and Narrow Tables

* From wide to narrow
```scala
import org.apache.iotdb.spark.db._
val wide_df = spark.read.format("org.apache.iotdb.spark.db").option("url", "jdbc:iotdb://127.0.0.1:6667/").option("sql", "select * from root.** where time < 1100 and time > 1000").load
val narrow_df = Transformer.toNarrowForm(spark, wide_df)
```
* From narrow to wide
```scala
import org.apache.iotdb.spark.db._
val wide_df = Transformer.toWideForm(spark, narrow_df)
```
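As a usage sketch, the two forms also work with ordinary Spark SQL. The snippet below assumes the narrow form exposes a `device_name` column (as in the write example later in this document) and uses Spark's standard temporary-view API; the view name and the queried device are illustrative:

```scala
import org.apache.iotdb.spark.db._

// Load in the (default) wide form, then convert to the narrow form
val wide_df = spark.read.format("org.apache.iotdb.spark.db").
  option("url", "jdbc:iotdb://127.0.0.1:6667/").
  option("sql", "select * from root").
  load
val narrow_df = Transformer.toNarrowForm(spark, wide_df)

// Register the narrow dataframe as a temporary view and query it with Spark SQL;
// root.ln.wf02.wt02 is just the example device from the tables above
narrow_df.createOrReplaceTempView("iotdb_narrow")
spark.sql("select * from iotdb_narrow where device_name = 'root.ln.wf02.wt02'").show()

// Convert back to the wide form when needed
val wide_again = Transformer.toWideForm(spark, narrow_df)
```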
#### Java User Guide
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.iotdb.spark.db.*;

public class Example {

  public static void main(String[] args) {
    SparkSession spark = SparkSession
        .builder()
        .appName("Build a DataFrame from Scratch")
        .master("local[*]")
        .getOrCreate();

    Dataset<Row> df = spark.read().format("org.apache.iotdb.spark.db")
        .option("url", "jdbc:iotdb://127.0.0.1:6667/")
        .option("sql", "select * from root")
        .load();

    df.printSchema();
    df.show();

    Dataset<Row> narrowTable = Transformer.toNarrowForm(spark, df);
    narrowTable.show();
  }
}
```
### Write Data to IoTDB

#### User Guide

```scala
// write data organized as a narrow table
val df = spark.createDataFrame(List(
(1L, "root.test.d0",1, 1L, 1.0F, 1.0D, true, "hello"),
(2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
val dfWithColumn = df.withColumnRenamed("_1", "Time")
.withColumnRenamed("_2", "device_name")
.withColumnRenamed("_3", "s0")
.withColumnRenamed("_4", "s1")
.withColumnRenamed("_5", "s2")
.withColumnRenamed("_6", "s3")
.withColumnRenamed("_7", "s4")
.withColumnRenamed("_8", "s5")
dfWithColumn
.write
.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.save
// write data organized as a wide table
val df = spark.createDataFrame(List(
(1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
(2L, 2, 2L, 2.0F, 2.0D, false, "world")))
val dfWithColumn = df.withColumnRenamed("_1", "Time")
.withColumnRenamed("_2", "root.test.d0.s0")
.withColumnRenamed("_3", "root.test.d0.s1")
.withColumnRenamed("_4", "root.test.d0.s2")
.withColumnRenamed("_5", "root.test.d0.s3")
.withColumnRenamed("_6", "root.test.d0.s4")
.withColumnRenamed("_7", "root.test.d0.s5")
dfWithColumn.write.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.option("numPartition", "10")
.save
```
#### Notes

1. You can write data to IoTDB directly regardless of whether the dataframe contains a narrow table or a wide table.
2. The `numPartition` parameter sets the number of partitions: the dataframe is repartitioned accordingly before the data is written. Each partition opens its own session to write data, which increases the write concurrency.
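As a quick sanity check, you can read the written data back with the same read interface shown earlier; this is only a sketch, and `root.test.d0` is the device used in the write examples above:

```scala
import org.apache.iotdb.spark.db._

// Read back the data that was just written to root.test.d0
val check_df = spark.read.format("org.apache.iotdb.spark.db").
  option("url", "jdbc:iotdb://127.0.0.1:6667/").
  option("sql", "select * from root.test.d0").
  load

check_df.show()
```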