blob: 264720a631d220740877b6b0a52ac20f3975065a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.spark.db
import org.apache.iotdb.session.Session
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.{BOOLEAN, DOUBLE, FLOAT, INT32, INT64, TEXT}
import org.apache.spark.sql.{DataFrame, Row}
import java.util
import java.lang
object DataFrameTools {
def insertDataFrame(options: IoTDBOptions, dataframe: DataFrame): Unit = {
val filteredColumns = Array[String]("Time", "Device")
val sensorTypes = dataframe.dtypes.filter(x => !filteredColumns.contains(x._1))
dataframe
.repartition(options.numPartition.toInt)
.sortWithinPartitions(dataframe.col("Device"))
.foreachPartition { (partition: Iterator[Row]) =>
val hostPort = options.url.split("//")(1).replace("/", "").split(":")
val session = new Session(
hostPort(0),
hostPort(1).toInt,
options.user,
options.password
)
session.open()
var device: lang.String = ""
val times = new util.ArrayList[lang.Long]()
val measurementsList = new util.ArrayList[util.List[lang.String]]()
val typesList = new util.ArrayList[util.List[TSDataType]]()
val valuesList = new util.ArrayList[util.List[Object]]()
val batchSize = 1000
var currentSize = 0
partition.foreach { record =>
if ("".equals(device)) device = record.get(1).toString
else if (!device.equals(record.get(1).toString)) {
insertAndEmptyDataSet(session, device, times, measurementsList, typesList, valuesList)
device = record.get(1).toString
currentSize = 0
}
val measurements = new util.ArrayList[lang.String]()
val types = new util.ArrayList[TSDataType]()
val values = new util.ArrayList[Object]()
for (i <- 2 until record.length if !(record.get(i) == null)) {
val value = typeTrans(record.get(i).toString, getType(sensorTypes(i - 2)._2))
values.add(value)
measurements.add(sensorTypes(i - 2)._1)
types.add(getType(sensorTypes(i - 2)._2))
}
if (!values.isEmpty) {
times.add(record.get(0).asInstanceOf[Long])
measurementsList.add(measurements)
typesList.add(types)
valuesList.add(values)
currentSize += 1
}
if (currentSize >= batchSize) {
insertAndEmptyDataSet(session, device, times, measurementsList, typesList, valuesList)
currentSize = 0
}
}
insertAndEmptyDataSet(session, device, times, measurementsList, typesList, valuesList)
session.close()
}
}
def typeTrans(value: lang.String, dataType: TSDataType): Object = {
dataType match {
case TSDataType.TEXT => value
case TSDataType.BOOLEAN => lang.Boolean.valueOf(value)
case TSDataType.INT32 => lang.Integer.valueOf(value)
case TSDataType.INT64 => lang.Long.valueOf(value)
case TSDataType.FLOAT => lang.Float.valueOf(value)
case TSDataType.DOUBLE => lang.Double.valueOf(value)
case _ => null
}
}
def getType(typeStr: lang.String): TSDataType = {
typeStr match {
case "StringType" => TEXT
case "BooleanType" => BOOLEAN
case "IntegerType" => INT32
case "LongType" => INT64
case "FloatType" => FLOAT
case "DoubleType" => DOUBLE
case _ => null
}
}
def insertAndEmptyDataSet(session: Session,
device: lang.String,
times: util.ArrayList[lang.Long],
measurementsList: util.ArrayList[util.List[lang.String]],
typesList: util.ArrayList[util.List[TSDataType]],
valuesList: util.ArrayList[util.List[Object]]): Unit = {
if (!valuesList.isEmpty) {
session.insertRecordsOfOneDevice(device, times, measurementsList, typesList, valuesList)
times.clear()
measurementsList.clear()
typesList.clear()
valuesList.clear()
}
}
}