/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.spark.utils

import org.apache.druid.data.input.InputRow
import org.apache.druid.data.input.impl.{DimensionSchema, DimensionsSpec, DoubleDimensionSchema,
  FloatDimensionSchema, LongDimensionSchema, StringDimensionSchema}
import org.apache.druid.java.util.common.IAE
import org.apache.druid.segment.QueryableIndex
import org.apache.druid.segment.column.{RowSignature, ValueType}
import org.apache.druid.spark.registries.ComplexTypeRegistry
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, DoubleType, FloatType,
  IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
import org.joda.time.DateTime

import java.util.{Collection => JCollection}

import scala.collection.JavaConverters.{collectionAsScalaIterableConverter,
  iterableAsScalaIterableConverter, mapAsScalaMapConverter, seqAsJavaListConverter}

/**
 * Converters and utilities for working with Spark and Druid schemas.
 */
object SchemaUtils {
  /**
   * Convert a COLUMNMAP representing a Druid datasource's schema as returned by
   * DruidMetadataClient.getClient into a Spark StructType.
   *
   * @param columnMap The Druid schema to convert into a corresponding Spark StructType.
   * @return The StructType equivalent of the Druid schema described by COLUMNMAP.
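   *
   * A usage sketch (the column map literal below is illustrative, not the output of an actual
   * metadata query):
   * {{{
   * val columnMap = Map(
   *   "__time" -> ("LONG", false),
   *   "page" -> ("STRING", true),
   *   "added" -> ("LONG", false)
   * )
   * val sparkSchema = SchemaUtils.convertDruidSchemaToSparkSchema(columnMap)
   * // sparkSchema contains __time: LongType, page: ArrayType(StringType), added: LongType
   * }}}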
   */
  def convertDruidSchemaToSparkSchema(columnMap: Map[String, (String, Boolean)]): StructType = {
    StructType.apply(
      columnMap.map { case (name, (colType, hasMultipleValues)) =>
        val sparkType = colType match {
          case "LONG" => LongType
          case "STRING" => StringType
          case "DOUBLE" => DoubleType
          case "FLOAT" => FloatType
          case "TIMESTAMP" => TimestampType
          case complexType: String if ComplexTypeRegistry.getRegisteredMetricNames.contains(complexType) =>
            BinaryType
          // Add other supported types later
          case _ => throw new IAE(s"Unrecognized type $colType!")
        }
        if (hasMultipleValues) {
          StructField(name, ArrayType(sparkType, containsNull = false))
        } else {
          StructField(name, sparkType)
        }
      }.toSeq
    )
  }

  /**
   * Convert a Druid INPUTROW into a Spark InternalRow with schema SCHEMA.
   *
   * @param inputRow The Druid InputRow to convert into a Spark Row for loading into a dataframe.
   * @param schema The schema to map INPUTROW into.
   * @param useDefaultNullHandling Whether to substitute Druid's default values for nulls or to keep actual nulls.
   * @return A Spark InternalRow with schema SCHEMA and values parsed from INPUTROW.
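   *
   * A usage sketch, reusing SPARKSCHEMA from the example above (the MapBasedInputRow and its
   * event map are illustrative; imports and converters are elided):
   * {{{
   * val inputRow = new MapBasedInputRow(
   *   DateTimes.of("2020-01-01T00:00:00Z"),
   *   List("page").asJava,
   *   Map[String, AnyRef]("page" -> "Main_Page", "added" -> Long.box(17L)).asJava
   * )
   * val sparkRow = SchemaUtils.convertInputRowToSparkRow(inputRow, sparkSchema, useDefaultNullHandling = true)
   * }}}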
   */
  def convertInputRowToSparkRow(
      inputRow: InputRow,
      schema: StructType,
      useDefaultNullHandling: Boolean
  ): InternalRow = {
    InternalRow.fromSeq(schema.fieldNames.map { colName =>
      if (colName == "__time") {
        // InputRows report their timestamp as milliseconds since the epoch, but Spark's internal
        // representation of TimestampType is microseconds since the epoch
        if (schema(colName).dataType == TimestampType) {
          inputRow.getTimestampFromEpoch * 1000
        } else {
          inputRow.getTimestampFromEpoch
        }
      } else {
        val col = inputRow.getRaw(colName)
        if (col != null) {
          schema(colName).dataType match {
            case arrayType: ArrayType =>
              val baseType = arrayType.elementType
              col match {
                case collection: JCollection[_] =>
                  ArrayData.toArrayData(collection.asScala.map { elem =>
                    parseToScala(elem, baseType)
                  })
                case _ =>
                  // Single-element arrays won't be wrapped when read from Druid; need to do it here
                  ArrayData.toArrayData(List(parseToScala(col, baseType)))
              }
            case _ =>
              // This is slightly inefficient since some objects will already be the correct type
              parseToScala(col, schema(colName).dataType)
          }
        } else if (useDefaultNullHandling) {
          // Substitute Druid's default values for nulls, matching the physical types Spark
          // expects for each column's internal representation
          schema(colName).dataType match {
            case StringType => UTF8String.EMPTY_UTF8
            case ArrayType(StringType, _) => ArrayData.toArrayData(List(UTF8String.EMPTY_UTF8))
            case LongType | TimestampType => 0L
            case IntegerType => 0
            case FloatType => 0.0f
            case DoubleType => 0.0d
            case _ => null // scalastyle:ignore null
          }
        } else {
          null // scalastyle:ignore null
        }
      }
    })
  }

  /**
   * Convert an object COL to the appropriate Scala type for the given Spark DataType DT.
   *
   * @param col The object to convert to a suitable type.
   * @param dt The Spark DataType COL should be made compatible with.
   * @return COL parsed into a type compatible with DT.
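   *
   * For example:
   * {{{
   * SchemaUtils.parseToScala("1234", LongType)   // => 1234L
   * SchemaUtils.parseToScala("foo", StringType)  // => UTF8String.fromString("foo")
   * }}}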
   */
  def parseToScala(col: Any, dt: DataType): Any = {
    dt match {
      case StringType => UTF8String.fromString(col.toString)
      case LongType => col match {
        // Matching against Any boxes primitives, so java.lang.Long covers Scala Longs too
        case _: java.lang.Long => col
        case str: String => str.toLong
        case _ => throw new IllegalArgumentException(
          s"Unsure how to parse ${col.getClass.toString} into a Long!"
        )
      }
      case TimestampType => col match {
        // Timestamps should always come back from Druid as Joda DateTimes, but Spark's internal
        // representation of TimestampType is microseconds since the epoch
        case dateTime: DateTime => dateTime.getMillis * 1000
        case millis: java.lang.Long => millis * 1000
        case _ => throw new IllegalArgumentException(
          s"Unsure how to parse ${col.getClass.toString} into a Timestamp!"
        )
      }
      case FloatType => col match {
        case _: java.lang.Float => col
        case str: String => str.toFloat
        case _ => throw new IllegalArgumentException(
          s"Unsure how to parse ${col.getClass.toString} into a Float!"
        )
      }
      case DoubleType => col match {
        case _: java.lang.Double => col
        case str: String => str.toDouble
        case _ => throw new IllegalArgumentException(
          s"Unsure how to parse ${col.getClass.toString} into a Double!"
        )
      }
      case BinaryType =>
        if (ComplexTypeRegistry.getRegisteredSerializedClasses.contains(col.getClass)) {
          ComplexTypeRegistry.deserialize(col)
        } else {
          col match {
            case arr: Array[Byte] => arr
            case _ => throw new IllegalArgumentException(
              s"Unsure how to parse ${col.getClass.toString} into a ByteArray!"
            )
          }
        }
      case _ => throw new IllegalArgumentException(
        s"$dt currently unsupported!"
      )
    }
  }

  /**
   * Given a list of column names DIMENSIONS and a StructType SCHEMA, returns a list of Druid DimensionSchemas
   * constructed from each column named in DIMENSIONS and the properties of the corresponding field in SCHEMA.
   *
   * @param dimensions A list of column names to construct Druid DimensionSchemas for.
   * @param schema The Spark schema to use to determine the types of the Druid DimensionSchemas created.
   * @return A list of DimensionSchemas generated from the type information in SCHEMA for each dimension in DIMENSIONS.
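   *
   * A usage sketch (the schema and column names are illustrative):
   * {{{
   * val schema = StructType(Seq(
   *   StructField("page", StringType),
   *   StructField("added", LongType),
   *   StructField("deleted", DoubleType)
   * ))
   * SchemaUtils.convertStructTypeToDruidDimensionSchema(Seq("page", "added"), schema)
   * // => Seq(StringDimensionSchema("page"), LongDimensionSchema("added")); "deleted" is not a dimension
   * }}}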
   */
  def convertStructTypeToDruidDimensionSchema(
      dimensions: Seq[String],
      schema: StructType
  ): Seq[DimensionSchema] = {
    schema
      .filter(field => dimensions.contains(field.name))
      .map(field =>
        field.dataType match {
          case LongType | IntegerType => new LongDimensionSchema(field.name)
          case FloatType => new FloatDimensionSchema(field.name)
          case DoubleType => new DoubleDimensionSchema(field.name)
          case StringType | ArrayType(StringType, _) => new StringDimensionSchema(field.name)
          case _ => throw new IAE(
            "Unsure how to create dimension from column [%s] with data type [%s]",
            field.name,
            field.dataType
          )
        }
      )
  }

  /**
   * Validates that the given list of DimensionSchemas aligns with the given Spark schema. This validation is done to
   * fail fast if the user-provided dimensions have different data types than the data in the source data frame to be
   * written.
   *
   * @param dimensions The list of DimensionSchemas to validate against SCHEMA.
   * @param schema The source-of-truth Spark schema to ensure compatibility with.
   * @return True if DIMENSIONS is compatible with SCHEMA; otherwise an IAE is thrown.
   * @throws IAE If the data types in DIMENSIONS do not align with the data types in SCHEMA.
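   *
   * For example, this call (with illustrative column names) throws an IAE because "added" is a
   * long column in the Spark schema but is declared as a string dimension:
   * {{{
   * SchemaUtils.validateDimensionSpecAgainstSparkSchema(
   *   Seq(new StringDimensionSchema("added")),
   *   StructType(Seq(StructField("added", LongType)))
   * )
   * }}}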
   */
  def validateDimensionSpecAgainstSparkSchema(dimensions: Seq[DimensionSchema], schema: StructType): Boolean = {
    val incompatibilities = dimensions.flatMap { dim =>
      if (schema.fieldNames.contains(dim.getName)) {
        val sparkType = schema(schema.fieldIndex(dim.getName)).dataType
        sparkType match {
          case LongType | IntegerType =>
            if (dim.getTypeName != DimensionSchema.LONG_TYPE_NAME) {
              Some(s"${dim.getName}: expected type ${DimensionSchema.LONG_TYPE_NAME} but was ${dim.getTypeName}!")
            } else {
              None
            }
          case FloatType =>
            if (dim.getTypeName != DimensionSchema.FLOAT_TYPE_NAME) {
              Some(s"${dim.getName}: expected type ${DimensionSchema.FLOAT_TYPE_NAME} but was ${dim.getTypeName}!")
            } else {
              None
            }
          case DoubleType =>
            if (dim.getTypeName != DimensionSchema.DOUBLE_TYPE_NAME) {
              Some(s"${dim.getName}: expected type ${DimensionSchema.DOUBLE_TYPE_NAME} but was ${dim.getTypeName}!")
            } else {
              None
            }
          case StringType | ArrayType(StringType, _) =>
            if (dim.getTypeName != DimensionSchema.STRING_TYPE_NAME) {
              Some(s"${dim.getName}: expected type ${DimensionSchema.STRING_TYPE_NAME} but was ${dim.getTypeName}!")
            } else {
              None
            }
          case _ =>
            // Flag Spark types we don't know how to map to a dimension instead of throwing a MatchError
            Some(s"${dim.getName}: unsure how to validate dimensions against Spark type $sparkType!")
        }
      } else {
        None
      }
    }
    if (incompatibilities.nonEmpty) {
      throw new IAE(s"Incompatible dimensions spec provided! Offending columns: ${incompatibilities.mkString("; ")}")
    }
    // For now this could return Unit, but returning a Boolean leaves room for future improvement
    true
  }

  /**
   * Given a Spark schema, construct an equivalent Druid RowSignature.
   *
   * @param schema The Spark schema to generate a RowSignature from.
   * @return A RowSignature corresponding to SCHEMA.
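   *
   * For example (illustrative schema):
   * {{{
   * val signature = SchemaUtils.generateRowSignatureFromSparkSchema(
   *   StructType(Seq(StructField("page", StringType), StructField("added", LongType)))
   * )
   * // signature maps page -> STRING and added -> LONG
   * }}}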
   */
  def generateRowSignatureFromSparkSchema(schema: StructType): RowSignature = {
    val builder = RowSignature.builder()
    schema.foreach { field =>
      builder.add(field.name, getDruidValueTypeForDataType(field.dataType))
    }
    builder.build()
  }

  /**
   * Return the Druid ValueType corresponding to a provided Spark DataType.
   *
   * @param dtype The Spark DataType to map to a Druid ValueType.
   * @return The ValueType corresponding to DTYPE.
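   *
   * For example:
   * {{{
   * SchemaUtils.getDruidValueTypeForDataType(ArrayType(StringType, true))  // => ValueType.STRING_ARRAY
   * SchemaUtils.getDruidValueTypeForDataType(BinaryType)                   // => ValueType.COMPLEX
   * }}}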
   */
  def getDruidValueTypeForDataType(dtype: DataType): ValueType = {
    dtype match {
      case DoubleType => ValueType.DOUBLE
      case FloatType => ValueType.FLOAT
      case LongType => ValueType.LONG
      case StringType => ValueType.STRING
      case ArrayType(DoubleType, _) => ValueType.DOUBLE_ARRAY
      case ArrayType(LongType, _) => ValueType.LONG_ARRAY
      case ArrayType(StringType, _) => ValueType.STRING_ARRAY
      case _ => ValueType.COMPLEX
    }
  }
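
  /**
   * Construct a Druid DimensionsSpec from the dimensions of the segment backing QUERYABLEINDEX,
   * preserving multi-value handling and bitmap index settings for string dimensions.
   *
   * @param queryableIndex The QueryableIndex to extract a DimensionsSpec from.
   * @return A DimensionsSpec describing the dimensions stored in QUERYABLEINDEX.
   */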
  def getDimensionsSpecFromIndex(queryableIndex: QueryableIndex): DimensionsSpec = {
    val dimensionHandlers = queryableIndex.getDimensionHandlers.asScala
    new DimensionsSpec(queryableIndex.getAvailableDimensions.asScala.toSeq.map { dim =>
      val columnHolder = queryableIndex.getColumnHolder(dim)
      val handler = dimensionHandlers(dim)
      columnHolder.getCapabilities.getType match {
        case ValueType.DOUBLE => new DoubleDimensionSchema(dim)
        case ValueType.FLOAT => new FloatDimensionSchema(dim)
        case ValueType.LONG => new LongDimensionSchema(dim)
        case ValueType.STRING => new StringDimensionSchema(
          dim, handler.getMultivalueHandling, columnHolder.getCapabilities.hasBitmapIndexes
        )
        case ValueType.DOUBLE_ARRAY | ValueType.LONG_ARRAY | ValueType.STRING_ARRAY | ValueType.COMPLEX =>
          throw new IAE(
            s"This reader cannot process dimension [$dim] with type [${columnHolder.getCapabilities.getType}]!"
          )
      }
    }.toList.asJava)
  }
}