/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iotdb.spark.tsfile

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.Job
import org.apache.iotdb.hadoop.fileSystem.HDFSInput
import org.apache.iotdb.spark.tsfile.DefaultSource.SerializableConfiguration
import org.apache.iotdb.spark.tsfile.qp.Executor
import org.apache.iotdb.tsfile.common.constant.QueryConstant
import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID
import org.apache.iotdb.tsfile.read.common.Field
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet
import org.apache.iotdb.tsfile.read.{TsFileReader, TsFileSequenceReader}
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types._
import org.slf4j.LoggerFactory

import java.io._
import java.net.URI

import scala.collection.JavaConversions._
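
// Spark SQL FileFormat for Apache TsFile. The schema is inferred from the union of
// series in the given files; data can be exposed in wide form (one column per series
// path) or narrow form (time, device_name, measurement columns).
//
// Typical usage (illustrative sketch; paths are placeholders):
//   spark.read.format("org.apache.iotdb.spark.tsfile").load("hdfs://.../a.tsfile")
//   spark.read.format("org.apache.iotdb.spark.tsfile")
//     .option("form", "narrow_form").load("hdfs://.../a.tsfile")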
private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {

  override def equals(other: Any): Boolean = other match {
    case _: DefaultSource => true
    case _ => false
  }
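
  // infer the Spark schema as the union of all time series found in the given TsFiles,
  // in either narrow or wide form depending on the "form" option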
  override def inferSchema(
      spark: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = {
    val conf = spark.sparkContext.hadoopConfiguration
    // check if the path is given
    options.getOrElse(DefaultSource.path, throw new TSFileDataSourceException(
      s"${DefaultSource.path} must be specified for org.apache.iotdb.tsfile DataSource"))
    if (options.getOrElse(DefaultSource.isNarrowForm, "").equals("narrow_form")) {
      val tsfileSchema = NarrowConverter.getUnionSeries(files, conf)
      NarrowConverter.toSqlSchema(tsfileSchema)
    } else {
      // get union series in TsFile
      val tsfileSchema = WideConverter.getUnionSeries(files, conf)
      WideConverter.toSqlSchema(tsfileSchema)
    }
  }
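
  // a TsFile is splittable: each PartitionedFile reader only queries data whose
  // byte offsets fall inside [file.start, file.start + file.length)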
  override def isSplitable(
      sparkSession: SparkSession,
      options: Map[String, String],
      path: org.apache.hadoop.fs.Path): Boolean = {
    true
  }
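
  // builds a reader function, run on executors, that maps a PartitionedFile to the
  // rows it contains, using the broadcast Hadoop configuration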
  override def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
    val broadcastedConf =
      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

    (file: PartitionedFile) => {
      val log = LoggerFactory.getLogger(classOf[DefaultSource])
      log.info("This partition starts from " + file.start.asInstanceOf[java.lang.Long]
        + " and ends at " + (file.start + file.length).asInstanceOf[java.lang.Long])
      log.info(file.toString())

      val conf = broadcastedConf.value.value
      val in = new HDFSInput(new Path(new URI(file.filePath)), conf)
      val reader: TsFileSequenceReader = new TsFileSequenceReader(in)
      val tsFileMetaData = reader.readFileMetadata
      val readTsFile: TsFileReader = new TsFileReader(reader)

      // close the TsFile reader when the Spark task finishes
      Option(TaskContext.get()).foreach { taskContext =>
        taskContext.addTaskCompletionListener { _ => readTsFile.close() }
        log.info("task Id: " + taskContext.taskAttemptId() + " partition Id: " +
          taskContext.partitionId())
      }
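
      // narrow form: one row per (time, device) with a device_name column;
      // wide form: one row per time with a column for every series path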
      if (options.getOrElse(DefaultSource.isNarrowForm, "").equals("narrow_form")) {
        val deviceNames = reader.getAllDevices.map(deviceID =>
          deviceID.asInstanceOf[PlainDeviceID].toStringID)

        val measurementNames = new java.util.HashSet[String]()
        requiredSchema.foreach((field: StructField) => {
          if (field.name != QueryConstant.RESERVED_TIME
            && field.name != NarrowConverter.DEVICE_NAME) {
            measurementNames += field.name
          }
        })

        // construct query expressions from the schema, devices, measurements and filters
        val queryExpressions = NarrowConverter.toQueryExpression(dataSchema, deviceNames,
          measurementNames, filters, reader, file.start.asInstanceOf[java.lang.Long],
          (file.start + file.length).asInstanceOf[java.lang.Long])
        val queryDataSets = Executor.query(readTsFile, queryExpressions,
          file.start.asInstanceOf[java.lang.Long],
          (file.start + file.length).asInstanceOf[java.lang.Long])

        var queryDataSet: QueryDataSet = null
        var deviceName: String = null

        // advance to the next data set that still has rows; returns false when all are exhausted
        def queryNext(): Boolean = {
          if (queryDataSet != null && queryDataSet.hasNext) {
            return true
          }
          if (queryDataSets.isEmpty) {
            return false
          }
          queryDataSet = queryDataSets.remove(queryDataSets.size() - 1)
          while (!queryDataSet.hasNext) {
            if (queryDataSets.isEmpty) {
              return false
            }
            queryDataSet = queryDataSets.remove(queryDataSets.size() - 1)
          }
          deviceName = queryDataSet.getPaths.get(0).getDevice
          true
        }
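
        // expose the query results as InternalRows laid out in requiredSchema order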
        new Iterator[InternalRow] {
          private val rowBuffer = Array.fill[Any](requiredSchema.length)(null)
          private val safeDataRow = new GenericRow(rowBuffer)

          // Used to convert `Row`s containing data columns into `InternalRow`s.
          private val encoderForDataColumns = RowEncoder(requiredSchema)

          override def hasNext: Boolean = {
            queryNext()
          }

          override def next(): InternalRow = {
            val curRecord = queryDataSet.next()
            val fields = curRecord.getFields
            val paths = queryDataSet.getPaths

            // index in one required row
            var index = 0
            requiredSchema.foreach((field: StructField) => {
              if (field.name == QueryConstant.RESERVED_TIME) {
                rowBuffer(index) = curRecord.getTimestamp
              } else if (field.name == NarrowConverter.DEVICE_NAME) {
                rowBuffer(index) = deviceName
              } else {
                val pos = paths.indexOf(new org.apache.iotdb.tsfile.read.common.Path(deviceName,
                  field.name, true))
                var curField: Field = null
                if (pos != -1) {
                  curField = fields.get(pos)
                }
                rowBuffer(index) = NarrowConverter.toSqlValue(curField)
              }
              index += 1
            })
            encoderForDataColumns.toRow(safeDataRow)
          }
        }
      } else {
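        // wide form: requested columns are full series paths, so no device_name column is produced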
        // get queriedSchema from requiredSchema
        val queriedSchema = WideConverter.prepSchema(requiredSchema, tsFileMetaData, reader)

        // construct queryExpression based on queriedSchema and filters
        val queryExpression = WideConverter.toQueryExpression(queriedSchema, filters)
        val queryDataSet = readTsFile.query(queryExpression,
          file.start.asInstanceOf[java.lang.Long],
          (file.start + file.length).asInstanceOf[java.lang.Long])

        new Iterator[InternalRow] {
          private val rowBuffer = Array.fill[Any](requiredSchema.length)(null)
          private val safeDataRow = new GenericRow(rowBuffer)

          // Used to convert `Row`s containing data columns into `InternalRow`s.
          private val encoderForDataColumns = RowEncoder(requiredSchema)

          override def hasNext: Boolean = {
            queryDataSet.hasNext
          }

          override def next(): InternalRow = {
            val curRecord = queryDataSet.next()
            val fields = curRecord.getFields
            val paths = queryDataSet.getPaths

            // index in one required row
            var index = 0
            requiredSchema.foreach((field: StructField) => {
              if (field.name == QueryConstant.RESERVED_TIME) {
                rowBuffer(index) = curRecord.getTimestamp
              } else {
                val pos = paths.indexOf(new org.apache.iotdb.tsfile.read.common.Path(field.name, true))
                var curField: Field = null
                if (pos != -1) {
                  curField = fields.get(pos)
                }
                rowBuffer(index) = WideConverter.toSqlValue(curField)
              }
              index += 1
            })
            encoderForDataColumns.toRow(safeDataRow)
          }
        }
      }
    }
  }
override def shortName(): String = "tsfile"
override def prepareWrite(sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
new TsFileWriterFactory(options)
}
class TSFileDataSourceException(message: String, cause: Throwable)
extends Exception(message, cause) {
def this(message: String) = this(message, null)
}
}
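
// option keys accepted by this data source, plus a serializable wrapper that lets the
// non-serializable Hadoop Configuration be broadcast to executors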
private[tsfile] object DefaultSource {
  val path = "path"
  val isNarrowForm = "form"

  class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
    private def writeObject(out: ObjectOutputStream): Unit = {
      out.defaultWriteObject()
      value.write(out)
    }

    private def readObject(in: ObjectInputStream): Unit = {
      value = new Configuration(false)
      value.readFields(in)
    }
  }
}