blob: 66817f760ec01aac06a06adf72439f1720de4aea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.spark.registries
import org.apache.datasketches.hll.HllSketch
import org.apache.datasketches.quantiles.DoublesSketch
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule
import org.apache.druid.query.aggregation.datasketches.theta.{SketchHolder, SketchModule}
import org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchModule
import org.apache.druid.query.aggregation.histogram.{ApproximateHistogram,
ApproximateHistogramDruidModule, FixedBucketsHistogram, FixedBucketsHistogramAggregator}
import org.apache.druid.query.aggregation.variance.{VarianceAggregatorCollector, VarianceSerde}
import org.apache.druid.segment.serde.ComplexMetrics
import org.apache.druid.spark.mixins.Logging
import scala.collection.mutable
/**
 * A registry for plugging in support for Druid complex types. Provides definitions for supporting complex types in
 * extensions-core out of the box.
 *
 * The registry tracks two things per complex type:
 *  - a serde-initialization function, keyed by the complex type's name, and
 *  - optionally, a function converting deserialized instances of a class into byte arrays, keyed by that class.
 *
 * State is held in mutable maps with no synchronization; registration is expected to happen before use.
 */
object ComplexTypeRegistry extends Logging {
  private val registeredSerdeInitFunctions: mutable.HashMap[String, () => Unit] = new mutable.HashMap()
  private val registeredSerializeFunctions: mutable.HashMap[Class[_], Any => Array[Byte]] =
    new mutable.HashMap()

  /**
   * Register a function REGISTERSERDEFUNC that initializes serializers and deserializers for the complex type with
   * the name NAME. Assumes that the associated complex type is serialized as a byte array. Re-registering a name
   * replaces the previously registered function.
   *
   * @param name The type name of the complex type to register.
   * @param registerSerdeFunc The function to use to register the necessary serdes for this complex type.
   */
  def register(
    name: String,
    registerSerdeFunc: () => Unit
  ): Unit = {
    logInfo(s"Registering serde initializers for complex type $name.")
    registeredSerdeInitFunctions(name) = registerSerdeFunc
  }

  /**
   * Register a function REGISTERSERDEFUNC that initializes serializers and deserializers, a class DESERIALIZEDCLASS
   * to deserialize byte arrays into, and a function SERIALIZEFUNC for converting from instances of DESERIALIZEDCLASS
   * to byte arrays for the complex type with the name NAME. Assumes that the associated complex type is
   * serialized as a byte array. Re-registering a name or class replaces the previous entry.
   *
   * @param name The type name of the complex type to register.
   * @param registerSerdeFunc The function to use to register the necessary serdes for this complex type.
   * @param deserializedClass The class to deserialize values of the registered type to from byte arrays. Instances
   *                          of subclasses of this class are also serialized with SERIALIZEFUNC (see [[deserialize]]).
   * @param serializeFunc The function to use when serializing instances of DESERIALIZEDCLASS to byte arrays.
   */
  def register(
    name: String,
    registerSerdeFunc: () => Unit,
    deserializedClass: Class[_],
    serializeFunc: Any => Array[Byte]): Unit = {
    logInfo(s"Registering serde initializers and serialization functions for complex type $name.")
    registeredSerdeInitFunctions(name) = registerSerdeFunc
    registeredSerializeFunctions(deserializedClass) = serializeFunc
  }

  /**
   * Shortcut for registering known complex type serdes (e.g. those in extensions-core) by name.
   *
   * This is a no-op if NAME has already been registered (so a later SHOULDCOMPACT value does not override an earlier
   * registration) or if NAME is not one of the known extensions-core types.
   *
   * @param name The type name of the complex type to register.
   * @param shouldCompact Whether or not to store compacted versions of this complex type. Ignored for complex types
   *                      that don't have compacted forms.
   */
  def registerByName(name: String, shouldCompact: Boolean = false): Unit = {
    if (!registeredSerdeInitFunctions.contains(name)) {
      knownTypes.get(name).foreach(registerKnownType => registerKnownType(shouldCompact))
    }
  }

  /**
   * @return A snapshot of the names of all complex types with registered serde initialization functions.
   */
  def getRegisteredMetricNames: Set[String] = {
    registeredSerdeInitFunctions.keySet.toSet
  }

  /**
   * @return A snapshot of the classes for which a serialization function has been registered.
   */
  def getRegisteredSerializedClasses: Set[Class[_]] = {
    registeredSerializeFunctions.keySet.toSet
  }

  /**
   * Run the serde initialization function registered for COMPLEXTYPENAME, if any. Unregistered names are ignored.
   *
   * @param complexTypeName The type name of the complex type whose serdes should be initialized.
   */
  def registerSerdeInitFunctions(complexTypeName: String): Unit = {
    registeredSerdeInitFunctions.get(complexTypeName).foreach(initSerde => initSerde())
  }

  /**
   * Convert COL into a byte array using the serialization function registered for its class.
   *
   * Despite the name, this method *serializes* a deserialized complex value back into bytes. The serialization
   * function is chosen by an exact match on COL's runtime class first; failing that, by any registered class that is
   * a supertype of COL's runtime class. The fallback matters because some registered classes (e.g. DoublesSketch and
   * ArrayOfDoublesSketch) are abstract, so their instances are always of a concrete subclass.
   *
   * @param col The complex value to serialize.
   * @return The serialized bytes for COL.
   * @throws IllegalArgumentException if COL is null or no serialization function applies to its class.
   */
  def deserialize(col: Any): Array[Byte] = {
    val colClass: Class[_] = Option(col)
      .map(_.getClass)
      .getOrElse(throw new IllegalArgumentException("Unsure how to parse null into a ByteArray!"))

    val serializeFunc = registeredSerializeFunctions.get(colClass).orElse(
      registeredSerializeFunctions.collectFirst {
        case (registeredClass, func) if registeredClass.isAssignableFrom(colClass) => func
      }
    )

    serializeFunc match {
      case Some(func) => func(col)
      case None =>
        throw new IllegalArgumentException(
          s"Unsure how to parse ${colClass.toString} into a ByteArray!"
        )
    }
  }

  /**
   * Run every registered serde initialization function.
   */
  def registerSerdes(): Unit = {
    registeredSerdeInitFunctions.values.foreach(initSerde => initSerde())
  }

  /**
   * Register serdes for all complex types in extensions-core.
   *
   * @param shouldCompact Whether or not to store compacted versions of complex types that support compaction.
   */
  def initializeDefaults(shouldCompact: Boolean = false): Unit = {
    knownTypes.values.foreach(registerKnownType => registerKnownType(shouldCompact))
  }

  /**
   * Register a theta sketch complex type under TYPENAME. All theta sketch variants share the SketchHolder
   * representation and the same serialization logic.
   */
  private def registerThetaSketch(typeName: String, shouldCompact: Boolean): Unit = {
    register(
      typeName,
      () => SketchModule.registerSerde(),
      // TODO: Confirm SketchHolder is the right deserialized class for the build/merge agg type names too.
      classOf[SketchHolder],
      sketch => {
        val thetaSketch = sketch.asInstanceOf[SketchHolder].getSketch
        if (shouldCompact) thetaSketch.compact().toByteArray else thetaSketch.toByteArray
      }
    )
  }

  /**
   * Registration functions for the complex types in extensions-core, keyed by complex type name. Each function takes
   * the shouldCompact flag; types without compacted forms ignore it.
   */
  private val knownTypes: Map[String, Boolean => Unit] = Map[String, Boolean => Unit](
    // Approximate Histograms
    "approximateHistogram" -> ((_: Boolean) =>
      register(
        "approximateHistogram",
        () => ApproximateHistogramDruidModule.registerSerde(),
        classOf[ApproximateHistogram],
        histogram => histogram.asInstanceOf[ApproximateHistogram].toBytes
      )),
    // Fixed Bucket Histograms
    FixedBucketsHistogramAggregator.TYPE_NAME -> ((_: Boolean) =>
      register(
        FixedBucketsHistogramAggregator.TYPE_NAME,
        () => ApproximateHistogramDruidModule.registerSerde(),
        classOf[FixedBucketsHistogram],
        histogram => histogram.asInstanceOf[FixedBucketsHistogram].toBytes
      )),
    // Tuple Sketches
    ArrayOfDoublesSketchModule.ARRAY_OF_DOUBLES_SKETCH -> ((_: Boolean) =>
      register(
        ArrayOfDoublesSketchModule.ARRAY_OF_DOUBLES_SKETCH,
        // TODO: This probably needs to be wrapped in a try to ensure it only happens once
        () => new ArrayOfDoublesSketchModule().configure(null), // scalastyle:ignore null
        classOf[ArrayOfDoublesSketch],
        sketch => sketch.asInstanceOf[ArrayOfDoublesSketch].toByteArray
      )),
    // Quantiles Sketches
    DoublesSketchModule.DOUBLES_SKETCH -> ((shouldCompact: Boolean) =>
      register(
        DoublesSketchModule.DOUBLES_SKETCH,
        () => DoublesSketchModule.registerSerde(),
        classOf[DoublesSketch],
        sketch => {
          val doublesSketch = sketch.asInstanceOf[DoublesSketch]
          if (shouldCompact) doublesSketch.toByteArray(shouldCompact) else doublesSketch.toByteArray
        }
      )),
    // HLL Sketches
    HllSketchModule.TYPE_NAME -> ((shouldCompact: Boolean) =>
      register(
        HllSketchModule.TYPE_NAME,
        () => HllSketchModule.registerSerde(),
        classOf[HllSketch],
        sketch => {
          val hllSketch = sketch.asInstanceOf[HllSketch]
          if (shouldCompact) hllSketch.toCompactByteArray else hllSketch.toUpdatableByteArray
        }
      )),
    // Theta Sketches
    SketchModule.THETA_SKETCH -> ((shouldCompact: Boolean) =>
      registerThetaSketch(SketchModule.THETA_SKETCH, shouldCompact)),
    SketchModule.THETA_SKETCH_BUILD_AGG -> ((shouldCompact: Boolean) =>
      registerThetaSketch(SketchModule.THETA_SKETCH_BUILD_AGG, shouldCompact)),
    SketchModule.THETA_SKETCH_MERGE_AGG -> ((shouldCompact: Boolean) =>
      registerThetaSketch(SketchModule.THETA_SKETCH_MERGE_AGG, shouldCompact)),
    // Variance
    "variance" -> ((_: Boolean) =>
      register(
        "variance",
        () => ComplexMetrics.registerSerde("variance", new VarianceSerde()),
        classOf[VarianceAggregatorCollector],
        collector => collector.asInstanceOf[VarianceAggregatorCollector].toByteArray
      ))
  )
}