/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.spark.v2.reader

import org.apache.druid.data.input.impl.{DimensionsSpec, TimestampSpec}
import org.apache.druid.data.input.{ColumnsFilter, InputEntityReader, InputRowSchema}
import org.apache.druid.indexing.input.{DruidSegmentInputEntity, DruidSegmentInputFormat}
import org.apache.druid.java.util.common.FileUtils
import org.apache.druid.query.filter.DimFilter
import org.apache.druid.segment.loading.SegmentLoader
import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys, SerializableHadoopConfiguration}
import org.apache.druid.spark.mixins.Logging
import org.apache.druid.spark.utils.SchemaUtils
import org.apache.druid.spark.v2.INDEX_IO
import org.apache.druid.timeline.DataSegment
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
import org.apache.spark.sql.types.StructType
import java.io.File
import scala.collection.JavaConverters.{asScalaBufferConverter, setAsJavaSetConverter}
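
/**
 * An InputPartitionReader that reads a single Druid segment (deserialized from segmentStr by
 * DruidBaseInputPartitionReader) and converts each Druid InputRow it contains into a Spark
 * InternalRow matching the requested read schema, optionally pushing a DimFilter down to the
 * segment reader.
 */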
class DruidInputPartitionReader(
    segmentStr: String,
    schema: StructType,
    filter: Option[DimFilter],
    columnTypes: Option[Set[String]],
    broadcastedHadoopConf: Broadcast[SerializableHadoopConfiguration],
    conf: Configuration,
    useSparkConfForDeepStorage: Boolean,
    useCompactSketches: Boolean,
    useDefaultNullHandling: Boolean
)
  extends DruidBaseInputPartitionReader(
    segmentStr,
    columnTypes,
    broadcastedHadoopConf,
    conf,
    useSparkConfForDeepStorage,
    useCompactSketches,
    useDefaultNullHandling
  ) with InputPartitionReader[InternalRow] with Logging {
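
  // Columns actually present in the segment, plus the configured timestamp column. Requested
  // schema fields are filtered against this set so the reader is never asked for a column the
  // segment doesn't contain.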
  private val availableColumns: Set[String] =
    segment.asQueryableIndex().getColumnNames.asScala.toSet +
      conf.get(DruidConfigurationKeys.timestampColumnDefaultReaderKey)
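
  // Build an InputEntityReader over the segment via the companion object's makeInputFormat and
  // eagerly grab its row iterator; next() and get() below simply walk this iterator.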
  private val inputEntityReaderRows = DruidInputPartitionReader.makeInputFormat(
    dataSegment,
    segmentLoader,
    tmpDir,
    filter.orNull,
    conf.get(DruidConfigurationKeys.timestampColumnDefaultReaderKey),
    conf.get(DruidConfigurationKeys.timestampFormatDefaultReaderKey),
    SchemaUtils.getDimensionsSpecFromIndex(segment.asQueryableIndex()),
    schema.fieldNames.toList.filter(availableColumns.contains(_))
  ).read()
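
  // True while the underlying Druid row iterator still has rows to return.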
  override def next(): Boolean = {
    inputEntityReaderRows.hasNext
  }
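
  // Convert the next Druid InputRow into a Spark InternalRow laid out according to the read schema.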
  override def get(): InternalRow = {
    SchemaUtils.convertInputRowToSparkRow(inputEntityReaderRows.next(), schema, useDefaultNullHandling)
  }
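
  // Release the segment and delete the temporary directory it was loaded into.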
  override def close(): Unit = {
    try {
      if (Option(segment).nonEmpty) {
        segment.close()
      }
      if (Option(tmpDir).nonEmpty) {
        FileUtils.deleteDirectory(tmpDir)
      }
    } catch {
      case e: Exception =>
        // Since we're just going to rethrow e and tearing down the JVM will clean up the segment even if we
        // can't, the only leak we have to worry about is the temp directory. Spark should clean up temp files
        // as well, but rather than rely on that we'll try to take care of it ourselves.
        logWarn("Encountered exception attempting to close a DruidInputPartitionReader!")
        if (Option(tmpDir).nonEmpty && tmpDir.exists()) {
          FileUtils.deleteDirectory(tmpDir)
        }
        throw e
    }
  }
}
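
/**
 * Companion helpers for constructing the Druid InputEntityReader used above.
 */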
private[v2] object DruidInputPartitionReader {
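  // Wires the segment (wrapped in a DruidSegmentInputEntity) and an InputRowSchema (timestamp
  // spec, dimensions, and an inclusion-based column filter) into a DruidSegmentInputFormat,
  // returning the InputEntityReader that will iterate the segment's rows with the given
  // DimFilter pushed down.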
  private def makeInputFormat(
      segment: DataSegment,
      segmentLoader: SegmentLoader,
      loadDir: File,
      filter: DimFilter,
      timestampColumnName: String,
      timestampColumnFormat: String,
      dimensionsSpec: DimensionsSpec,
      columns: Seq[String]
  ): InputEntityReader = {
    val inputFormat = new DruidSegmentInputFormat(INDEX_IO, filter)
    val timestampSpec = new TimestampSpec(timestampColumnName, timestampColumnFormat, null) // scalastyle:ignore null
    val inputSchema = new InputRowSchema(
      timestampSpec,
      dimensionsSpec,
      ColumnsFilter.inclusionBased(columns.toSet.asJava)
    )
    val inputSource = new DruidSegmentInputEntity(
      segmentLoader,
      segment,
      segment.getInterval
    )
    inputFormat.createReader(
      inputSchema,
      inputSource,
      loadDir
    )
  }
}