/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.spark.v2.reader

import org.apache.druid.spark.configuration.DruidConfigurationKeys
import org.apache.druid.spark.v2.DruidDataSourceV2TestUtils
import org.apache.druid.spark.{MAPPER, SparkFunSuite}
import org.apache.druid.timeline.DataSegment
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.{Filter, GreaterThanOrEqual, LessThan}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.scalatest.matchers.should.Matchers

import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsJavaMapConverter, seqAsJavaListConverter}
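
/**
  * Tests for DruidDataSourceReader: reading directly specified Druid segments both row by row and
  * via vectorized (columnar) batches, with and without filters pushed down to the reader.
  */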
class DruidDataSourceReaderSuite extends SparkFunSuite with Matchers
  with DruidDataSourceV2TestUtils {
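  // The three test segments, serialized to JSON in the form passed to the reader via
  // DruidConfigurationKeys.segmentsKey.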
  private val segmentsString = MAPPER.writeValueAsString(
    List[DataSegment](firstSegment, secondSegment, thirdSegment).asJava
  )
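
  // Rows expected when all three segments are read back without any filter pushed down.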
  private val defaultExpected = Seq(
    Seq(1577836800000L, List("dim1"), 1, 1, 2, 1L, 1L, 3L, 4.2, 1.7F, idOneSketch),
    Seq(1577862000000L, List("dim2"), 1, 1, 2, 1L, 4L, 2L, 5.1, 8.9F, idOneSketch),
    Seq(1577851200000L, List("dim1"), 1, 1, 2, 1L, 3L, 1L, 0.2, 0.0F, idOneSketch),
    Seq(1577876400000L, List("dim2"), 2, 1, 2, 1L, 1L, 5L, 8.0, 4.15F, idOneSketch),
    Seq(1577962800000L, List("dim1", "dim3"), 2, 3, 7, 1L, 2L, 4L, 11.17, 3.7F, idThreeSketch),
    Seq(1577988000000L, List("dim2"), 3, 2, 1, 1L, 1L, 7L, 0.0, 19.0F, idTwoSketch)
  ).map(wrapSeqToInternalRow(_, schema))
test("DruidDataSourceReader should correctly read directly specified segments") {
val dsoMap = Map(
s"${DruidConfigurationKeys.readerPrefix}.${DruidConfigurationKeys.segmentsKey}" -> segmentsString
)
readSpecifiedSegments(dsoMap, false)
}
test("DruidDataSourceReader should correctly read directly specified segments with a filter") {
val expected = Seq(
Seq(1577862000000L, List("dim2"), 1, 1, 2, 1L, 4L, 2L, 5.1, 8.9F, idOneSketch),
Seq(1577876400000L, List("dim2"), 2, 1, 2, 1L, 1L, 5L, 8.0, 4.15F, idOneSketch),
Seq(1577962800000L, List("dim1", "dim3"), 2, 3, 7, 1L, 2L, 4L, 11.17, 3.7F, idThreeSketch),
Seq(1577988000000L, List("dim2"), 3, 2, 1, 1L, 1L, 7L, 0.0, 19.0F, idTwoSketch)
).map(wrapSeqToInternalRow(_, schema))
val dsoMap = Map(
s"${DruidConfigurationKeys.readerPrefix}.${DruidConfigurationKeys.segmentsKey}" -> segmentsString
)
readSpecifiedSegments(dsoMap, false, expected, Option(Array[Filter](GreaterThanOrEqual("sum_metric4", 2L))))
}
test("DruidDataSourceReader should correctly read directly specified segments with vectorize = true, " +
"batch size 1") {
val dsoMap = Map(
s"${DruidConfigurationKeys.readerPrefix}.${DruidConfigurationKeys.segmentsKey}" -> segmentsString,
s"${DruidConfigurationKeys.readerPrefix}.${DruidConfigurationKeys.vectorizeKey}" -> "true",
s"${DruidConfigurationKeys.readerPrefix}.${DruidConfigurationKeys.batchSizeKey}" -> "1"
)
readSpecifiedSegments(dsoMap, true)
}
test("DruidDataSourceReader should correctly read directly specified segments with vectorize = true, " +
"batch size 2") {
val dsoMap = Map(
s"${DruidConfigurationKeys.readerPrefix}.${DruidConfigurationKeys.segmentsKey}" -> segmentsString,
s"${DruidConfigurationKeys.readerPrefix}.${DruidConfigurationKeys.vectorizeKey}" -> "true",
s"${DruidConfigurationKeys.readerPrefix}.${DruidConfigurationKeys.batchSizeKey}" -> "2"
)
readSpecifiedSegments(dsoMap, true)
}
test("DruidDataSourceReader should correctly read directly specified segments with vectorize = true, " +
"default batch size") {
val dsoMap = Map(
s"${DruidConfigurationKeys.readerPrefix}.${DruidConfigurationKeys.segmentsKey}" -> segmentsString,
s"${DruidConfigurationKeys.readerPrefix}.${DruidConfigurationKeys.vectorizeKey}" -> "true"
)
readSpecifiedSegments(dsoMap, true)
}
test("DruidDataSourceReader should correctly read directly specified segments with vectorize = true " +
"and a filter") {
val expected = Seq(
Seq(1577836800000L, List("dim1"), 1, 1, 2, 1L, 1L, 3L, 4.2, 1.7F, idOneSketch),
Seq(1577862000000L, List("dim2"), 1, 1, 2, 1L, 4L, 2L, 5.1, 8.9F, idOneSketch),
Seq(1577851200000L, List("dim1"), 1, 1, 2, 1L, 3L, 1L, 0.2, 0.0F, idOneSketch)
).map(wrapSeqToInternalRow(_, schema))
val dsoMap = Map(
s"${DruidConfigurationKeys.readerPrefix}.${DruidConfigurationKeys.segmentsKey}" -> segmentsString,
s"${DruidConfigurationKeys.readerPrefix}.${DruidConfigurationKeys.vectorizeKey}" -> "true"
)
readSpecifiedSegments(dsoMap, true, expected, Option(Array[Filter](LessThan("dim2", 2))))
}
test("DruidDataSourceReader should correctly read directly specified segments with " +
"useSparkConfForDeepStorage = true") {
val dsoMap = Map(
s"${DruidConfigurationKeys.readerPrefix}.${DruidConfigurationKeys.segmentsKey}" -> segmentsString,
s"${DruidConfigurationKeys.readerPrefix}.${DruidConfigurationKeys.useSparkConfForDeepStorageKey}" -> "true"
)
readSpecifiedSegments(dsoMap, false)
}
test("DruidDataSourceReader should correctly report which filters it does not support") {
val dsoMap = Map(
s"${DruidConfigurationKeys.readerPrefix}.${DruidConfigurationKeys.segmentsKey}" -> segmentsString
)
// DruidDataSourceReader doesn't support pushing down filters on complex types
val filters = Array[Filter](GreaterThanOrEqual("count", 5), LessThan("uniq_id1", 4))
val reader = DruidDataSourceReader(schema, new DataSourceOptions(dsoMap.asJava))
val result = reader.pushFilters(filters)
Array[Filter](LessThan("uniq_id1", 4)) should equal(result)
Array[Filter](GreaterThanOrEqual("count", 5)) should equal(reader.pushedFilters())
}
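
  /**
    * Constructs a DruidDataSourceReader for the given options, optionally pushes down filters,
    * reads every planned partition (columnar or row-based), and checks the resulting rows against
    * the expected InternalRows.
    */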
  def readSpecifiedSegments(
    optionsMap: Map[String, String],
    useVectorizedReads: Boolean,
    expected: Seq[InternalRow] = defaultExpected,
    filterOpt: Option[Array[Filter]] = None
  ): Unit = {
    val dso = new DataSourceOptions(optionsMap.asJava)
    val reader = DruidDataSourceReader(schema, dso)
    if (filterOpt.isDefined) {
      reader.pushFilters(filterOpt.get)
    }
    useVectorizedReads should equal(reader.enableBatchRead())
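    // Vectorized reads plan columnar input partitions; otherwise rows are read one at a time.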
    val actual = if (useVectorizedReads) {
      reader.planBatchInputPartitions().asScala
        .flatMap(r => columnarPartitionReaderToSeq(r.createPartitionReader()))
    } else {
      reader.planInputPartitions().asScala
        .flatMap(r => partitionReaderToSeq(r.createPartitionReader()))
    }
    actual should equal(expected)
  }
}