Add Spark connector reader support. (#11823)

diff --git a/.travis.yml b/.travis.yml
index 4f8e68a..2fbf62a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -17,6 +17,7 @@
   only:
   - master
   - /^\d+\.\d+\.\d+(-\S*)?$/ # release branches
+  - spark_druid_connector
 
 language: java
 
@@ -94,7 +95,7 @@
         - sudo apt-get update && sudo apt-get install python3 python3-pip python3-setuptools -y
         - ./check_test_suite.py && travis_terminate 0 || echo 'Continuing setup'
         - pip3 install wheel  # install wheel first explicitly
-        - pip3 install pyyaml
+        - pip3 install pyyaml==5.4.1
       script:
         - >
           ${MVN} apache-rat:check -Prat --fail-at-end
@@ -323,7 +324,12 @@
       <<: *test_processing_module
       name: "(openjdk8) other modules test"
       env:
-        - MAVEN_PROJECTS='!processing,!indexing-hadoop,!indexing-service,!extensions-core/kafka-indexing-service,!extensions-core/kinesis-indexing-service,!server,!web-console,!integration-tests'
+        - MAVEN_PROJECTS='!processing,!indexing-hadoop,!indexing-service,!extensions-core/kafka-indexing-service,!extensions-core/kinesis-indexing-service,!server,!web-console,!integration-tests,!spark'
+      after_failure:
+        - ls -alh
+        - ls -alh /home/travis/build/apache/druid/extensions-core/kafka-extraction-namespace/target/
+        - ls -alh /home/travis/build/apache/druid/extensions-core/kafka-extraction-namespace/target/surefire-reports/
+        - for d in /home/travis/build/apache/druid/extensions-core/kafka-extraction-namespace/target/surefire-reports/*.txt; do echo $d; cat $d; done
 
     - <<: *test_other_modules
       name: "(openjdk11) other modules test"
@@ -350,6 +356,24 @@
       stage: Tests - phase 2
       jdk: openjdk15
 
+    # There's no need for separate sql compatibility tests since null handling isn't controlled by a command-line flag
+    # Instead, the spark module unit tests test both variations of null handling themselves.
+    - &test_spark_module
+      <<: *test_processing_module # Using the processing module settings for now
+      name: "(openjdk8) spark module test"
+      env:
+        - MAVEN_PROJECTS='spark'
+
+    - <<: *test_spark_module
+      name: "(openjdk11) spark module test"
+      stage: Tests - phase 2
+      jdk: openjdk11
+
+    - <<: *test_spark_module
+      name: "(openjdk15) spark module test"
+      stage: Tests - phase 2
+      jdk: openjdk15
+
     - name: "web console"
       install: skip
       stage: Tests - phase 1
diff --git a/distribution/docker/Dockerfile b/distribution/docker/Dockerfile
index 7e7cdcb..69867d1 100644
--- a/distribution/docker/Dockerfile
+++ b/distribution/docker/Dockerfile
@@ -18,7 +18,7 @@
 #
 
 ARG JDK_VERSION=8
-FROM maven:3-jdk-11-slim as builder
+FROM maven:3.8.1-jdk-11-slim as builder
 # Rebuild from source in this stage
 # This can be unset if the tarball was already built outside of Docker
 ARG BUILD_FROM_SOURCE="true"
diff --git a/docs/operations/spark.md b/docs/operations/spark.md
new file mode 100644
index 0000000..1b3fa8c
--- /dev/null
+++ b/docs/operations/spark.md
@@ -0,0 +1,290 @@
+---

+id: spark

+title: "Apache Spark Reader and Writer"

+---

+

+<!--

+  ~ Licensed to the Apache Software Foundation (ASF) under one

+  ~ or more contributor license agreements.  See the NOTICE file

+  ~ distributed with this work for additional information

+  ~ regarding copyright ownership.  The ASF licenses this file

+  ~ to you under the Apache License, Version 2.0 (the

+  ~ "License"); you may not use this file except in compliance

+  ~ with the License.  You may obtain a copy of the License at

+  ~

+  ~   http://www.apache.org/licenses/LICENSE-2.0

+  ~

+  ~ Unless required by applicable law or agreed to in writing,

+  ~ software distributed under the License is distributed on an

+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+  ~ KIND, either express or implied.  See the License for the

+  ~ specific language governing permissions and limitations

+  ~ under the License.

+  -->

+

+# Apache Spark Reader and Writer for Druid

+

+## Reader

+The reader reads Druid segments from deep storage into Spark. It locates the segments to read via the Druid metadata

+database and, if no schema is provided, determines their schema by querying the brokers for the relevant segment

+metadata. Otherwise, it does not interact with a running Druid cluster.

+

+Sample Code:

+```scala

+import org.apache.druid.spark.DruidDataFrameReader

+

+val deepStorageConfig = new LocalDeepStorageConfig().storageDirectory("/mnt/druid/druid-segments/")

+

+sparkSession

+  .read

+  .brokerHost("localhost")

+  .brokerPort(8082)

+  .metadataDbType("mysql")

+  .metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")

+  .metadataUser("druid")

+  .metadataPassword("diurd")

+  .dataSource("dataSource")

+  .deepStorage(deepStorageConfig)

+  .druid()

+```

+

+Alternatively, the reader can be configured via a properties map with no additional import needed:

+```scala

+val properties = Map[String, String](

+  "metadata.dbType" -> "mysql",

+  "metadata.connectUri" -> "jdbc:mysql://druid.metadata.server:3306/druid",

+  "metadata.user" -> "druid",

+  "metadata.password" -> "diurd",

+  "broker.host" -> "localhost",

+  "broker.port" -> "8082",

+  "table" -> "dataSource",

+  "reader.deepStorageType" -> "local",

+  "local.storageDirectory" -> "/mnt/druid/druid-segments/"

+)

+

+sparkSession

+  .read

+  .format("druid")

+  .options(properties)

+  .load()

+```

+

+If you know the schema of the Druid data source you're reading from, you can avoid the schema-discovery calls to the

+broker by providing the schema directly:

+```scala

+sparkSession

+  .read

+  .format("druid")

+  .schema(schema)

+  .options(properties)

+  .load()

+```

+

+Filters should be applied to the read-in data frame before any [Spark actions](http://spark.apache.org/docs/2.4.7/api/scala/index.html#org.apache.spark.sql.Dataset)

+are triggered, to allow predicates to be pushed down to the reader and avoid full scans of the underlying Druid data.

+

+## Plugin Registries and Druid Extension Support

+One of Druid's strengths is its extensibility. Since these Spark readers and writers will not execute on a Druid cluster

+and won't have the ability to dynamically load classes or integrate with Druid's Guice injectors, Druid extensions can't

+be used directly. Instead, these connectors use a plugin registry architecture, including default plugins that support

+most functionality in `extensions-core`. Custom plugins consisting of a string name and one or more serializable

+generator functions must be registered before the first Spark action which would depend on them is called.

+

+### ComplexTypeRegistry

+The `ComplexTypeRegistry` provides support for serializing and deserializing complex types between Spark and Druid.

+Support for complex types in Druid core extensions is provided out of the box.

+

+Users wishing to override the default behavior or who need to add support for additional complex types can use the

+`ComplexTypeRegistry.register` functions to associate serde functions with a given complex type. The name used to

+register custom behavior must match the complex type name reported by Druid.

+**Custom complex type plugins must be registered with both the executors and the Spark driver.**

+

+### DynamicConfigProviderRegistry

+The `DynamicConfigProviderRegistry` provides support for deserializing dynamic configuration values via Druid

+[DynamicConfigProviders](dynamic-config-provider.md).

+

+Custom providers should be registered on the driver if used to supply passwords for the backing metadata database and on

+the executors if used to supply deep storage credentials or keys.

+

+### SegmentReaderRegistry

+The `SegmentReaderRegistry` provides support for reading segments from deep storage. Local, HDFS, GCS, S3, and Azure

+Storage deep storage implementations are supported by default.

+

+Users wishing to override the default behavior or who need to add support for additional deep storage implementations

+can use either `SegmentReaderRegistry.registerInitializer` (to provide any necessary Jackson configuration for

+deserializing a `LoadSpec` object from a segment load spec) or `SegmentReaderRegistry.registerLoadFunction` (to register

+a function for creating a URI from a segment load spec). These two functions correspond to the first and second approach

+[outlined below](#deep-storage). **Note that custom plugins must be registered on the executors, not the Spark driver.**

+

+### SQLConnectorRegistry

+The `SQLConnectorRegistry` provides support for configuring connectors to Druid metadata databases. Support for MySQL,

+PostgreSQL, and Derby databases is provided out of the box.

+

+Users wishing to override the default behavior or who need to add support for additional metadata database

+implementations can use the `SQLConnectorRegistry.register` function. Custom connectors should be registered on the

+driver.

+

+## Deploying to a Spark cluster

+This extension can be run on a Spark cluster in one of two ways: bundled as part of an application jar or uploaded as

+a library jar to a Spark cluster and included in the classpath provided to Spark applications by the application

+manager. If the second approach is used, this extension should be built with

+`mvn clean package -pl spark` and the resulting jar `druid-spark-<VERSION>.jar`

+uploaded to the Spark cluster. Application jars should then be built with a compile-time dependency on

+`org.apache.druid:druid-spark` (e.g. marked as `provided` in Maven or with `compileOnly` in Gradle).

+

+## Configuration Reference

+

+### Metadata Client Configs

+The properties used to configure the client that interacts with the Druid metadata server directly. Used by both the reader

+and the writer. The `metadata.password` property can either be provided as a string that will be used as-is or can be

+provided as a serialized DynamicConfigProvider that will be resolved when the metadata client is first instantiated. If

+a custom DynamicConfigProvider is used, be sure to register the provider with the DynamicConfigProviderRegistry before use.

+

+|Key|Description|Required|Default|

+|---|-----------|--------|-------|

+|`metadata.dbType`|The metadata server's database type (e.g. `mysql`)|Yes||

+|`metadata.host`|The metadata server's host name|If using derby|`localhost`|

+|`metadata.port`|The metadata server's port|If using derby|1527|

+|`metadata.connectUri`|The URI to use to connect to the metadata server|If not using derby||

+|`metadata.user`|The user to use when connecting to the metadata server|If required by the metadata database||

+|`metadata.password`|The password to use when connecting to the metadata server. This can optionally be a serialized instance of a Druid DynamicConfigProvider or a plain string|If required by the metadata database||

+|`metadata.dbcpProperties`|The connection pooling properties to use when connecting to the metadata server|No||

+|`metadata.baseName`|The base name used when creating Druid metadata tables|No|`druid`|

+

+### Druid Client Configs

+The configuration properties used to query the Druid cluster for segment metadata. Only used in the reader, and only if

+no schema is provided.

+

+|Key|Description|Required|Default|

+|---|-----------|--------|-------|

+|`broker.host`|The hostname of a broker in the Druid cluster to read from|No|`localhost`|

+|`broker.port`|The port of the broker in the Druid cluster to read from|No|8082|

+|`broker.numRetries`|The number of times to retry a timed-out segment metadata request|No|5|

+|`broker.retryWaitSeconds`|How long (in seconds) to wait before retrying a timed-out segment metadata request|No|5|

+|`broker.timeoutMilliseconds`|How long (in milliseconds) to wait before timing out a segment metadata request|No|300000|

+

+### Reader Configs

+The properties used to configure the DataSourceReader when reading data from Druid in Spark.

+

+|Key|Description|Required|Default|

+|---|-----------|--------|-------|

+|`table`|The Druid data source to read from|Yes||

+|`reader.deepStorageType`|The type of deep storage used to back the target Druid cluster|No|`local`|

+|`reader.segments`|A hard-coded list of Druid segments to read. If set, the table and druid metadata client configurations are ignored and the specified segments are read directly. Must be deserializable into Druid DataSegment instances|No||

+|`reader.useCompactSketches`|Controls whether or not compact representations of complex types are used (only for types that support compact forms)|No|False|

+|`reader.useDefaultValueForNull`|If true, use Druid's default values for null values. If false, explicitly use null for null values. See the [Druid configuration reference](../configuration/index.html#sql-compatible-null-handling) for more details|No|True|

+|`reader.useSparkConfForDeepStorage`|If true, use the Spark job's configuration to set up access to deep storage|No|False|

+|`reader.allowIncompletePartitions`|If true, read both complete and incomplete Druid partitions. If false, read only complete partitions.|No|False|

+|`reader.timestampColumn`|The timestamp column name for the data source to read from|No|`__time`|

+|`reader.timestampFormat`|The format of the timestamps in `timestampColumn`|No|`millis`|

+|`reader.vectorize`|**Experimental!** If true, reads data from segments in batches if possible|No|False|

+|`reader.batchSize`|**Experimental!** The number of rows to read in one batch if `reader.vectorize` is set to true|No|512|

+

+#### Deep Storage

+There are two ways to configure the DataSourceReader's access to deep storage.

+

+1. Users can directly configure the necessary keys and properties following the [deep storage configuration options](#deep-storage-configs) below.

+2. Users can delegate to the Spark application config by setting `reader.useSparkConfForDeepStorage` to true.

+

+In the second case, the reader will construct a URI from the load spec of each segment to read and pull the segment from

+that URI using a FileSystem created from the calling Spark application's Hadoop Configuration. This case is useful for

+users running on clusters that rely on GCS ADCs or AWS IAM roles for machine authorization to GCS/S3, or for clusters

+that manage access keys for their users. Currently local, HDFS, S3, and GCS deep storage implementations are supported

+out of the box for this approach (Azure users will need to use the first approach or register a custom load function

+via the `SegmentReaderRegistry`).

+

+If other deep storage implementations are used or custom behavior is required, users can register plugins providing the

+additional functionality with the `SegmentReaderRegistry`.

+

+

+#### Vectorized Reads

+**Experimental!** The DataSourceReader can optionally attempt to read data from segments in batches.

+Spark 2.4 does not take full advantage of the capability, but vectorized reads may speed up data load

+times considerably. The default value for `reader.batchSize` isn't much more than an educated guess, so please

+test your workload with a few different batch sizes to determine the value that best balances speed

+and memory usage for your use case (and then let us know what worked best for you so we can improve

+the documentation!).

+

+### Deep Storage Configs

+The configuration properties used when interacting with deep storage systems directly.

+

+#### Local Deep Storage Config

+`deepStorageType` = `local`

+

+|Key|Description|Required|Default|

+|---|-----------|--------|-------|

+|`local.storageDirectory`|The location to write segments out to|Yes|`/tmp/druid/localStorage`|

+

+#### HDFS Deep Storage Config

+`deepStorageType` = `hdfs`

+

+|Key|Description|Required|Default|

+|---|-----------|--------|-------|

+|`hdfs.storageDirectory`|The location to write segments out to|Yes||

+|`hdfs.hadoopConf`|A Base64 encoded representation of dumping a Hadoop Configuration to a byte array via `.write`|Yes||

+

+#### S3 Deep Storage Config

+`deepStorageType` = `s3`

+

+These configs generally shadow the [Connecting to S3 configuration](../development/extensions-core/s3.md#connecting-to-s3-configuration)

+section of the Druid s3 extension doc, including in the inconsistent use of `disable` vs `enable` as boolean property

+name prefixes.

+

+|Key|Description|Required|Default|

+|---|-----------|--------|-------|

+|`s3.bucket`|The S3 bucket to write segments to|Yes||

+|`s3.baseKey`|The base key to prefix segments with when writing to S3|Yes||

+|`s3.maxListingLength`|The maximum number of input files matching a prefix to retrieve or delete in one call|No|1000/1024|

+|`s3.disableAcl`|Whether or not to disable ACLs on the output segments. If this is false, additional S3 permissions are required|No|False|

+|`s3.useS3aSchema`|Whether or not to use the `s3a` filesystem when writing segments to S3.|No|True ***(note this is the opposite of the druid-s3 extension!)***|

+|`s3.accessKey`|The S3 access key. See [S3 authentication methods](../development/extensions-core/s3.md#s3-authentication-methods) for more details|||

+|`s3.secretKey`|The S3 secret key. See [S3 authentication methods](../development/extensions-core/s3.md#s3-authentication-methods) for more details|||

+|`s3.fileSessionCredentials`|The path to a properties file containing S3 session credentials. See [S3 authentication methods](../development/extensions-core/s3.md#s3-authentication-methods) for more details|||

+|`s3.proxy.host`|The proxy host to connect to S3 through|No||

+|`s3.proxy.port`|The proxy port to connect to S3 through|No||

+|`s3.proxy.username`|The user name to use when connecting through a proxy|No||

+|`s3.proxy.password`|The password to use when connecting through a proxy. Plain string|No||

+|`s3.endpoint.url`|The S3 service endpoint to connect to|||

+|`s3.endpoint.signingRegion`|The region to use for signing requests (e.g. `us-west-1`)|||

+|`s3.protocol`|The communication protocol to use when communicating with AWS. This configuration is ignored if `s3.endpoint.url` includes the protocol| |`https`|

+|`s3.disableChunkedEncoding`|Whether or not to disable chunked encoding| |False| <!-- Keeping the irritating inconsistency in property naming here to match the S3 extension names -->

+|`s3.enablePathStyleAccess`|Whether or not to enable path-style access| |False|

+|`s3.forceGlobalBucketAccessEnabled`|Whether or not to force global bucket access| |False|

+|`s3.sse.type`|The type of Server Side Encryption to use (`s3`, `kms`, or `custom`). If not set, server side encryption will not be used|||

+|`s3.sse.kms.keyId`|The keyId to use for `kms` server side encryption|Only if `s3.sse.type` is `kms`||

+|`s3.sse.custom.base64EncodedKey`|The base-64 encoded key to use for `custom` server side encryption|Only if `s3.sse.type` is `custom`||

+

+#### GCS Deep Storage Config

+`deepStorageType` = `google`

+

+These configs shadow the [Google Cloud Storage Extension](../development/extensions-core/google.md) docs. The environment variable

+`GOOGLE_APPLICATION_CREDENTIALS` must be set to write segments to GCS.

+

+|Key|Description|Required|Default|

+|---|-----------|--------|-------|

+|`google.bucket`|The GCS bucket to write segments to|Yes||

+|`google.prefix`|The base key to prefix segments with when writing to GCS|Yes||

+|`google.maxListingLength`|The maximum number of input files matching a prefix to retrieve or delete in one call|No|1024|

+

+#### Azure Deep Storage Config

+`deepStorageType` = `azure`

+

+Writing data to Azure deep storage is currently experimental. It should work but is untested. If you use this connector

+to write segments to Azure, please update this documentation.

+

+|Key|Description|Required|Default|

+|---|-----------|--------|-------|

+|`azure.account`|The Azure Storage account name to use|Yes||

+|`azure.key`|The key for the Azure Storage account used|Yes||

+|`azure.container`|The Azure Storage container name|Yes||

+|`azure.maxTries`|The number of tries before failing an Azure operation|No|3|

+|`azure.protocol`|The communication protocol to use when interacting with Azure Storage|No|`https`|

+|`azure.prefix`|The string to prepend to all segment blob names written to Azure Storage|No|`""`|

+|`azure.maxListingLength`|The maximum number of input files matching a prefix to retrieve or delete in one call|No|1024|

+

+#### Custom Deep Storage Implementations

+

+The Spark-Druid extension includes support for writing segments to the above deep storage options. If you want to write

+to a different deep storage option or use a different implementation for one of the options above, you can implement and

+register a plugin with the SegmentWriterRegistry. Any properties specified in the options map specified in the `write()`

+call will be passed along to the plugin functions.
\ No newline at end of file
diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
index b5643ce..10720d6 100644
--- a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
+++ b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
@@ -77,7 +77,6 @@
       NamespaceExtractionCacheManager.class);
   private final CacheHandler cacheHandler = PowerMock.createStrictMock(CacheHandler.class);
 
-
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java
index 7550863..7b55cc6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java
@@ -41,14 +41,14 @@
   private final DataSegment segment;
   private final Interval intervalFilter;
 
-  DruidSegmentInputEntity(SegmentLoader segmentLoader, DataSegment segment, Interval intervalFilter)
+  public DruidSegmentInputEntity(SegmentLoader segmentLoader, DataSegment segment, Interval intervalFilter)
   {
     this.segmentLoader = segmentLoader;
     this.segment = segment;
     this.intervalFilter = intervalFilter;
   }
 
-  Interval getIntervalFilter()
+  public Interval getIntervalFilter()
   {
     return intervalFilter;
   }
diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile
index 845bf21..b769fa7 100644
--- a/integration-tests/docker/Dockerfile
+++ b/integration-tests/docker/Dockerfile
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-ARG JDK_VERSION=8-slim
+ARG JDK_VERSION=8-slim-buster
 FROM openjdk:$JDK_VERSION as druidbase
 
 # Bundle everything into one script so cleanup can reduce image size.
diff --git a/integration-tests/script/docker_build_containers.sh b/integration-tests/script/docker_build_containers.sh
index 7a18d77..50d27a2 100755
--- a/integration-tests/script/docker_build_containers.sh
+++ b/integration-tests/script/docker_build_containers.sh
@@ -28,15 +28,15 @@
   case "${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" in
   8)
     echo "Build druid-cluster with Java 8"
-    docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg MARIA_VERSION --build-arg MYSQL_DRIVER_CLASSNAME --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
+    docker build -t druid/cluster --build-arg JDK_VERSION=8-slim-buster --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg MARIA_VERSION --build-arg MYSQL_DRIVER_CLASSNAME --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
     ;;
   11)
     echo "Build druid-cluster with Java 11"
-    docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg MARIA_VERSION --build-arg MYSQL_DRIVER_CLASSNAME --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
+    docker build -t druid/cluster --build-arg JDK_VERSION=11-slim-buster --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg MARIA_VERSION --build-arg MYSQL_DRIVER_CLASSNAME --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
     ;;
   15)
     echo "Build druid-cluster with Java 15"
-    docker build -t druid/cluster --build-arg JDK_VERSION=15-slim --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg MARIA_VERSION --build-arg USE_MARIA --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
+    docker build -t druid/cluster --build-arg JDK_VERSION=15-slim-buster --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg MARIA_VERSION --build-arg USE_MARIA --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
     ;;
   *)
     echo "Invalid JVM Runtime given. Stopping"
diff --git a/licenses.yaml b/licenses.yaml
index acc6470..f7d5025 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -5735,3 +5735,78 @@
 version: 4.0.3
 license_file_path: licenses/bin/warning.MIT
 # Web console modules end
+
+---
+
+name: Apache Spark
+license_category: binary
+module: spark
+license_name: Apache License version 2.0
+version: 2.4.7
+libraries:
+  - org.apache.spark: spark-core_2.12
+  - org.apache.spark: spark-sql_2.12
+notice: |
+  Apache Spark
+  Copyright 2014 and onwards The Apache Software Foundation.
+
+  This product includes software developed at
+  The Apache Software Foundation (http://www.apache.org/).
+
+
+  Export Control Notice
+  ---------------------
+
+  This distribution includes cryptographic software. The country in which you currently reside may have
+  restrictions on the import, possession, use, and/or re-export to another country, of encryption software.
+  BEFORE using any encryption software, please check your country's laws, regulations and policies concerning
+  the import, possession, or use, and re-export of encryption software, to see if this is permitted. See
+  <http://www.wassenaar.org/> for more information.
+
+  The U.S. Government Department of Commerce, Bureau of Industry and Security (BIS), has classified this
+  software as Export Commodity Control Number (ECCN) 5D002.C.1, which includes information security software
+  using or performing cryptographic functions with asymmetric algorithms. The form and manner of this Apache
+  Software Foundation distribution makes it eligible for export under the License Exception ENC Technology
+  Software Unrestricted (TSU) exception (see the BIS Export Administration Regulations, Section 740.13) for
+  both object code and source code.
+
+  The following provides more details on the included cryptographic software:
+
+  This software uses Apache Commons Crypto (https://commons.apache.org/proper/commons-crypto/) to
+  support authentication, and encryption and decryption of data sent across the network between
+  services.
+
+---
+
+name: Scala Library
+license_category: binary
+module: spark
+license_name: Apache License version 2.0
+version: 2.12.12
+libraries:
+  - org.scala-lang: scala-library
+  - org.scala-lang: scala-reflect
+  - org.scala-lang: scalap
+
+---
+
+# Not sure why check-license finds these as well (they're not in mvn dependency:tree for the spark module)
+name: Paranamer
+license_category: binary
+module: spark
+license_name: BSD-3-Clause License
+version: 2.8
+copyright: Paul Hammant & ThoughtWorks Inc
+license_file_path: licenses/bin/paranamer.BSD3
+libraries:
+  - com.thoughtworks.paranamer: paranamer
+
+---
+
+name: Jackson Paranamer
+license_category: binary
+module: spark
+license_name: Apache License version 2.0
+version: 2.10.5
+libraries:
+  - com.fasterxml.jackson.module: jackson-module-paranamer
diff --git a/spark/pom.xml b/spark/pom.xml
index 6dcae79..a5094e6 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -34,7 +34,7 @@
     </parent>

 

     <properties>

-        <scala.version>${scala.major.version}.12</scala.version>

+        <scala.version>${scala.major.version}.15</scala.version>

         <scala.major.version>2.12</scala.major.version>

         <spark.version>2.4.8</spark.version>

         <!-- These two properties allow -Dcheckstyle.skip to suppress scalastyle checks as well -->

@@ -114,6 +114,17 @@
         </dependency>

         <dependency>

             <groupId>org.apache.druid</groupId>

+            <artifactId>druid-server</artifactId>

+            <version>${project.parent.version}</version>

+            <exclusions>

+                <exclusion>

+                    <groupId>*</groupId>

+                    <artifactId>*</artifactId>

+                </exclusion>

+            </exclusions>

+        </dependency>

+        <dependency>

+            <groupId>org.apache.druid</groupId>

             <artifactId>druid-processing</artifactId>

             <version>${project.parent.version}</version>

             <exclusions>

@@ -126,10 +137,6 @@
                     <artifactId>*</artifactId>

                 </exclusion>

                 <exclusion>

-                    <groupId>org.apache.maven</groupId>

-                    <artifactId>maven-artifact</artifactId>

-                </exclusion>

-                <exclusion>

                     <groupId>org.mozilla</groupId>

                     <artifactId>rhino</artifactId>

                 </exclusion>

@@ -139,9 +146,188 @@
                 </exclusion>

             </exclusions>

         </dependency>

+        <dependency>

+            <groupId>org.apache.druid</groupId>

+            <artifactId>druid-indexing-service</artifactId>

+            <version>${project.parent.version}</version>

+        </dependency>

+

+        <!-- Extensions included since we won't be running in a Druid cluster and can't use injection -->

+        <dependency>

+            <groupId>org.apache.druid.extensions</groupId>

+            <artifactId>druid-datasketches</artifactId>

+            <version>${project.parent.version}</version>

+        </dependency>

+        <dependency>

+            <groupId>org.apache.datasketches</groupId>

+            <artifactId>datasketches-java</artifactId>

+            <version>${datasketches.version}</version>

+        </dependency>

+        <dependency>

+            <groupId>org.apache.druid.extensions</groupId>

+            <artifactId>druid-histogram</artifactId>

+            <version>${project.parent.version}</version>

+        </dependency>

+        <dependency>

+            <groupId>org.apache.druid.extensions</groupId>

+            <artifactId>druid-stats</artifactId>

+            <version>${project.parent.version}</version>

+        </dependency>

+        <dependency>

+            <groupId>org.apache.druid.extensions</groupId>

+            <artifactId>mysql-metadata-storage</artifactId>

+            <version>${project.parent.version}</version>

+        </dependency>

+        <dependency>

+            <groupId>org.apache.druid.extensions</groupId>

+            <artifactId>postgresql-metadata-storage</artifactId>

+            <version>${project.parent.version}</version>

+        </dependency>

+

+        <!--

+          Excluding transitive dependencies from deep storage extensions to keep dependency size manageable. Users

+          should provide the appropriate jars for their deep storage on their Spark clusters or depend on them directly

+          in their code.

+         -->

+        <dependency>

+            <groupId>org.apache.druid</groupId>

+            <artifactId>druid-aws-common</artifactId>

+            <version>${project.parent.version}</version>

+            <exclusions>

+                <exclusion>

+                    <groupId>*</groupId>

+                    <artifactId>*</artifactId>

+                </exclusion>

+            </exclusions>

+        </dependency>

+        <dependency>

+            <groupId>org.apache.druid</groupId>

+            <artifactId>druid-gcp-common</artifactId>

+            <version>${project.parent.version}</version>

+            <exclusions>

+                <exclusion>

+                    <groupId>*</groupId>

+                    <artifactId>*</artifactId>

+                </exclusion>

+            </exclusions>

+        </dependency>

+        <dependency>

+            <groupId>org.apache.druid.extensions</groupId>

+            <artifactId>druid-azure-extensions</artifactId>

+            <version>${project.parent.version}</version>

+            <exclusions>

+                <exclusion>

+                    <groupId>*</groupId>

+                    <artifactId>*</artifactId>

+                </exclusion>

+            </exclusions>

+        </dependency>

+        <dependency>

+            <groupId>org.apache.druid.extensions</groupId>

+            <artifactId>druid-google-extensions</artifactId>

+            <version>${project.parent.version}</version>

+            <exclusions>

+                <exclusion>

+                    <groupId>*</groupId>

+                    <artifactId>*</artifactId>

+                </exclusion>

+            </exclusions>

+        </dependency>

+        <dependency>

+            <groupId>org.apache.druid.extensions</groupId>

+            <artifactId>druid-hdfs-storage</artifactId>

+            <version>${project.parent.version}</version>

+            <exclusions>

+                <exclusion>

+                    <groupId>*</groupId>

+                    <artifactId>*</artifactId>

+                </exclusion>

+            </exclusions>

+        </dependency>

+        <dependency>

+            <groupId>org.apache.druid.extensions</groupId>

+            <artifactId>druid-s3-extensions</artifactId>

+            <version>${project.parent.version}</version>

+            <exclusions>

+                <exclusion>

+                    <groupId>*</groupId>

+                    <artifactId>*</artifactId>

+                </exclusion>

+            </exclusions>

+        </dependency>

+

+        <!-- Deep storage direct APIs -->

+        <dependency>

+            <groupId>com.microsoft.azure</groupId>

+            <artifactId>azure-storage</artifactId>

+            <version>8.6.0</version>

+            <scope>provided</scope>

+            <exclusions>

+                <exclusion>

+                    <groupId>com.fasterxml.jackson.core</groupId>

+                    <artifactId>jackson-core</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>com.google.guava</groupId>

+                    <artifactId>guava</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>org.apache.commons</groupId>

+                    <artifactId>commons-lang3</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>org.slf4j</groupId>

+                    <artifactId>slf4j-api</artifactId>

+                </exclusion>

+            </exclusions>

+        </dependency>

+        <dependency>

+            <groupId>com.google.api-client</groupId>

+            <artifactId>google-api-client</artifactId>

+            <version>${com.google.apis.client.version}</version>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>com.google.http-client</groupId>

+            <artifactId>google-http-client-jackson2</artifactId>

+            <version>${com.google.apis.client.version}</version>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>com.google.apis</groupId>

+            <artifactId>google-api-services-storage</artifactId>

+            <version>${com.google.apis.storage.version}</version>

+            <scope>provided</scope>

+            <exclusions>

+                <exclusion>

+                    <groupId>com.google.api-client</groupId>

+                    <artifactId>google-api-client</artifactId>

+                </exclusion>

+            </exclusions>

+        </dependency>

+        <dependency>

+            <groupId>com.amazonaws</groupId>

+            <artifactId>aws-java-sdk-core</artifactId>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>com.amazonaws</groupId>

+            <artifactId>aws-java-sdk-s3</artifactId>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>com.amazonaws</groupId>

+            <artifactId>aws-java-sdk-ec2</artifactId>

+            <scope>provided</scope>

+        </dependency>

 

 

         <dependency>

+            <groupId>com.fasterxml.jackson.module</groupId>

+            <artifactId>jackson-module-scala_2.12</artifactId>

+            <version>2.10.5</version>

+        </dependency>

+        <dependency>

             <groupId>commons-io</groupId>

             <artifactId>commons-io</artifactId>

             <scope>provided</scope>

@@ -212,6 +398,10 @@
             <scope>provided</scope>

         </dependency>

         <dependency>

+            <groupId>org.jdbi</groupId>

+            <artifactId>jdbi</artifactId>

+        </dependency>

+        <dependency>

             <groupId>org.slf4j</groupId>

             <artifactId>slf4j-api</artifactId>

             <scope>provided</scope>

@@ -302,6 +492,18 @@
             <scope>test</scope>

         </dependency>

         <dependency>

+            <groupId>org.apache.druid</groupId>

+            <artifactId>druid-processing</artifactId>

+            <version>${project.parent.version}</version>

+            <type>test-jar</type>

+            <scope>test</scope>

+        </dependency>

+        <dependency>

+            <groupId>org.apache.derby</groupId>

+            <artifactId>derby</artifactId>

+            <scope>test</scope>

+        </dependency>

+        <dependency>

             <groupId>org.scalatest</groupId>

             <artifactId>scalatest_${scala.major.version}</artifactId>

             <version>3.1.1</version>

@@ -319,11 +521,6 @@
             <scope>test</scope>

         </dependency>

         <dependency>

-            <groupId>org.easymock</groupId>

-            <artifactId>easymock</artifactId>

-            <scope>test</scope>

-        </dependency>

-        <dependency>

             <groupId>org.hamcrest</groupId>

             <artifactId>hamcrest-core</artifactId>

             <scope>test</scope>

diff --git a/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..76d7577
--- /dev/null
+++ b/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more

+# contributor license agreements.  See the NOTICE file distributed with

+# this work for additional information regarding copyright ownership.

+# The ASF licenses this file to You under the Apache License, Version 2.0

+# (the "License"); you may not use this file except in compliance with

+# the License.  You may obtain a copy of the License at

+#

+#     http://www.apache.org/licenses/LICENSE-2.0

+#

+# Unless required by applicable law or agreed to in writing, software

+# distributed under the License is distributed on an "AS IS" BASIS,

+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+# See the License for the specific language governing permissions and

+# limitations under the License.

+

+org.apache.druid.spark.v2.DruidDataSourceV2
\ No newline at end of file
diff --git a/spark/src/main/scala/org/apache/druid/spark/clients/DruidClient.scala b/spark/src/main/scala/org/apache/druid/spark/clients/DruidClient.scala
index 577f87e..cd1ce63 100644
--- a/spark/src/main/scala/org/apache/druid/spark/clients/DruidClient.scala
+++ b/spark/src/main/scala/org/apache/druid/spark/clients/DruidClient.scala
@@ -78,6 +78,9 @@
     */

   def getSchema(dataSource: String, intervals: Option[List[Interval]]): Map[String, (String, Boolean)] = {

     val queryInterval = intervals.getOrElse(DefaultSegmentMetadataInterval)

+    logInfo(

+      s"Creating schema for $dataSource over intervals ${queryInterval.map(_.toString).mkString("[", ", ", "]")}.")

+

     val body = Druids.newSegmentMetadataQueryBuilder()

       .dataSource(dataSource)

       .intervals(queryInterval.asJava)

@@ -87,6 +90,7 @@
         "timeout" -> Int.box(timeoutWaitMilliseconds)

       ).asJava)

       .build()

+

     val response = sendRequestWithRetry(

       druidBaseQueryURL(hostAndPort), numRetries, Option(MAPPER.writeValueAsBytes(body))

     )

@@ -102,7 +106,8 @@
     if (segments.size() > 1) {

       throw new ISE("Merged segment metadata response had more than one row!")

     }

-    log.debug(segments.asScala.map(_.toString).mkString("SegmentAnalysis: [", ", ", "]"))

+    logDebug(segments.asScala.map(_.toString).mkString("SegmentAnalysis: [", ", ", "]"))

+

     /*

      * If a dimension has multiple types within the spanned interval, the resulting column

      * analysis will have the type "STRING" and be an error message. We abuse that here to infer

@@ -111,7 +116,7 @@
     val columns = segments.asScala.head.getColumns.asScala.toMap

     columns.foreach{ case(key, column) =>

       if (column.isError) {

-        log.warn(s"Multiple column types found for dimension $key in interval" +

+        logWarn(s"Multiple column types found for dimension $key in interval" +

           s" ${queryInterval.mkString("[", ", ", "]")}! Falling back to STRING type")

       }

     }

@@ -140,7 +145,7 @@
     } catch {

       case e: Exception =>

         if (retryCount > 0) {

-          logInfo(s"Got exception: ${e.getMessage}, retrying ...")

+          logInfo(s"Encountered an exception: ${e.getMessage}; retrying request...")

           Thread.sleep(retryWaitSeconds * 1000)

           sendRequestWithRetry(url, retryCount - 1, content)

         } else {

@@ -159,14 +164,14 @@
       ).get

       if (response.getStatus == HttpResponseStatus.TEMPORARY_REDIRECT) {

         val newUrl = response.getResponse.headers().get("Location")

-        logInfo(s"Got a redirect, new location: $newUrl")

+        logInfo(s"Redirected, new location: $newUrl")

         response = httpClient.go(

           buildRequest(newUrl, content), new StringFullResponseHandler(StringUtils.UTF8_CHARSET)

         ).get

       }

       if (!(response.getStatus == HttpResponseStatus.OK)) {

         throw new ISE(

-          s"Error getting response for %s, status[%s] content[%s]",

+          s"Error getting response for %s, status [%s] content [%s]",

           url,

           response.getStatus,

           response.getContent

diff --git a/spark/src/main/scala/org/apache/druid/spark/clients/DruidMetadataClient.scala b/spark/src/main/scala/org/apache/druid/spark/clients/DruidMetadataClient.scala
new file mode 100644
index 0000000..f4df32e
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/clients/DruidMetadataClient.scala
@@ -0,0 +1,187 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.clients

+

+import com.fasterxml.jackson.core.`type`.TypeReference

+import com.google.common.base.Suppliers

+import org.apache.druid.indexer.SQLMetadataStorageUpdaterJobHandler

+import org.apache.druid.java.util.common.{DateTimes, Intervals, JodaUtils, StringUtils}

+import org.apache.druid.metadata.{DynamicConfigProvider, MetadataStorageConnectorConfig,

+  MetadataStorageTablesConfig, SQLMetadataConnector}

+import org.apache.druid.spark.MAPPER

+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}

+import org.apache.druid.spark.mixins.Logging

+import org.apache.druid.spark.registries.SQLConnectorRegistry

+import org.apache.druid.timeline.{DataSegment, Partitions, VersionedIntervalTimeline}

+import org.skife.jdbi.v2.{DBI, Handle}

+

+import java.util.Properties

+import scala.collection.JavaConverters.{asJavaIterableConverter, asScalaBufferConverter,

+  asScalaSetConverter, mapAsJavaMapConverter}

+

+class DruidMetadataClient(

+                           metadataDbType: String,

+                           host: String,

+                           port: Int,

+                           connectUri: String,

+                           user: String,

+                           passwordProviderSer: String,

+                           dbcpMap: Properties,

+                           base: String = "druid"

+                         ) extends Logging {

+  private lazy val druidMetadataTableConfig = MetadataStorageTablesConfig.fromBase(base)

+  private lazy val dbcpProperties = new Properties(dbcpMap)

+  private lazy val password = if (passwordProviderSer == "") {

+    // Jackson doesn't like deserializing empty strings

+    passwordProviderSer

+  } else {

+    MAPPER.readValue[DynamicConfigProvider[String]](

+      passwordProviderSer, new TypeReference[DynamicConfigProvider[String]] {}

+    ).getConfig.getOrDefault("password", "")

+  }

+

+  private lazy val connectorConfig: MetadataStorageConnectorConfig =

+    new MetadataStorageConnectorConfig

+    {

+      override def isCreateTables: Boolean = false

+      override def getHost: String = host

+      override def getPort: Int = port

+      override def getConnectURI: String = connectUri

+      override def getUser: String = user

+      override def getPassword: String = password

+      override def getDbcpProperties: Properties = dbcpProperties

+    }

+  private lazy val connectorConfigSupplier = Suppliers.ofInstance(connectorConfig)

+  private lazy val metadataTableConfigSupplier = Suppliers.ofInstance(druidMetadataTableConfig)

+  private lazy val connector = buildSQLConnector()

+

+  /**

+    * Get the non-overshadowed used segments for DATASOURCE between INTERVALSTART and INTERVALEND. If either interval

+    * endpoint is None, JodaUtils.MIN_INSTANT/MAX_INSTANT is used instead. By default, only segments for complete

+    * partitions are returned. This behavior can be overridden by setting ALLOWINCOMPLETEPARTITIONS, in which case all

+    * non-overshadowed segments in the interval will be returned, regardless of completeness.

+    *

+    * @param datasource The Druid data source to get segment payloads for.

+    * @param intervalStart The start of the interval to fetch segment payloads for. If None, MIN_INSTANT is used.

+    * @param intervalEnd The end of the interval to fetch segment payloads for. If None, MAX_INSTANT is used.

+    * @param allowIncompletePartitions Whether or not to include segments from incomplete partitions

+    * @return A Sequence of DataSegments representing all non-overshadowed used segments for the given data source and

+    *         interval.

+    */

+  def getSegmentPayloads(

+                          datasource: String,

+                          intervalStart: Option[Long],

+                          intervalEnd: Option[Long],

+                          allowIncompletePartitions: Boolean = false

+                        ): Seq[DataSegment] = {

+    val dbi: DBI = connector.getDBI

+    val interval = Intervals.utc(

+      intervalStart.getOrElse(JodaUtils.MIN_INSTANT), intervalEnd.getOrElse(JodaUtils.MAX_INSTANT)

+    )

+    logInfo(s"Fetching segment payloads for interval [${interval.toString}] on datasource [$datasource].")

+    val startClause = if (intervalStart.isDefined) " AND start >= :start" else ""

+    val endClause = if (intervalEnd.isDefined) " AND \"end\" <= :end" else ""

+    val allSegments = dbi.withHandle((handle: Handle) => {

+      val statement =

+        s"""

+           |SELECT payload FROM ${druidMetadataTableConfig.getSegmentsTable}

+           |WHERE datasource = :datasource AND used = true$startClause$endClause

+        """.stripMargin

+

+      val bindMap = Seq(Some("datasource" -> datasource),

+        intervalStart.map(bound => "start" -> DateTimes.utc(bound).toString("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")),

+        intervalEnd.map(bound => "end" -> DateTimes.utc(bound).toString("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))

+      ).flatten.toMap

+

+      val query = handle.createQuery(statement)

+      val result = query

+        .bindFromMap(bindMap.asJava)

+        .mapTo(classOf[Array[Byte]]).list().asScala

+      result.map(blob =>

+        MAPPER.readValue[DataSegment](

+          StringUtils.fromUtf8(blob), new TypeReference[DataSegment] {}

+        )

+      )

+    })

+    val activeSegments = VersionedIntervalTimeline.forSegments(allSegments.asJava).findNonOvershadowedObjectsInInterval(

+      interval,

+      if (allowIncompletePartitions) Partitions.INCOMPLETE_OK else Partitions.ONLY_COMPLETE

+    ).asScala.toSeq

+    logInfo(s"Fetched payloads for ${activeSegments.size} segments for interval [${interval.toString}] on " +

+      s"datasource [$datasource].")

+    activeSegments

+  }

+

+  def publishSegments(segments: java.util.List[DataSegment]): Unit = {

+    val metadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(connector)

+    metadataStorageUpdaterJobHandler.publishSegments(druidMetadataTableConfig.getSegmentsTable,

+      segments, MAPPER)

+    logInfo(s"Published ${segments.size()} segments.")

+  }

+

+  def checkIfDataSourceExists(dataSource: String): Boolean = {

+    val dbi: DBI = connector.getDBI

+    dbi.withHandle((handle: Handle) => {

+      val statement =

+        s"""

+           |SELECT DISTINCT dataSource FROM ${druidMetadataTableConfig.getSegmentsTable}

+           |WHERE used = true AND dataSource = :dataSource

+         """.stripMargin

+      !handle.createQuery(statement).bind("dataSource", dataSource).list().isEmpty

+    })

+  }

+

+  /**

+    * This won't run in a Druid cluster, so users will need to respecify metadata connection info.

+    * This also means users will need to specifically include the extension jars on their clusters.

+    *

+    * @return A SQLMetadataConnector created by SQLConnectorRegistry for the configured metadataDbType.

+    */

+  private def buildSQLConnector(): SQLMetadataConnector = {

+    SQLConnectorRegistry.create(metadataDbType, connectorConfigSupplier, metadataTableConfigSupplier)

+  }

+}

+

+object DruidMetadataClient {

+  def apply(conf: Configuration): DruidMetadataClient = {

+    val metadataConf = conf.dive(DruidConfigurationKeys.metadataPrefix)

+

+    require(metadataConf.isPresent(DruidConfigurationKeys.metadataDbTypeKey),

+      s"Must set ${DruidConfigurationKeys.metadataPrefix}." +

+        s"${DruidConfigurationKeys.metadataDbTypeKey} or provide segments directly!")

+    val dbcpProperties = if (metadataConf.isPresent(DruidConfigurationKeys.metadataDbcpPropertiesKey)) {

+      MAPPER.readValue[Properties](metadataConf.getString(DruidConfigurationKeys.metadataDbcpPropertiesKey),

+        new TypeReference[Properties] {})

+    } else {

+      new Properties()

+    }

+

+    new DruidMetadataClient(

+      metadataConf.getAs[String](DruidConfigurationKeys.metadataDbTypeKey),

+      metadataConf.get(DruidConfigurationKeys.metadataHostDefaultKey),

+      metadataConf.getInt(DruidConfigurationKeys.metadataPortDefaultKey),

+      metadataConf.getString(DruidConfigurationKeys.metadataConnectUriKey),

+      metadataConf.getString(DruidConfigurationKeys.metadataUserKey),

+      metadataConf.getString(DruidConfigurationKeys.metadataPasswordKey),

+      dbcpProperties,

+      metadataConf.get(DruidConfigurationKeys.metadataBaseNameDefaultKey)

+    )

+  }

+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/configuration/Configuration.scala b/spark/src/main/scala/org/apache/druid/spark/configuration/Configuration.scala
index 71f31de..636ab72 100644
--- a/spark/src/main/scala/org/apache/druid/spark/configuration/Configuration.scala
+++ b/spark/src/main/scala/org/apache/druid/spark/configuration/Configuration.scala
@@ -205,5 +205,7 @@
     StringUtils.toLowerCase(paths.mkString(Configuration.SEPARATOR))

   }

 

+  val EMPTY_CONF: Configuration = Configuration(Map[String, String]())

+

   private val SEPARATOR = "."

 }

diff --git a/spark/src/main/scala/org/apache/druid/spark/configuration/DruidConfigurationKeys.scala b/spark/src/main/scala/org/apache/druid/spark/configuration/DruidConfigurationKeys.scala
index cfa1d74..60236b0 100644
--- a/spark/src/main/scala/org/apache/druid/spark/configuration/DruidConfigurationKeys.scala
+++ b/spark/src/main/scala/org/apache/druid/spark/configuration/DruidConfigurationKeys.scala
@@ -19,7 +19,26 @@
 

 package org.apache.druid.spark.configuration

 

+import org.apache.spark.sql.sources.v2.DataSourceOptions

+

 object DruidConfigurationKeys {

+  // Aliasing DataSourceOptions.TABLE_KEY here so other classes won't need unnecessary imports

+  val tableKey: String = DataSourceOptions.TABLE_KEY

+

+  // Metadata Client Configs

+  val metadataPrefix: String = "metadata"

+  val metadataDbTypeKey: String = "dbType"

+  val metadataHostKey: String = "host"

+  val metadataPortKey: String = "port"

+  val metadataConnectUriKey: String = "connectUri"

+  val metadataUserKey: String = "user"

+  val metadataPasswordKey: String = "password"

+  val metadataDbcpPropertiesKey: String = "dbcpProperties"

+  val metadataBaseNameKey: String = "baseName"

+  private[spark] val metadataHostDefaultKey: (String, String) = (metadataHostKey, "localhost")

+  private[spark] val metadataPortDefaultKey: (String, Int) = (metadataPortKey, 1527)

+  private[spark] val metadataBaseNameDefaultKey: (String, String) = (metadataBaseNameKey, "druid")

+

   // Druid Client Configs

   val brokerPrefix: String = "broker"

   val brokerHostKey: String = "host"

@@ -32,4 +51,79 @@
   private[spark] val numRetriesDefaultKey: (String, Int) = (numRetriesKey, 5)

   private[spark] val retryWaitSecondsDefaultKey: (String, Int) = (retryWaitSecondsKey, 5)

   private[spark] val timeoutMillisecondsDefaultKey: (String, Int) = (timeoutMillisecondsKey, 300000)

+

+  // Common configs

+  val useCompactSketchesKey: String = "useCompactSketches"

+  val useDefaultValueForNull: String = "useDefaultValueForNull"

+  private[spark] val useCompactSketchesDefaultKey: (String, Boolean) = (useCompactSketchesKey, false)

+  private[spark] val useDefaultValueForNullDefaultKey: (String, Boolean) = (useDefaultValueForNull, true)

+

+  // Reader Configs

+  val readerPrefix: String = "reader"

+  val segmentsKey: String = "segments"

+  val vectorizeKey: String = "vectorize" // Experimental key!

+  val batchSizeKey: String = "batchSize"

+  val useSparkConfForDeepStorageKey: String = "useClusterConfForDeepStorage"

+  val allowIncompletePartitionsKey: String = "allowIncompletePartitions"

+  val timestampColumnKey: String = "timestampColumn"

+  val timestampFormatKey: String = "timestampFormat"

+  private[spark] val vectorizeDefaultKey: (String, Boolean) = (vectorizeKey, false)

+  private[spark] val batchSizeDefaultKey: (String, Int) = (batchSizeKey, 512)

+  private[spark] val useSparkConfForDeepStorageDefaultKey: (String, Boolean) = (useSparkConfForDeepStorageKey, false)

+  private[spark] val allowIncompletePartitionsDefaultKey: (String, Boolean) = (allowIncompletePartitionsKey, false)

+  private[spark] val timestampColumnDefaultReaderKey: (String, String) = (timestampColumnKey, "__time")

+  private[spark] val timestampFormatDefaultReaderKey: (String, String) = (timestampFormatKey, "millis")

+

+

+  val deepStorageTypeKey: String = "deepStorageType"

+  private[spark] val deepStorageTypeDefaultKey: (String, String) = (deepStorageTypeKey, "local")

+

+  // Common Deep Storage Configs

+  val storageDirectoryKey: String = "storageDirectory"

+  val bucketKey: String = "bucket"

+  val maxListingLengthKey: String = "maxListingLength"

+  val prefixKey: String = "prefix"

+  val protocolKey: String = "protocol"

+

+  // Local Deep Storage Configs

+  val localDeepStorageTypeKey: String = "local"

+  val localStorageDirectoryKey: String = "storageDirectory"

+

+  // HDFS Deep Storage Configs

+  val hdfsDeepStorageTypeKey: String = "hdfs"

+  val hdfsHadoopConfKey: String = "hadoopConf" // Base64-encoded serialized Configuration

+

+  // S3 Deep Storage Configs

+  val s3DeepStorageTypeKey: String = "s3"

+  val s3BaseKeyKey: String = "baseKey"

+  val s3DisableAclKey: String = "disableAcl"

+  val s3UseS3ASchemaKey: String = "useS3aSchema"

+  val s3AccessKeyKey: String = "accessKey"

+  val s3SecretKeyKey: String = "secretKey"

+  val s3FileSessionCredentialsKey: String = "fileSessionCredentials"

+  val s3ProxyPrefix: String = "proxy"

+  val s3ProxyHostKey: String = "host"

+  val s3ProxyPortKey: String = "port"

+  val s3ProxyUsernameKey: String = "username"

+  val s3ProxyPasswordKey: String = "password"

+  val s3EndpointPrefix: String = "endpoint"

+  val s3EndpointUrlKey: String = "url"

+  val s3EndpointSigningRegionKey: String = "signingRegion"

+  val s3DisableChunkedEncodingKey: String = "disableChunkedEncoding"

+  val s3EnablePathStyleAccessKey: String = "enablePathStyleAccess"

+  val s3ForceGlobalBucketAccessEnabledKey: String = "forceGlobalBucketAccessEnabled"

+  val s3ServerSideEncryptionPrefix: String = "sse"

+  val s3ServerSideEncryptionTypeKey: String = "type"

+  val s3ServerSideEncryptionKmsKeyIdKey: String = "keyId"

+  val s3ServerSideEncryptionCustomKeyKey: String = "base64EncodedKey"

+

+  // GCS Deep Storage Configs

+  val googleDeepStorageTypeKey: String = "google"

+

+  // Azure Deep Storage Configs

+  val azureDeepStorageTypeKey: String = "azure"

+  val azureAccountKey: String = "account"

+  val azureKeyKey: String = "key"

+  val azureMaxTriesKey: String = "maxTries"

+  val azureContainerKey: String = "container"

 }

diff --git a/spark/src/main/scala/org/apache/druid/spark/configuration/SerializableHadoopConfiguration.scala b/spark/src/main/scala/org/apache/druid/spark/configuration/SerializableHadoopConfiguration.scala
new file mode 100644
index 0000000..50bca9a
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/configuration/SerializableHadoopConfiguration.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.configuration
+
+import org.apache.druid.spark.mixins.Logging
+import org.apache.hadoop.conf.{Configuration => HConf}
+
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+import scala.util.control.NonFatal
+
+/**
+  * Adapted from org.apache.spark.util.SerializableConfiguration.
+  * A serializable version of a hadoop Configuration. Use `value` to access the Configuration.
+  *
+  * @param value Hadoop configuration
+  */
+class SerializableHadoopConfiguration(@transient var value: HConf) extends Serializable
+  with Logging {
+  private def writeObject(out: ObjectOutputStream): Unit = {
+    try {
+      out.defaultWriteObject()
+      value.write(out)
+    } catch {
+      case e: IOException =>
+        logError("Exception encountered", e)
+        throw e
+      case NonFatal(e) =>
+        logError("Exception encountered", e)
+        throw new IOException(e)
+    }
+  }
+
+  private def readObject(in: ObjectInputStream): Unit = {
+    try {
+      value = new HConf(false)
+      value.readFields(in)
+    } catch  {
+      case e: IOException =>
+        logError("Exception encountered", e)
+        throw e
+      case NonFatal(e) =>
+        logError("Exception encountered", e)
+        throw new IOException(e)
+    }
+  }
+}
diff --git a/spark/src/main/scala/org/apache/druid/spark/model/AzureDeepStorageConfig.scala b/spark/src/main/scala/org/apache/druid/spark/model/AzureDeepStorageConfig.scala
new file mode 100644
index 0000000..f98b63c
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/model/AzureDeepStorageConfig.scala
@@ -0,0 +1,94 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.model

+

+import org.apache.druid.metadata.DynamicConfigProvider

+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}

+

+import scala.collection.mutable

+

+/**
+  * Chainable builder for Azure deep storage options. Every setter records its argument under a key
+  * composed of the Azure deep storage type key and the option's own key, then returns this instance.
+  */
+class AzureDeepStorageConfig extends DeepStorageConfig(DruidConfigurationKeys.azureDeepStorageTypeKey) {
+  // Seeded with the deep storage type so toOptions always identifies the storage implementation.
+  private val optionsMap: mutable.Map[String, String] =
+    mutable.Map(DruidConfigurationKeys.deepStorageTypeKey -> deepStorageType)
+
+  def account(account: String): AzureDeepStorageConfig =
+    store(DruidConfigurationKeys.azureAccountKey, account)
+
+  /**
+    * Looks up the Azure key in the config supplied by KEYPROVIDER under CONFKEY, falling back to the
+    * empty string when the entry is absent.
+    */
+  def key(keyProvider: DynamicConfigProvider[String], confKey: String): AzureDeepStorageConfig =
+    store(DruidConfigurationKeys.azureKeyKey, keyProvider.getConfig.getOrDefault(confKey, ""))
+
+  def key(azureKey: String): AzureDeepStorageConfig =
+    store(DruidConfigurationKeys.azureKeyKey, azureKey)
+
+  def maxTries(maxTries: Int): AzureDeepStorageConfig =
+    store(DruidConfigurationKeys.azureMaxTriesKey, maxTries.toString)
+
+  def protocol(protocol: String): AzureDeepStorageConfig =
+    store(DruidConfigurationKeys.protocolKey, protocol)
+
+  def container(container: String): AzureDeepStorageConfig =
+    store(DruidConfigurationKeys.azureContainerKey, container)
+
+  def maxListingLength(maxListingLength: Int): AzureDeepStorageConfig =
+    store(DruidConfigurationKeys.maxListingLengthKey, maxListingLength.toString)
+
+  def prefix(prefix: String): AzureDeepStorageConfig =
+    store(DruidConfigurationKeys.prefixKey, prefix)
+
+  /** Returns an immutable snapshot of everything configured so far. */
+  override def toOptions: Map[String, String] = optionsMap.toMap
+
+  /**
+    * Records SETTING under the Azure-scoped key for OPTION and returns this builder. SETTING is taken
+    * by name so the key is built before the value is computed.
+    */
+  private def store(option: String, setting: => String): AzureDeepStorageConfig = {
+    val scopedKey = Configuration.toKey(DruidConfigurationKeys.azureDeepStorageTypeKey, option)
+    optionsMap.put(scopedKey, setting)
+    this
+  }
+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/model/DeepStorageConfig.scala b/spark/src/main/scala/org/apache/druid/spark/model/DeepStorageConfig.scala
new file mode 100644
index 0000000..7800756
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/model/DeepStorageConfig.scala
@@ -0,0 +1,24 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.model

+

+/**
+  * Base class for the deep storage configuration builders in this package.
+  *
+  * @param deepStorageType The deep storage type value supplied by the concrete subclass.
+  */
+abstract class DeepStorageConfig(protected val deepStorageType: String) {

+  /** Returns the configured settings as a map from option key to string value. */
+  def toOptions: Map[String, String]

+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/model/GoogleDeepStorageConfig.scala b/spark/src/main/scala/org/apache/druid/spark/model/GoogleDeepStorageConfig.scala
new file mode 100644
index 0000000..34067ca
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/model/GoogleDeepStorageConfig.scala
@@ -0,0 +1,53 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.model

+

+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}

+

+import scala.collection.mutable

+

+/**
+  * Chainable builder for Google deep storage options. Every setter records its argument under a key
+  * composed of the Google deep storage type key and the option's own key, then returns this instance.
+  */
+class GoogleDeepStorageConfig extends DeepStorageConfig(DruidConfigurationKeys.googleDeepStorageTypeKey) {
+  // Seeded with the deep storage type so toOptions always identifies the storage implementation.
+  private val optionsMap: mutable.Map[String, String] =
+    mutable.Map(DruidConfigurationKeys.deepStorageTypeKey -> deepStorageType)
+
+  def bucket(bucket: String): GoogleDeepStorageConfig =
+    store(DruidConfigurationKeys.bucketKey, bucket)
+
+  def prefix(prefix: String): GoogleDeepStorageConfig =
+    store(DruidConfigurationKeys.prefixKey, prefix)
+
+  def maxListingLength(maxListingLength: Int): GoogleDeepStorageConfig =
+    store(DruidConfigurationKeys.maxListingLengthKey, maxListingLength.toString)
+
+  /** Returns an immutable snapshot of everything configured so far. */
+  override def toOptions: Map[String, String] = optionsMap.toMap
+
+  /** Records SETTING under the Google-scoped key for OPTION and returns this builder. */
+  private def store(option: String, setting: String): GoogleDeepStorageConfig = {
+    val scopedKey = Configuration.toKey(DruidConfigurationKeys.googleDeepStorageTypeKey, option)
+    optionsMap.put(scopedKey, setting)
+    this
+  }
+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/model/HdfsDeepStorageConfig.scala b/spark/src/main/scala/org/apache/druid/spark/model/HdfsDeepStorageConfig.scala
new file mode 100644
index 0000000..4421f10
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/model/HdfsDeepStorageConfig.scala
@@ -0,0 +1,46 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.model

+

+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}

+

+import scala.collection.mutable

+

+/**
+  * Chainable builder for HDFS deep storage options. Every setter records its argument under a key
+  * composed of the HDFS deep storage type key and the option's own key, then returns this instance.
+  */
+class HdfsDeepStorageConfig extends DeepStorageConfig(DruidConfigurationKeys.hdfsDeepStorageTypeKey) {
+  // Seeded with the deep storage type so toOptions always identifies the storage implementation.
+  private val optionsMap: mutable.Map[String, String] =
+    mutable.Map(DruidConfigurationKeys.deepStorageTypeKey -> deepStorageType)
+
+  def storageDirectory(storageDirectory: String): HdfsDeepStorageConfig =
+    store(DruidConfigurationKeys.storageDirectoryKey, storageDirectory)
+
+  def hadoopConf(conf: String): HdfsDeepStorageConfig =
+    store(DruidConfigurationKeys.hdfsHadoopConfKey, conf)
+
+  /** Returns an immutable snapshot of everything configured so far. */
+  override def toOptions: Map[String, String] = optionsMap.toMap
+
+  /** Records SETTING under the HDFS-scoped key for OPTION and returns this builder. */
+  private def store(option: String, setting: String): HdfsDeepStorageConfig = {
+    val scopedKey = Configuration.toKey(DruidConfigurationKeys.hdfsDeepStorageTypeKey, option)
+    optionsMap.put(scopedKey, setting)
+    this
+  }
+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/model/LocalDeepStorageConfig.scala b/spark/src/main/scala/org/apache/druid/spark/model/LocalDeepStorageConfig.scala
new file mode 100644
index 0000000..b8d3ea4
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/model/LocalDeepStorageConfig.scala
@@ -0,0 +1,39 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.model

+

+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}

+

+import scala.collection.mutable

+

+/**
+  * Chainable builder for local deep storage options. The only configurable option is the storage
+  * directory, recorded under a key composed of the local deep storage type key and the directory key.
+  */
+class LocalDeepStorageConfig extends DeepStorageConfig(DruidConfigurationKeys.localDeepStorageTypeKey) {
+  // Seeded with the deep storage type so toOptions always identifies the storage implementation.
+  private val optionsMap: mutable.Map[String, String] =
+    mutable.Map(DruidConfigurationKeys.deepStorageTypeKey -> deepStorageType)
+
+  def storageDirectory(storageDirectory: String): LocalDeepStorageConfig = {
+    val scopedKey = Configuration.toKey(
+      DruidConfigurationKeys.localDeepStorageTypeKey,
+      DruidConfigurationKeys.storageDirectoryKey
+    )
+    optionsMap.put(scopedKey, storageDirectory)
+    this
+  }
+
+  /** Returns an immutable snapshot of everything configured so far. */
+  override def toOptions: Map[String, String] = optionsMap.toMap
+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/model/S3DeepStorageConfig.scala b/spark/src/main/scala/org/apache/druid/spark/model/S3DeepStorageConfig.scala
new file mode 100644
index 0000000..3a70c90
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/model/S3DeepStorageConfig.scala
@@ -0,0 +1,169 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.model

+

+import org.apache.druid.metadata.DynamicConfigProvider

+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}

+

+import scala.collection.mutable

+

+/**
+  * Chainable builder for S3 deep storage options. Every setter records its argument (converted with
+  * toString) under a key composed of the S3 deep storage type key and the option's own key(s), then
+  * returns this instance. toOptions returns an immutable snapshot of what has been configured.
+  */
+class S3DeepStorageConfig extends DeepStorageConfig(DruidConfigurationKeys.s3DeepStorageTypeKey) {
+  // Seeded with the deep storage type so toOptions always identifies the storage implementation.
+  private val optionsMap: mutable.Map[String, String] = mutable.Map[String, String](
+    DruidConfigurationKeys.deepStorageTypeKey -> deepStorageType
+  )
+
+  def bucket(bucket: String): S3DeepStorageConfig = {
+    addToOptions(prefix(DruidConfigurationKeys.bucketKey), bucket)
+  }
+
+  def baseKey(baseKey: String): S3DeepStorageConfig = {
+    addToOptions(prefix(DruidConfigurationKeys.s3BaseKeyKey), baseKey)
+  }
+
+  def maxListingLength(maxListingLength: Int): S3DeepStorageConfig = {
+    // Stored under its own key; writing it under the bucket key would overwrite the configured bucket.
+    addToOptions(prefix(DruidConfigurationKeys.maxListingLengthKey), maxListingLength)
+  }
+
+  def disableAcl(disableAcl: Boolean): S3DeepStorageConfig = {
+    addToOptions(prefix(DruidConfigurationKeys.s3DisableAclKey), disableAcl)
+  }
+
+  def useS3aSchema(useS3aSchema: Boolean): S3DeepStorageConfig = {
+    addToOptions(prefix(DruidConfigurationKeys.s3UseS3ASchemaKey), useS3aSchema)
+  }
+
+  /**
+    * Extracts the S3 access key and secret key from KEYSPROVIDER using ACCESSKEYCONFKEY and SECRETKEYCONFKEY to extract
+    * the access key and secret key, respectively. A missing entry is stored as the empty string.
+    *
+    * @param keysProvider The DynamicConfigProvider providing the S3 access and secret keys.
+    * @param accessKeyConfKey The key in the config provided by KEYSPROVIDER whose value is the access key to use.
+    * @param secretKeyConfKey The key in the config provided by KEYSPROVIDER whose value is the secret key to use.
+    */
+  def keys(
+            keysProvider: DynamicConfigProvider[String],
+            accessKeyConfKey: String,
+            secretKeyConfKey: String
+          ): S3DeepStorageConfig = {
+    val config = keysProvider.getConfig
+    val accessKey = config.getOrDefault(accessKeyConfKey, "")
+    val secretKey = config.getOrDefault(secretKeyConfKey, "")
+    addToOptions(prefix(DruidConfigurationKeys.s3AccessKeyKey), accessKey)
+    // The secret key must not share the access key's option key, or it would replace the access key.
+    // NOTE(review): assumes DruidConfigurationKeys defines s3SecretKeyKey alongside s3AccessKeyKey.
+    addToOptions(prefix(DruidConfigurationKeys.s3SecretKeyKey), secretKey)
+  }
+
+  def accessKey(accessKey: String): S3DeepStorageConfig = {
+    addToOptions(prefix(DruidConfigurationKeys.s3AccessKeyKey), accessKey)
+  }
+
+  def secretKey(secretKey: String): S3DeepStorageConfig = {
+    // NOTE(review): assumes DruidConfigurationKeys defines s3SecretKeyKey alongside s3AccessKeyKey.
+    addToOptions(prefix(DruidConfigurationKeys.s3SecretKeyKey), secretKey)
+  }
+
+  def fileSessionCredentials(fileSessionCredentials: String): S3DeepStorageConfig = {
+    addToOptions(prefix(DruidConfigurationKeys.s3FileSessionCredentialsKey), fileSessionCredentials)
+  }
+
+  def proxyHost(host: String): S3DeepStorageConfig = {
+    addToOptions(prefix(DruidConfigurationKeys.s3ProxyPrefix, DruidConfigurationKeys.s3ProxyHostKey), host)
+  }
+
+  def proxyPort(port: Int): S3DeepStorageConfig = {
+    addToOptions(prefix(DruidConfigurationKeys.s3ProxyPrefix, DruidConfigurationKeys.s3ProxyPortKey), port.toString)
+  }
+
+  def proxyUsername(username: String): S3DeepStorageConfig = {
+    addToOptions(prefix(DruidConfigurationKeys.s3ProxyPrefix, DruidConfigurationKeys.s3ProxyUsernameKey), username)
+  }
+
+  /**
+    * Extracts the proxy password to use in communicating with S3 from PASSWORDPROVIDER using CONFKEY.
+    * A missing entry is stored as the empty string.
+    *
+    * @param passwordProvider The DynamicConfigProvider providing the proxy password.
+    * @param confKey The key in the config provided by PASSWORDPROVIDER whose value is the proxy password to use.
+    */
+  def proxyPassword(passwordProvider: DynamicConfigProvider[String], confKey: String): S3DeepStorageConfig = {
+    val config = passwordProvider.getConfig
+    addToOptions(prefix(DruidConfigurationKeys.s3ProxyPrefix, DruidConfigurationKeys.s3ProxyPasswordKey),
+      config.getOrDefault(confKey, ""))
+  }
+
+  def proxyPassword(password: String): S3DeepStorageConfig = {
+    addToOptions(prefix(DruidConfigurationKeys.s3ProxyPrefix, DruidConfigurationKeys.s3ProxyPasswordKey), password)
+  }
+
+  def endpointUrl(endpointUrl: String): S3DeepStorageConfig = {
+    addToOptions(prefix(DruidConfigurationKeys.s3EndpointPrefix, DruidConfigurationKeys.s3EndpointUrlKey), endpointUrl)
+  }
+
+  def endpointSigningRegion(endpointSigningRegion: String): S3DeepStorageConfig = {
+    addToOptions(prefix(DruidConfigurationKeys.s3EndpointPrefix, DruidConfigurationKeys.s3EndpointSigningRegionKey),
+      endpointSigningRegion
+    )
+  }
+
+  def protocol(protocol: String): S3DeepStorageConfig = {
+    addToOptions(prefix(DruidConfigurationKeys.protocolKey), protocol)
+  }
+
+  /** Correctly spelled equivalent of disableChunkedEnconding. */
+  def disableChunkedEncoding(disableChunkedEncoding: Boolean): S3DeepStorageConfig = {
+    addToOptions(prefix(DruidConfigurationKeys.s3DisableChunkedEncodingKey), disableChunkedEncoding)
+  }
+
+  /** Misspelled name kept so existing callers keep compiling; same effect as disableChunkedEncoding. */
+  def disableChunkedEnconding(disableChunkedEncoding: Boolean): S3DeepStorageConfig = {
+    addToOptions(prefix(DruidConfigurationKeys.s3DisableChunkedEncodingKey), disableChunkedEncoding)
+  }
+
+  /** Correctly spelled equivalent of enablePathStyleAcess. */
+  def enablePathStyleAccess(enablePathStyleAccess: Boolean): S3DeepStorageConfig = {
+    addToOptions(prefix(DruidConfigurationKeys.s3EnablePathStyleAccessKey), enablePathStyleAccess)
+  }
+
+  /** Misspelled name kept so existing callers keep compiling; same effect as enablePathStyleAccess. */
+  def enablePathStyleAcess(enablePathStyleAccess: Boolean): S3DeepStorageConfig = {
+    addToOptions(prefix(DruidConfigurationKeys.s3EnablePathStyleAccessKey), enablePathStyleAccess)
+  }
+
+  def forceGlobalBucketAccessEnabled(forceGlobalBucketAccessEnabled: Boolean): S3DeepStorageConfig = {
+    addToOptions(prefix(DruidConfigurationKeys.s3ForceGlobalBucketAccessEnabledKey), forceGlobalBucketAccessEnabled)
+  }
+
+  def sseType(sseType: String): S3DeepStorageConfig = {
+    addToOptions(prefix(
+      DruidConfigurationKeys.s3ServerSideEncryptionPrefix, DruidConfigurationKeys.s3ServerSideEncryptionTypeKey),
+      sseType
+    )
+  }
+
+  def sseKmsKeyId(sseKmsKeyId: String): S3DeepStorageConfig = {
+    addToOptions(prefix(
+      DruidConfigurationKeys.s3ServerSideEncryptionPrefix, DruidConfigurationKeys.s3ServerSideEncryptionKmsKeyIdKey),
+      sseKmsKeyId
+    )
+  }
+
+  def sseCustomKey(sseCustomKey: String): S3DeepStorageConfig = {
+    addToOptions(prefix(
+      DruidConfigurationKeys.s3ServerSideEncryptionPrefix, DruidConfigurationKeys.s3ServerSideEncryptionCustomKeyKey),
+      sseCustomKey
+    )
+  }
+
+  /** Returns an immutable snapshot of everything configured so far. */
+  override def toOptions: Map[String, String] = optionsMap.toMap
+
+  /** Stores VALUE's string form under KEY and returns this builder for chaining. */
+  private def addToOptions(key: String, value: Any): S3DeepStorageConfig = {
+    optionsMap.put(key, value.toString)
+    this
+  }
+
+  /** Builds an option key from the S3 deep storage type key followed by KEYS, in order. */
+  private def prefix(keys: String*): String = {
+    Configuration.toKey(DruidConfigurationKeys.s3DeepStorageTypeKey +: keys:_*)
+  }
+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/package.scala b/spark/src/main/scala/org/apache/druid/spark/package.scala
index 22b440a..bef68b1 100644
--- a/spark/src/main/scala/org/apache/druid/spark/package.scala
+++ b/spark/src/main/scala/org/apache/druid/spark/package.scala
@@ -19,9 +19,211 @@
 

 package org.apache.druid

 

-import com.fasterxml.jackson.databind.ObjectMapper

+import com.fasterxml.jackson.core.`type`.TypeReference

+import com.fasterxml.jackson.databind.jsontype.NamedType

+import com.fasterxml.jackson.databind.{InjectableValues, Module, ObjectMapper}

 import org.apache.druid.jackson.DefaultObjectMapper

+import org.apache.druid.math.expr.ExprMacroTable

+import org.apache.druid.metadata.DynamicConfigProvider

+import org.apache.druid.query.expression.{LikeExprMacro, RegexpExtractExprMacro,

+  TimestampCeilExprMacro, TimestampExtractExprMacro, TimestampFloorExprMacro,

+  TimestampFormatExprMacro, TimestampParseExprMacro, TimestampShiftExprMacro, TrimExprMacro}

+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}

+import org.apache.druid.spark.model.DeepStorageConfig

+import org.apache.druid.spark.v2.DruidDataSourceV2ShortName

+import org.apache.druid.timeline.DataSegment

+import org.apache.druid.timeline.DataSegment.PruneSpecsHolder

+import org.apache.spark.sql.{DataFrame, DataFrameReader}

+

+import _root_.java.util.Properties

+import scala.collection.JavaConverters.seqAsJavaListConverter

+import scala.language.implicitConversions

 

 package object spark {

   private[spark] val MAPPER: ObjectMapper = new DefaultObjectMapper()

+

+  // Injectable values handed to MAPPER: an expression macro table, MAPPER itself, and the default
+  // PruneSpecsHolder. Built step by step on a single InjectableValues.Std instance.
+  private[spark] val baseInjectableValues: InjectableValues.Std = {
+    val macroTable = new ExprMacroTable(Seq(
+      new LikeExprMacro(),
+      new RegexpExtractExprMacro(),
+      new TimestampCeilExprMacro(),
+      new TimestampExtractExprMacro(),
+      new TimestampFormatExprMacro(),
+      new TimestampParseExprMacro(),
+      new TimestampShiftExprMacro(),
+      new TimestampFloorExprMacro(),
+      new TrimExprMacro.BothTrimExprMacro(),
+      new TrimExprMacro.LeftTrimExprMacro(),
+      new TrimExprMacro.RightTrimExprMacro()).asJava)
+
+    val injectables = new InjectableValues.Std()
+    injectables.addValue(classOf[ExprMacroTable], macroTable)
+    injectables.addValue(classOf[ObjectMapper], MAPPER)
+    injectables.addValue(classOf[DataSegment.PruneSpecsHolder], PruneSpecsHolder.DEFAULT)
+    injectables
+  }
+
+  MAPPER.setInjectableValues(baseInjectableValues)

+

+  /*

+   * Utility methods for serializing and deserializing objects using the same object mapper used

+   * internally to these connectors.

+   */

+

+  /** Writes OBJ as a JSON string using the connector's shared object mapper. */
+  def serialize(obj: AnyRef): String = MAPPER.writeValueAsString(obj)

+

+  /** Reads JSON into the type described by TYPEREFERENCE using the connector's shared object mapper. */
+  def deserialize[T](json: String, typeReference: TypeReference[T]): T =
+    MAPPER.readValue[T](json, typeReference)

+

+  /** Registers MODULES on the connector's shared object mapper and returns that mapper. */
+  def registerModules(modules: Module*): ObjectMapper = MAPPER.registerModules(modules: _*)

+

+  /** Registers SUBTYPECLASS under the type name NAME on the connector's shared object mapper. */
+  def registerSubType(subTypeClass: Class[_], name: String): Unit = {
+    val namedType = new NamedType(subTypeClass, name)
+    MAPPER.registerSubtypes(namedType)
+  }

+

+  /**
+    * Adds VALUE for CLAZZ to the shared baseInjectableValues and re-installs the result on the
+    * connector's shared object mapper.
+    */
+  def setInjectableValue(clazz: Class[_], value: AnyRef): Unit = {
+    val updated = baseInjectableValues.addValue(clazz, value)
+    MAPPER.setInjectableValues(updated)
+  }

+

+  /** Unwraps a DruidDataFrameReader back into the DataFrameReader it decorates. */
+  implicit def druidDataFrameReaderToDataFrameReader(druidReader: DruidDataFrameReader): DataFrameReader = {
+    druidReader.reader
+  }

+

+  // scalastyle:off number.of.methods (There are more than 30 configurable options for the Reader)

+  /**
+    * Adds Druid-specific option setters to a DataFrameReader. Each setter stores its argument under the
+    * corresponding Druid configuration key (scoped with the reader, broker, or metadata prefix where
+    * applicable) and returns the underlying DataFrameReader; druid() loads a DataFrame using the Druid
+    * data source short name.
+    *
+    * @param reader The DataFrameReader being decorated.
+    */
+  implicit class DruidDataFrameReader(private[spark] val reader: DataFrameReader) {
+    /** Loads a DataFrame through the Druid data source using the options set so far. */
+    def druid(): DataFrame = {
+      reader.format(DruidDataSourceV2ShortName).load()
+    }
+
+    def allowIncompletePartitions(allowIncompletePartitions: Boolean): DataFrameReader = {
+      reader.option(readerPrefix(DruidConfigurationKeys.allowIncompletePartitionsKey), allowIncompletePartitions)
+    }
+
+    def batchSize(batchSize: Int): DataFrameReader = {
+      reader.option(readerPrefix(DruidConfigurationKeys.batchSizeKey), batchSize)
+    }
+
+    def brokerHost(host: String): DataFrameReader = {
+      reader.option(brokerPrefix(DruidConfigurationKeys.brokerHostKey), host)
+    }
+
+    def brokerPort(port: Int): DataFrameReader = {
+      reader.option(brokerPrefix(DruidConfigurationKeys.brokerPortKey), port)
+    }
+
+    def dataSource(dataSource: String): DataFrameReader = {
+      reader.option(DruidConfigurationKeys.tableKey, dataSource)
+    }
+
+    /** Applies every option produced by DEEPSTORAGECONFIG.toOptions to the reader. */
+    def deepStorage(deepStorageConfig: DeepStorageConfig): DataFrameReader = {
+      reader.options(deepStorageConfig.toOptions)
+    }
+
+    def metadataBaseName(baseName: String): DataFrameReader = {
+      reader.option(metadataPrefix(DruidConfigurationKeys.metadataBaseNameKey), baseName)
+    }
+
+    /** Serializes PROPERTIES to JSON with the shared object mapper and stores the result. */
+    def metadataDbcpProperties(properties: Properties): DataFrameReader = {
+      reader.option(metadataPrefix(DruidConfigurationKeys.metadataDbcpPropertiesKey),
+        MAPPER.writeValueAsString(properties))
+    }
+
+    /** Stores PROPERTIES verbatim; the caller supplies the already-serialized string. */
+    def metadataDbcpProperties(properties: String): DataFrameReader = {
+      reader.option(metadataPrefix(DruidConfigurationKeys.metadataDbcpPropertiesKey), properties)
+    }
+
+    def metadataDbType(dbType: String): DataFrameReader = {
+      reader.option(metadataPrefix(DruidConfigurationKeys.metadataDbTypeKey), dbType)
+    }
+
+    def metadataHost(host: String): DataFrameReader = {
+      reader.option(metadataPrefix(DruidConfigurationKeys.metadataHostKey), host)
+    }
+
+    /**
+      * Looks up the metadata password in the config supplied by PROVIDER under CONFKEY, falling back to
+      * the empty string when the entry is absent.
+      */
+    def metadataPassword(provider: DynamicConfigProvider[String], confKey: String): DataFrameReader = {
+      reader.option(metadataPrefix(DruidConfigurationKeys.metadataPasswordKey),
+        provider.getConfig.getOrDefault(confKey, ""))
+    }
+
+    def metadataPassword(password: String): DataFrameReader = {
+      reader.option(metadataPrefix(DruidConfigurationKeys.metadataPasswordKey), password)
+    }
+
+    def metadataPort(port: Int): DataFrameReader = {
+      reader.option(metadataPrefix(DruidConfigurationKeys.metadataPortKey), port)
+    }
+
+    def metadataUri(uri: String): DataFrameReader = {
+      reader.option(metadataPrefix(DruidConfigurationKeys.metadataConnectUriKey), uri)
+    }
+
+    def metadataUser(user: String): DataFrameReader = {
+      reader.option(metadataPrefix(DruidConfigurationKeys.metadataUserKey), user)
+    }
+
+    def numRetries(numRetries: Int): DataFrameReader = {
+      reader.option(brokerPrefix(DruidConfigurationKeys.numRetriesKey), numRetries)
+    }
+
+    def retryWaitSeconds(retryWaitSeconds: Int): DataFrameReader = {
+      reader.option(brokerPrefix(DruidConfigurationKeys.retryWaitSecondsKey), retryWaitSeconds)
+    }
+
+    /** Serializes SEGMENTS to a JSON array with the shared object mapper and stores the result. */
+    def segments(segments: Seq[DataSegment]): DataFrameReader = {
+      // Jackson has no built-in handling for Scala collections, so hand it a java.util.List to make
+      // sure the segments are written as a JSON array.
+      reader.option(readerPrefix(DruidConfigurationKeys.segmentsKey), MAPPER.writeValueAsString(segments.asJava))
+    }
+
+    /**
+      * Serializes the strings in SEGMENTS to a JSON array with the shared object mapper and stores the
+      * result. The DummyImplicit parameter only distinguishes this overload from the Seq[DataSegment]
+      * one after erasure.
+      */
+    def segments(segments: Seq[String])(implicit d: DummyImplicit): DataFrameReader = {
+      // See the Seq[DataSegment] overload: convert to a java.util.List before serializing.
+      reader.option(readerPrefix(DruidConfigurationKeys.segmentsKey), MAPPER.writeValueAsString(segments.asJava))
+    }
+
+    /** Stores SEGMENTS verbatim; the caller supplies the already-serialized string. */
+    def segments(segments: String): DataFrameReader = {
+      reader.option(readerPrefix(DruidConfigurationKeys.segmentsKey), segments)
+    }
+
+    def timeoutMilliseconds(timeoutMilliseconds: Int): DataFrameReader = {
+      reader.option(brokerPrefix(DruidConfigurationKeys.timeoutMillisecondsKey), timeoutMilliseconds)
+    }
+
+    def timestampColumn(timestampColumn: String): DataFrameReader = {
+      reader.option(readerPrefix(DruidConfigurationKeys.timestampColumnKey), timestampColumn)
+    }
+
+    def timestampFormat(timestampFormat: String): DataFrameReader = {
+      reader.option(readerPrefix(DruidConfigurationKeys.timestampFormatKey), timestampFormat)
+    }
+
+    def useCompactSketches(useCompactSketches: Boolean): DataFrameReader = {
+      reader.option(readerPrefix(DruidConfigurationKeys.useCompactSketchesKey), useCompactSketches)
+    }
+
+    def useDefaultValueForNull(useDefaultValueForNull: Boolean): DataFrameReader = {
+      reader.option(readerPrefix(DruidConfigurationKeys.useDefaultValueForNull), useDefaultValueForNull)
+    }
+
+    def useSparkConfForDeepStorage(useSparkConfForDeepStorage: Boolean): DataFrameReader = {
+      reader.option(readerPrefix(DruidConfigurationKeys.useSparkConfForDeepStorageKey), useSparkConfForDeepStorage)
+    }
+
+    def vectorize(vectorize: Boolean): DataFrameReader = {
+      reader.option(readerPrefix(DruidConfigurationKeys.vectorizeKey), vectorize)
+    }
+
+    /** Scopes KEY with the broker prefix. */
+    private def brokerPrefix(key: String): String = {
+      prefix(DruidConfigurationKeys.brokerPrefix, key)
+    }
+
+    /** Scopes KEY with the metadata prefix. */
+    private def metadataPrefix(key: String): String = {
+      prefix(DruidConfigurationKeys.metadataPrefix, key)
+    }
+
+    /** Scopes KEY with the reader prefix. */
+    private def readerPrefix(key: String): String = {
+      prefix(DruidConfigurationKeys.readerPrefix, key)
+    }
+
+    /** Joins BASE and KEY into a single option key via Configuration.toKey. */
+    private def prefix(base: String, key: String): String = {
+      Configuration.toKey(base, key)
+    }
+  }

+  // scalastyle:on number.of.methods

 }

diff --git a/spark/src/main/scala/org/apache/druid/spark/registries/ComplexTypeRegistry.scala b/spark/src/main/scala/org/apache/druid/spark/registries/ComplexTypeRegistry.scala
new file mode 100644
index 0000000..66817f7
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/registries/ComplexTypeRegistry.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.registries
+
+import org.apache.datasketches.hll.HllSketch
+import org.apache.datasketches.quantiles.DoublesSketch
+import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch
+import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule
+import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule
+import org.apache.druid.query.aggregation.datasketches.theta.{SketchHolder, SketchModule}
+import org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchModule
+import org.apache.druid.query.aggregation.histogram.{ApproximateHistogram,
+  ApproximateHistogramDruidModule, FixedBucketsHistogram, FixedBucketsHistogramAggregator}
+import org.apache.druid.query.aggregation.variance.{VarianceAggregatorCollector, VarianceSerde}
+import org.apache.druid.segment.serde.ComplexMetrics
+import org.apache.druid.spark.mixins.Logging
+
+import scala.collection.mutable
+
+/**
+ * A registry for plugging in support for Druid complex types. Provides definitions for supporting complex types in
+ * extensions-core out of the box.
+ */
+object ComplexTypeRegistry extends Logging {
+  // Serde initializers keyed by complex type name, and serialize functions keyed by deserialized class.
+  private val registeredSerdeInitFunctions = mutable.HashMap.empty[String, () => Unit]
+  private val registeredSerializeFunctions = mutable.HashMap.empty[Class[_], Any => Array[Byte]]
+
+  /**
+    * Stores REGISTERSERDEFUNC as the function that initializes serializers and deserializers for the complex type
+    * with the name NAME, replacing any function already stored under that name. The function is only recorded by
+    * this call, not invoked. Assumes that the associated complex type is serialized as a byte array.
+    * Use the four-argument overload when a function for serializing values back to byte arrays is needed too.
+    *
+    * @param name The type name of the complex type to register.
+    * @param registerSerdeFunc The function to use to register the necessary serdes for this complex type.
+    */
+  def register(name: String, registerSerdeFunc: () => Unit): Unit = {
+    logInfo(s"Registering serde initializers for complex type $name.")
+    // update() overwrites silently, so the most recent registration for a name wins.
+    registeredSerdeInitFunctions.update(name, registerSerdeFunc)
+  }
+
+  /**
+    * Stores REGISTERSERDEFUNC as the serde initializer for the complex type with the name NAME and SERIALIZEFUNC as
+    * the function for converting instances of DESERIALIZEDCLASS to byte arrays. Entries already stored for NAME or
+    * for DESERIALIZEDCLASS are overwritten. Assumes that the associated complex type is serialized as a byte array.
+    *
+    * @param name The type name of the complex type to register.
+    * @param registerSerdeFunc The function to use to register the necessary serdes for this complex type.
+    * @param deserializedClass The class to deserialize values of the registered type to from byte arrays.
+    * @param serializeFunc The function to use when serializing instances of DESERIALIZEDCLASS to byte arrays.
+    */
+  def register(
+      name: String,
+      registerSerdeFunc: () => Unit,
+      deserializedClass: Class[_],
+      serializeFunc: Any => Array[Byte]
+  ): Unit = {
+    logInfo(s"Registering serde initializers and serialization functions for complex type $name.")
+    registeredSerdeInitFunctions.update(name, registerSerdeFunc)
+    registeredSerializeFunctions.update(deserializedClass, serializeFunc)
+  }
+
+  /**
+    * Shortcut for registering known complex type serdes (e.g. those in extensions-core) by name. Does nothing when
+    * NAME already has a serde initializer registered or is not one of the known types.
+    *
+    * @param name The type name of the complex type to register.
+    * @param shouldCompact Whether or not to store compacted versions of this complex type. Ignored for complex types
+    *                      that don't have compacted forms.
+    */
+  def registerByName(name: String, shouldCompact: Boolean = false): Unit = {
+    val alreadyRegistered = registeredSerdeInitFunctions.contains(name)
+    if (!alreadyRegistered) knownTypes.get(name).foreach(registerKnownType => registerKnownType(shouldCompact))
+  }
+
+  /** Returns an immutable copy of the complex type names that currently have serde initializers registered. */
+  def getRegisteredMetricNames: Set[String] =
+    registeredSerdeInitFunctions.keys.toSet
+
+  /** Returns an immutable copy of the classes that currently have serialize functions registered. */
+  def getRegisteredSerializedClasses: Set[Class[_]] =
+    registeredSerializeFunctions.keys.toSet
+
+  /** Invokes the serde initializer stored for COMPLEXTYPENAME, if any; unregistered names are ignored. */
+  def registerSerdeInitFunctions(complexTypeName: String): Unit = {
+    val initializer = registeredSerdeInitFunctions.get(complexTypeName)
+    initializer.foreach(initSerde => initSerde())
+  }
+
+  /** Despite its name, serializes COL with the function registered for its class or a registered superclass. */
+  def deserialize(col: Any): Array[Byte] = {
+    val colClass = col.getClass
+    // Abstract registrations (e.g. DoublesSketch) never equal a runtime class, so fall back to assignability.
+    val serializeFunc = registeredSerializeFunctions.get(colClass).orElse(
+      registeredSerializeFunctions.collectFirst { case (c, f) if c.isAssignableFrom(colClass) => f })
+    serializeFunc.map(_.apply(col)).getOrElse(
+      throw new IllegalArgumentException(s"Unsure how to parse ${colClass.toString} into a ByteArray!"))
+  }
+
+  /** Invokes every serde initializer that is currently registered. */
+  def registerSerdes(): Unit =
+    registeredSerdeInitFunctions.values.foreach(initSerde => initSerde())
+
+  /**
+    * Register serdes for all complex types in extensions-core by running every entry of knownTypes with
+    * SHOULDCOMPACT. Entries register unconditionally, so functions already stored under a known name are replaced.
+    */
+  def initializeDefaults(shouldCompact: Boolean = false): Unit =
+    knownTypes.values.foreach(registerKnownType => registerKnownType(shouldCompact))
+
+  private val knownTypes: Map[String, Boolean => Unit] = Map[String, Boolean => Unit](
+    // Approximate histograms: the compaction flag is ignored.
+    ("approximateHistogram", (_: Boolean) =>
+      register(
+        name = "approximateHistogram",
+        registerSerdeFunc = () => ApproximateHistogramDruidModule.registerSerde(),
+        deserializedClass = classOf[ApproximateHistogram],
+        serializeFunc = (value: Any) => value.asInstanceOf[ApproximateHistogram].toBytes
+      )),
+    // Fixed bucket histograms reuse the approximate histogram module's serde registration; flag ignored.
+    (FixedBucketsHistogramAggregator.TYPE_NAME, (_: Boolean) =>
+      register(
+        name = FixedBucketsHistogramAggregator.TYPE_NAME,
+        registerSerdeFunc = () => ApproximateHistogramDruidModule.registerSerde(),
+        deserializedClass = classOf[FixedBucketsHistogram],
+        serializeFunc = (value: Any) => value.asInstanceOf[FixedBucketsHistogram].toBytes
+      )),
+    // Tuple sketches: the compaction flag is ignored.
+    (ArrayOfDoublesSketchModule.ARRAY_OF_DOUBLES_SKETCH, (_: Boolean) =>
+      register(
+        name = ArrayOfDoublesSketchModule.ARRAY_OF_DOUBLES_SKETCH,
+        // TODO: This probably needs to be wrapped in a try to ensure it only happens once
+        registerSerdeFunc = () => new ArrayOfDoublesSketchModule().configure(null), // scalastyle:ignore null
+        deserializedClass = classOf[ArrayOfDoublesSketch],
+        serializeFunc = (value: Any) => value.asInstanceOf[ArrayOfDoublesSketch].toByteArray
+      )),
+    // Quantiles sketches: the compact form is requested explicitly only when the flag is set.
+    (DoublesSketchModule.DOUBLES_SKETCH, (compact: Boolean) =>
+      register(
+        name = DoublesSketchModule.DOUBLES_SKETCH,
+        registerSerdeFunc = () => DoublesSketchModule.registerSerde(),
+        deserializedClass = classOf[DoublesSketch],
+        serializeFunc = (value: Any) => {
+          val quantiles = value.asInstanceOf[DoublesSketch]
+          if (compact) quantiles.toByteArray(compact) else quantiles.toByteArray
+        }
+      )),
+    // HLL sketches: compact bytes when the flag is set, updatable bytes otherwise.
+    (HllSketchModule.TYPE_NAME, (compact: Boolean) =>
+      register(
+        name = HllSketchModule.TYPE_NAME,
+        registerSerdeFunc = () => HllSketchModule.registerSerde(),
+        deserializedClass = classOf[HllSketch],
+        serializeFunc = (value: Any) => {
+          val hll = value.asInstanceOf[HllSketch]
+          if (compact) hll.toCompactByteArray else hll.toUpdatableByteArray
+        }
+      )),
+    // Theta sketches: all three type names share SketchHolder, so they share one serialize function slot.
+    (SketchModule.THETA_SKETCH, (compact: Boolean) =>
+      register(
+        name = SketchModule.THETA_SKETCH,
+        registerSerdeFunc = () => SketchModule.registerSerde(),
+        deserializedClass = classOf[SketchHolder],
+        serializeFunc = (value: Any) => {
+          val theta = value.asInstanceOf[SketchHolder].getSketch
+          if (compact) theta.compact().toByteArray else theta.toByteArray
+        }
+      )),
+    (SketchModule.THETA_SKETCH_BUILD_AGG, (compact: Boolean) =>
+      register(
+        name = SketchModule.THETA_SKETCH_BUILD_AGG,
+        registerSerdeFunc = () => SketchModule.registerSerde(),
+        deserializedClass = classOf[SketchHolder], // TODO: Maybe?
+        serializeFunc = (value: Any) => {
+          val theta = value.asInstanceOf[SketchHolder].getSketch
+          if (compact) theta.compact().toByteArray else theta.toByteArray
+        }
+      )),
+    (SketchModule.THETA_SKETCH_MERGE_AGG, (compact: Boolean) =>
+      register(
+        name = SketchModule.THETA_SKETCH_MERGE_AGG,
+        registerSerdeFunc = () => SketchModule.registerSerde(),
+        deserializedClass = classOf[SketchHolder], // TODO: Maybe?
+        serializeFunc = (value: Any) => {
+          val theta = value.asInstanceOf[SketchHolder].getSketch
+          if (compact) theta.compact().toByteArray else theta.toByteArray
+        }
+      )),
+    // Variance: the serde is registered directly with ComplexMetrics; the compaction flag is ignored.
+    ("variance", (_: Boolean) =>
+      register(
+        name = "variance",
+        registerSerdeFunc = () => ComplexMetrics.registerSerde("variance", new VarianceSerde()),
+        deserializedClass = classOf[VarianceAggregatorCollector],
+        serializeFunc = (value: Any) => value.asInstanceOf[VarianceAggregatorCollector].toByteArray
+      ))
+  )
+}
diff --git a/spark/src/main/scala/org/apache/druid/spark/registries/DynamicConfigProviderRegistry.scala b/spark/src/main/scala/org/apache/druid/spark/registries/DynamicConfigProviderRegistry.scala
new file mode 100644
index 0000000..b333481
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/registries/DynamicConfigProviderRegistry.scala
@@ -0,0 +1,43 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.registries

+

+import com.fasterxml.jackson.databind.jsontype.NamedType

+import org.apache.druid.metadata.DynamicConfigProvider

+import org.apache.druid.spark.MAPPER

+import org.apache.druid.spark.mixins.Logging

+

+/**
+  * A registry for dynamic config providers. Similarly to the [[AggregatorFactoryRegistry]], we can shadow the usual
+  * Druid pattern and let Jackson handle the polymorphism for our current use cases.
+  */
+object DynamicConfigProviderRegistry extends Logging {
+  /**
+    * Registers PROVIDER's runtime class with MAPPER under NAME, which must match the Jackson sub-type for PROVIDER.
+    *
+    * @param name The Jackson subtype for PROVIDER
+    * @param provider An implementation of DynamicConfigProvider to use when deserializing sensitive config values.
+    */
+  def register(name: String, provider: DynamicConfigProvider[_]): Unit = {
+    logInfo(s"Registering DynamicConfigProvider $name.")
+    val namedSubtype = new NamedType(provider.getClass, name) // Cheat: only the class is registered
+    MAPPER.registerSubtypes(namedSubtype)
+  }
+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/registries/SQLConnectorRegistry.scala b/spark/src/main/scala/org/apache/druid/spark/registries/SQLConnectorRegistry.scala
new file mode 100644
index 0000000..53a3559
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/registries/SQLConnectorRegistry.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.registries
+
+import com.google.common.base.Supplier
+import org.apache.druid.java.util.common.IAE
+import org.apache.druid.metadata.storage.derby.{DerbyConnector, DerbyMetadataStorage}
+import org.apache.druid.metadata.storage.mysql.{MySQLConnector, MySQLConnectorDriverConfig,
+  MySQLConnectorSslConfig}
+import org.apache.druid.metadata.storage.postgresql.{PostgreSQLConnector,
+  PostgreSQLConnectorConfig, PostgreSQLTablesConfig}
+import org.apache.druid.metadata.{MetadataStorageConnectorConfig, MetadataStorageTablesConfig,
+  SQLMetadataConnector}
+import org.apache.druid.spark.mixins.Logging
+
+import scala.collection.mutable
+
+/**
+ * A registry for plugging in support for connectors to Druid metadata servers. Supports mysql, postgres, and derby out
+ * of the box.
+ */
+object SQLConnectorRegistry extends Logging {
+  // Connector creation functions keyed by metadata server type; filled by register() and lazily by create().
+  private val registeredSQLConnectorFunctions =
+    mutable.HashMap.empty[String,
+      (Supplier[MetadataStorageConnectorConfig], Supplier[MetadataStorageTablesConfig]) => SQLMetadataConnector]
+
+  /**
+    * Register CREATEFUNC as the creation function for the metadata server type SQLCONNECTORTYPE, replacing any earlier
+    * registration. It takes a `Supplier[MetadataStorageConnectorConfig]` and a `Supplier[MetadataStorageTablesConfig]`,
+    * which are parsed from the `metadata.*` properties specified when calling .read() or .write() on a dataframe.
+    *
+    * @param sqlConnectorType The SQL database type to create a connector for.
+    * @param createFunc The function to use to create a SQLMetadataConnector for SQLCONNECTORTYPE.
+    */
+  def register(
+      sqlConnectorType: String,
+      createFunc: (Supplier[MetadataStorageConnectorConfig], Supplier[MetadataStorageTablesConfig]) =>
+        SQLMetadataConnector): Unit = {
+    logInfo(s"Registering creation functions for SQL Connector $sqlConnectorType.")
+    registeredSQLConnectorFunctions.update(sqlConnectorType, createFunc)
+  }
+
+  /**
+    * Register the bundled SQLMetadataConnector creation function for SQLCONNECTORTYPE. This is a no-op when the type
+    * is already registered or is not listed in `SQLConnectorRegistry.knownTypes`.
+    *
+    * @param sqlConnectorType The known SQL Connector type to register a bundled creation function for.
+    */
+  def registerByType(sqlConnectorType: String): Unit = {
+    val needsRegistration = !registeredSQLConnectorFunctions.contains(sqlConnectorType)
+    if (needsRegistration) {
+      knownTypes.get(sqlConnectorType).foreach(createFunc => register(sqlConnectorType, createFunc))
+    }
+  }
+
+  /**
+    * Return a SQLMetadataConnector using the creation function registered for SQLCONNECTORTYPE. A known type that has
+    * not been registered yet is registered on first use; a type that is neither registered nor known raises an IAE.
+    * The connector is produced by applying the registered function to the two suppliers.
+    *
+    * @param sqlConnectorType The SQL database type to create a Connector for.
+    * @param connectorConfigSupplier The supplier for the connector config used to configure the returned connector.
+    * @param metadataTableConfigSupplier The supplier for the metadata table config used to configure the returned
+    *                                    connector.
+    * @return A SQLMetadataConnector capable of querying an instance of a metadata server database of type
+    *         SQLCONNECTORTYPE, configured according to the provided config suppliers.
+    */
+  def create(
+      sqlConnectorType: String,
+      connectorConfigSupplier: Supplier[MetadataStorageConnectorConfig],
+      metadataTableConfigSupplier: Supplier[MetadataStorageTablesConfig]
+  ): SQLMetadataConnector = {
+    val isRegistered = registeredSQLConnectorFunctions.contains(sqlConnectorType)
+    if (!isRegistered && !knownTypes.contains(sqlConnectorType)) {
+      throw new IAE("Unrecognized metadata DB type %s", sqlConnectorType)
+    }
+    // Known-but-unregistered types are registered here; registerByType is a no-op for registered ones.
+    registerByType(sqlConnectorType)
+    val createFunc = registeredSQLConnectorFunctions(sqlConnectorType)
+    createFunc(connectorConfigSupplier, metadataTableConfigSupplier)
+  }
+
+  // Creation functions for the metadata stores supported out of the box, keyed by connector type.
+  private val knownTypes: Map[String,
+    (Supplier[MetadataStorageConnectorConfig], Supplier[MetadataStorageTablesConfig]) =>
+      SQLMetadataConnector] =
+    Map[String, (Supplier[MetadataStorageConnectorConfig], Supplier[MetadataStorageTablesConfig]) =>
+      SQLMetadataConnector](
+      // MySQL: the connector is built from the two suppliers plus default-constructed SSL and driver
+      // configs (no user-provided values are read for those here).
+      ("mysql", (connectorConfig: Supplier[MetadataStorageConnectorConfig],
+                 tablesConfig: Supplier[MetadataStorageTablesConfig]) => {
+        val sslConfig = new MySQLConnectorSslConfig
+        val driverConfig = new MySQLConnectorDriverConfig
+        new MySQLConnector(connectorConfig, tablesConfig, sslConfig, driverConfig)
+      }),
+      // PostgreSQL: built from the two suppliers plus a default-constructed PostgreSQLConnectorConfig
+      // and PostgreSQLTablesConfig.
+      ("postgres", (connectorConfig: Supplier[MetadataStorageConnectorConfig],
+                    tablesConfig: Supplier[MetadataStorageTablesConfig]) => {
+        val postgresConfig = new PostgreSQLConnectorConfig
+        val postgresTablesConfig = new PostgreSQLTablesConfig
+        new PostgreSQLConnector(connectorConfig, tablesConfig, postgresConfig, postgresTablesConfig)
+      }),
+      // Derby: the DerbyMetadataStorage is created from whatever the connector config supplier returns
+      // when the connector is created, i.e. the supplier's get() is evaluated once per created connector.
+      ("derby", (connectorConfig: Supplier[MetadataStorageConnectorConfig],
+                 tablesConfig: Supplier[MetadataStorageTablesConfig]) => {
+        val derbyStorage = new DerbyMetadataStorage(connectorConfig.get())
+        new DerbyConnector(derbyStorage, connectorConfig, tablesConfig)
+      })
+    )
+}
diff --git a/spark/src/main/scala/org/apache/druid/spark/registries/SegmentReaderRegistry.scala b/spark/src/main/scala/org/apache/druid/spark/registries/SegmentReaderRegistry.scala
new file mode 100644
index 0000000..1cbf07d
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/registries/SegmentReaderRegistry.scala
@@ -0,0 +1,285 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.registries

+

+import com.fasterxml.jackson.core.`type`.TypeReference

+import com.fasterxml.jackson.databind.jsontype.NamedType

+import org.apache.druid.guice.LocalDataStorageDruidModule

+import org.apache.druid.java.util.common.{IAE, ISE, StringUtils}

+import org.apache.druid.segment.loading.{LoadSpec, LocalDataSegmentPuller, LocalLoadSpec}

+import org.apache.druid.spark

+import org.apache.druid.spark.{MAPPER, baseInjectableValues}

+import org.apache.druid.spark.configuration.Configuration

+import org.apache.druid.spark.mixins.Logging

+import org.apache.druid.spark.utils.DeepStorageConstructorHelpers

+import org.apache.druid.storage.azure.{AzureByteSource, AzureByteSourceFactory,

+  AzureDataSegmentPuller, AzureLoadSpec}

+import org.apache.druid.storage.google.{GoogleDataSegmentPuller, GoogleLoadSpec,

+  GoogleStorageDruidModule}

+import org.apache.druid.storage.hdfs.{HdfsDataSegmentPuller, HdfsLoadSpec}

+import org.apache.druid.storage.s3.{S3DataSegmentPuller, S3LoadSpec, S3StorageDruidModule}

+import org.apache.druid.utils.CompressionUtils

+import org.apache.hadoop.conf.{Configuration => HConf}

+import org.apache.hadoop.fs.Path

+

+import java.io.{File, IOException}

+import java.net.{URI, URISyntaxException}

+import java.util.{Map => JMap}

+import scala.collection.mutable

+

+/**

+  * A registry for functions to parse a "load spec" and load them into a provided file on an executor. `loadSpecType`

+  * must match the LoadSpec's type name exactly.

+  *

+  * Users can also register initializers if necessary to set up injections and register Jackson subtypes. This

+  * allows easier integration with deep storage types that aren't supported out

+  * of the box, since an initializer function can just register the LoadSpec subtype used to

+  * create a segment and allow Jackson to handle the rest. If custom logic is needed, a

+  * registered load function will always take precedence.

+  *

+  * Note that DataSegment#getLoadSpec returns a Map<String, Object>, not an actual LoadSpec object.

+  */

+object SegmentReaderRegistry extends Logging {

+  // Load functions keyed by load spec type, and (initializer, has-run flag) pairs keyed by deep storage type.
+  private val registeredSegmentLoaderFunctions =
+    mutable.HashMap.empty[String, (JMap[String, AnyRef], File) => Unit]
+  private val registeredInitializers = mutable.HashMap.empty[String, (Configuration => Unit, Boolean)]

+

+  /**
+    * Register LOADFUNC as the loader for load specs of type LOADSPECTYPE, replacing any earlier registration. It is
+    * given a loadSpec (i.e. a Java Map from String to AnyRef) and a destination file to pull the segment into.
+    *
+    * @param loadSpecType The load spec type to register a load function for. Must match the value stored under the
+    *                     `type` key of the loadSpec map.
+    * @param loadFunc A function that takes as its input a Java Map<String, Object> and a destination file and loads
+    *                 the corresponding segment on deep storage to the file.
+    */
+  def registerLoadFunction(loadSpecType: String, loadFunc: (JMap[String, AnyRef], File) => Unit): Unit = {
+    logInfo(s"Registering load function for deep storage type $loadSpecType.")
+    registeredSegmentLoaderFunctions.update(loadSpecType, loadFunc)
+  }

+

+  def registerInitializer(loadSpecType: String, initializeFunc: Configuration => Unit): Unit = {
+    logInfo(s"Registering initializer for deep storage type $loadSpecType.")
+    registeredInitializers.update(loadSpecType, (initializeFunc, false)) // false: initializer has not run yet
+  }

+

+  /**
+    * Registers the default initializer function for DEEPSTORAGETYPE if one exists. This is a no-op
+    * if DEEPSTORAGETYPE already has an initializer registered or has no defined default initializer.
+    * Note as well that deep storage type names may differ from Load Spec type names. In particular,
+    * the LoadSpec type name for s3 deep storage is s3_zip.
+    *
+    * @param deepStorageType The deep storage type to register an initializer for.
+    */
+  def registerInitializerByType(deepStorageType: String): Unit = {
+    val alreadyRegistered = registeredInitializers.contains(deepStorageType)
+    if (!alreadyRegistered) {
+      knownInitializers.get(deepStorageType).foreach(initFunc => registerInitializer(deepStorageType, initFunc))
+    }
+  }

+

+  /**
+    * Loads a segment according to the details in LOADSPEC to FILE. The rules for determining how
+    * to load a segment are:
+    *
+    * 1. If no segment loader function or initializer has been registered, we attempt to construct
+    *    a URI from LOADSPEC and then read that URI using FS (local, S3, HDFS, and GCS deep storages
+    *    are supported). If the provided CONF is the Hadoop Configuration retrieved from SparkContext,
+    *    we can defer all deep storage configuration and authorization to what the Spark cluster provides.
+    *
+    * 2. If at least one segment loader function or initializer has been registered but no loader
+    *    function has been registered for LOADSPEC's type, we delegate to Jackson to deserialize
+    *    LOADSPEC into a LoadSpec object and then call #loadSegment(FILE) on the deserialized object.
+    *    This requires LOADSPEC's type to have been registered with Jackson. Any exception raised on
+    *    this path is logged and rethrown as an IAE that carries the original exception as its cause.
+    *
+    * 3. If we have registered a segment loader function for LOADSPEC's type, we use the registered
+    *    function to load the segment into FILE. A segment loader function always takes precedence
+    *    for its associated load spec type.
+    *
+    * @param loadSpec The LoadSpec for a segment.
+    * @param file The file to load a segment to according to the properties in LOADSPEC.
+    * @param conf The Hadoop configuration to use when reading segments from deep storage
+    *             if no segment loader function or initializer is registered.
+    */
+  def load(loadSpec: JMap[String, AnyRef], file: File, conf: HConf): Unit = {
+    if (registeredSegmentLoaderFunctions.isEmpty && registeredInitializers.isEmpty) {
+      defaultLoad(loadSpec, file, conf)
+    } else {
+      val loadSpecType = loadSpec.get("type").toString
+      if (!registeredSegmentLoaderFunctions.contains(loadSpecType)) {
+        try {
+          deserializeAndLoad(loadSpec, file)
+        } catch {
+          case e: Exception =>
+            logError(s"Unable to deserialize ${MAPPER.writeValueAsString(loadSpec)} to a LoadSpec instance!", e)
+            throw new IAE(e, "No registered segment reader function or named LoadSpec subtype for load spec type %s",
+              loadSpecType)
+        }
+      } else {
+        registeredSegmentLoaderFunctions(loadSpecType)(loadSpec, file)
+      }
+    }
+  }

+

+  /**
+    * Initializes a SegmentPuller for DEEPSTORAGETYPE based on CONF, running the registered initializer only if it
+    * has not run since it was registered. CONF should have the deep storage type prefix stripped away via
+    * .dive(DEEPSTORAGETYPE) to keep the extra object small. Throws an IAE if no initializer is registered or known.
+    *
+    * @param deepStorageType The deep storage type to initialize.
+    * @param conf A Configuration object to provide user-supplied deep storage configuration properties.
+    */
+  def initialize(deepStorageType: String, conf: Configuration): Unit = {
+    /*
+     * This is synchronized to allow callers to do something like
+     *   df.foreachPartition{_ => SegmentReaderRegistry.initialize("myType", conf)}
+     *
+     * The register-if-missing step is inside the lock too: otherwise a task could register the bundled
+     * initializer after another task already ran it, resetting the flag and invoking initFunc again.
+     * Explicit registerInitializer calls are not synchronized and should be made before such a call.
+     */
+    registeredInitializers.synchronized {
+      if (!registeredInitializers.contains(deepStorageType)) {
+        if (knownInitializers.contains(deepStorageType)) {
+          registerInitializerByType(deepStorageType)
+        } else {
+          throw new IAE("No registered initializer for deep storage type %s", deepStorageType)
+        }
+      }
+      val (initFunc, init) = registeredInitializers(deepStorageType)
+      if (!init) {
+        initFunc(conf)
+        registeredInitializers(deepStorageType) = (initFunc, true)
+      }
+    }
+  }

+

+  /**
+    * A default load method adapted from JobHelper#getURIFromSegment. Loads a segment according to
+    * LOADSPEC to FILE. This method assumes that any necessary authentication will be handled at
+    * the machine instance and so needs no configuration. Additionally, this method requires the
+    * target segment to load to be available at a URI constructable from LOADSPEC and so only
+    * local, hdfs, gs, and s3 deep storages are supported; any other load spec type raises an IAE.
+    *
+    * @param loadSpec The loadspec that describes where the segment to load should be read from.
+    * @param file The file to read a segment into.
+    * @param conf The Hadoop Configuration to use when constructing a filesystem to open the URI created from LOADSPEC.
+    */
+  private def defaultLoad(loadSpec: JMap[String, AnyRef], file: File, conf: HConf): Unit = {
+    val loadSpecType = loadSpec.get("type").toString
+    val uri = loadSpecType match {
+      case LocalDataStorageDruidModule.SCHEME =>
+        try {
+          // scalastyle:off null
+          new URI("file", null, loadSpec.get("path").toString, null, null)
+          // scalastyle:on
+        } catch {
+          case e: URISyntaxException =>
+            throw new ISE(e, "Unable to form simple file uri")
+        }
+      case "hdfs" => URI.create(loadSpec.get("path").toString)
+      case GoogleStorageDruidModule.SCHEME =>
+        // Segment names contain : in their path.
+        // Google Cloud Storage supports : but Hadoop does not.
+        // This becomes an issue when re-indexing using the current segments.
+        // The Hadoop getSplits code doesn't understand the : and returns "Relative path in absolute URI"
+        // This could be fixed using the same code that generates path names for hdfs segments using
+        // getHdfsStorageDir. But that wouldn't fix this issue for people who already have segments with ":".
+        // Because of this we just URL encode the : making everything work as it should.
+        URI.create(StringUtils.format("gs://%s/%s", loadSpec.get("bucket"),
+          StringUtils.replaceChar(loadSpec.get("path").toString, ':', "%3A")))
+      case S3StorageDruidModule.SCHEME_S3_ZIP =>
+        // Anything other than an explicit "s3a" S3Schema is read through the s3n scheme.
+        val scheme = if ("s3a" == loadSpec.get("S3Schema")) "s3a" else "s3n"
+        URI.create(StringUtils.format("%s://%s/%s", scheme, loadSpec.get("bucket"), loadSpec.get("key")))
+      case unsupportedType =>
+        // Fail with an actionable error instead of a bare scala.MatchError for types with no URI form.
+        throw new IAE("Unable to build a URI for load spec type %s; register a load function or initializer",
+          unsupportedType)
+    }
+
+    // Resolve the filesystem for the URI from CONF and unzip the remote archive into FILE.
+    val path = new Path(uri)
+    val fs = path.getFileSystem(conf)
+    try {
+      CompressionUtils.unzip(fs.open(path), file)
+    } catch {
+      case exception@(_: IOException | _: RuntimeException) =>
+        // Log with the path for context, then rethrow unchanged so callers still see the original error.
+        logError(s"Exception unzipping $path!", exception)
+        throw exception
+    }
+  }

+

+  private val knownInitializers: Map[String, Configuration => Unit] =
+    Map[String, Configuration => Unit](
+      LocalDataStorageDruidModule.SCHEME -> (_ => {
+        // NOTE(review): setInjectableValues replaces the mapper's injectables; confirm earlier pullers are kept.
+        val localPuller = new LocalDataSegmentPuller()
+        val withPuller = baseInjectableValues.addValue(classOf[LocalDataSegmentPuller], localPuller)
+        MAPPER.setInjectableValues(withPuller)
+        MAPPER.registerSubtypes(classOf[LocalLoadSpec])
+      }),
+      "hdfs" -> ((conf: Configuration) => {
+        // Build the puller from a Hadoop configuration derived from CONF; HdfsLoadSpec is registered as "hdfs".
+        val hadoopConf = DeepStorageConstructorHelpers.createHadoopConfiguration(conf)
+        val hdfsPuller = new HdfsDataSegmentPuller(hadoopConf)
+        val withPuller = baseInjectableValues.addValue(classOf[HdfsDataSegmentPuller], hdfsPuller)
+        MAPPER.setInjectableValues(withPuller)
+        MAPPER.registerSubtypes(new NamedType(classOf[HdfsLoadSpec], "hdfs"))
+      }),
+      GoogleStorageDruidModule.SCHEME -> (_ => {
+        // createGoogleStorage takes no arguments, so the supplied Configuration is not consulted here.
+        val gcsStorage = DeepStorageConstructorHelpers.createGoogleStorage()
+        val gcsPuller = new GoogleDataSegmentPuller(gcsStorage)
+        val withPuller = baseInjectableValues.addValue(classOf[GoogleDataSegmentPuller], gcsPuller)
+        MAPPER.setInjectableValues(withPuller)
+        MAPPER.registerSubtypes(classOf[GoogleLoadSpec])
+      }),
+      "s3" -> ((conf: Configuration) => {
+        // The deep storage type key is "s3" even though the matching load spec type is s3_zip.
+        val s3Client = DeepStorageConstructorHelpers.createServerSideEncryptingAmazonS3(conf)
+        val s3Puller = new S3DataSegmentPuller(s3Client)
+        val withPuller = baseInjectableValues.addValue(classOf[S3DataSegmentPuller], s3Puller)
+        MAPPER.setInjectableValues(withPuller)
+        MAPPER.registerSubtypes(classOf[S3LoadSpec])
+      }),
+      "azure" -> ((conf: Configuration) => {
+        // AzureDataSegmentPuller takes a byte source factory, so wrap the AzureStorage built from CONF in an
+        // anonymous factory that creates a new AzureByteSource for each (container, blob path) request.
+        val storage = DeepStorageConstructorHelpers.createAzureStorage(conf)
+        val byteSources = new AzureByteSourceFactory {
+          override def create(containerName: String, blobPath: String): AzureByteSource =
+            new AzureByteSource(storage, containerName, blobPath)
+        }
+        val azurePuller = new AzureDataSegmentPuller(byteSources)
+        val withPuller = baseInjectableValues.addValue(classOf[AzureDataSegmentPuller], azurePuller)
+        MAPPER.setInjectableValues(withPuller)
+        MAPPER.registerSubtypes(classOf[AzureLoadSpec])
+      })
+    )

+

+  // Round-trips LOADSPEC through JSON so Jackson can bind it to a LoadSpec, then loads the segment into FILE.
+  private val deserializeAndLoad = (loadSpec: JMap[String, AnyRef], file: File) => {
+    MAPPER.readValue[LoadSpec](MAPPER.writeValueAsString(loadSpec), new TypeReference[LoadSpec] {}).loadSegment(file)
+  }

+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/utils/DeepStorageConstructorHelpers.scala b/spark/src/main/scala/org/apache/druid/spark/utils/DeepStorageConstructorHelpers.scala
new file mode 100644
index 0000000..ed3125b
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/utils/DeepStorageConstructorHelpers.scala
@@ -0,0 +1,236 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.utils

+

+import com.fasterxml.jackson.databind.MapperFeature

+import com.fasterxml.jackson.databind.introspect.AnnotatedClass

+import com.fasterxml.jackson.module.scala.DefaultScalaModule

+import org.apache.druid.common.aws.{AWSClientConfig, AWSCredentialsConfig, AWSEndpointConfig,

+  AWSModule, AWSProxyConfig}

+import org.apache.druid.common.gcp.GcpModule

+import org.apache.druid.java.util.common.StringUtils

+import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig

+import org.apache.druid.spark.MAPPER

+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}

+import org.apache.druid.spark.mixins.TryWithResources

+import org.apache.druid.storage.azure.{AzureAccountConfig, AzureDataSegmentConfig,

+  AzureInputDataConfig, AzureStorage, AzureStorageDruidModule}

+import org.apache.druid.storage.google.{GoogleAccountConfig, GoogleInputDataConfig, GoogleStorage,

+  GoogleStorageDruidModule}

+import org.apache.druid.storage.hdfs.HdfsDataSegmentPusherConfig

+import org.apache.druid.storage.s3.{NoopServerSideEncryption, S3DataSegmentPusherConfig,

+  S3InputDataConfig, S3SSECustomConfig, S3SSEKmsConfig, S3StorageConfig, S3StorageDruidModule,

+  ServerSideEncryptingAmazonS3, ServerSideEncryption}

+import org.apache.hadoop.conf.{Configuration => HConf}

+

+import java.io.{ByteArrayInputStream, DataInputStream}

+import scala.collection.JavaConverters.collectionAsScalaIterableConverter

+

+object DeepStorageConstructorHelpers extends TryWithResources {

+  /*

+   * Spark DataSourceOption property maps are case insensitive, by which they mean they lower-case all keys. Since all

+   * our user-provided property keys will come to us via a DataSourceOption, we need to use a case-insensitive jackson

+   * mapper to deserialize property maps into objects. We want to be case-aware in the rest of our code, so we create a

+   * private, case-insensitive copy of our mapper here.

+   */

+  private val caseInsensitiveMapper = MAPPER.copy()

+    .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)

+    .registerModule(DefaultScalaModule)

+

+  // Local Storage Helpers

+

+  /** Convert the properties in CONF into a LocalDataSegmentPusherConfig. */

+  def createLocalDataSegmentPusherConfig(conf: Configuration): LocalDataSegmentPusherConfig =

+    convertConfToInstance[LocalDataSegmentPusherConfig](conf, classOf[LocalDataSegmentPusherConfig])

+

+  // HDFS Storage Helpers

+

+  /** Convert the properties in CONF into an HdfsDataSegmentPusherConfig. */

+  def createHdfsDataSegmentPusherConfig(conf: Configuration): HdfsDataSegmentPusherConfig =

+    convertConfToInstance[HdfsDataSegmentPusherConfig](conf, classOf[HdfsDataSegmentPusherConfig])

+

+  /**

+    * Rebuild a Hadoop Configuration from the value stored in CONF under DruidConfigurationKeys.hdfsHadoopConfKey.

+    *

+    * The stored value is base64-decoded and the resulting bytes are fed to Configuration.readFields.

+    *

+    * @param conf The Configuration holding the encoded Hadoop configuration.

+    * @return A Hadoop Configuration populated from the decoded bytes.

+    */

+  def createHadoopConfiguration(conf: Configuration): HConf = {

+    val restored = new HConf()

+    val encoded = conf.getString(DruidConfigurationKeys.hdfsHadoopConfKey)

+    val rawBytes = new ByteArrayInputStream(StringUtils.decodeBase64String(encoded))

+    tryWithResources(rawBytes, new DataInputStream(rawBytes)){

+      case (_, dataIn: DataInputStream) => restored.readFields(dataIn)

+    }

+    restored

+  }

+

+  // S3 Storage Helpers

+

+  /**

+    * Build an S3DataSegmentPusherConfig from the properties in CONF.

+    *

+    * *** The default for `useS3aSchema` is deliberately overridden here! ***

+    * When CONF does not set DruidConfigurationKeys.s3UseS3ASchemaKey, it is set to "true" before conversion: almost all

+    * users will want s3a rather than s3n, and there is no backwards-compatibility to maintain.

+    *

+    * @param conf The Configuration describing the S3DataSegmentPusherConfig to create.

+    * @return An S3DataSegmentPusherConfig derived from the properties in CONF.

+    */

+  def createS3DataSegmentPusherConfig(conf: Configuration): S3DataSegmentPusherConfig = {

+    val effectiveConf = if (conf.isPresent(DruidConfigurationKeys.s3UseS3ASchemaKey)) {

+      conf

+    } else {

+      conf.merge(Configuration.fromKeyValue(DruidConfigurationKeys.s3UseS3ASchemaKey, "true"))

+    }

+    convertConfToInstance(effectiveConf, classOf[S3DataSegmentPusherConfig])

+  }

+

+  /** Convert the properties in CONF into an S3InputDataConfig. */

+  def createS3InputDataConfig(conf: Configuration): S3InputDataConfig =

+    convertConfToInstance[S3InputDataConfig](conf, classOf[S3InputDataConfig])

+

+  /**

+    * Build a ServerSideEncryptingAmazonS3 client from the properties in CONF.

+    *

+    * The individual AWS and S3 config objects are derived via createConfigsForServerSideEncryptingAmazonS3 and then

+    * passed through the AWSModule and S3StorageDruidModule provider methods.

+    *

+    * @param conf The Configuration holding the credentials, proxy, endpoint, client and encryption properties.

+    * @return The client returned by S3StorageDruidModule.getAmazonS3Client.

+    */

+  def createServerSideEncryptingAmazonS3(conf: Configuration): ServerSideEncryptingAmazonS3 = {

+    val (credentials, proxy, endpoint, client, storage) = createConfigsForServerSideEncryptingAmazonS3(conf)

+

+    val awsModule = new AWSModule

+    val s3Module = new S3StorageDruidModule

+    val clientBuilder = s3Module.getServerSideEncryptingAmazonS3Builder(

+      awsModule.getAWSCredentialsProvider(credentials), proxy, endpoint, client, storage

+    )

+    s3Module.getAmazonS3Client(clientBuilder)

+  }

+

+  /**

+    * Derive the config objects needed to construct a ServerSideEncryptingAmazonS3 from CONF.

+    *

+    * Credentials are read from CONF itself; the proxy, endpoint and client configs are read from the "proxy",

+    * "endpoint" and "client" sub-configurations; the storage config is read from the sub-configuration under

+    * DruidConfigurationKeys.s3ServerSideEncryptionPrefix.

+    *

+    * @param conf The Configuration to read from.

+    * @return A tuple of (credentials, proxy, endpoint, client, storage) configs, in that order.

+    */

+  def createConfigsForServerSideEncryptingAmazonS3(conf: Configuration):

+  (AWSCredentialsConfig, AWSProxyConfig, AWSEndpointConfig, AWSClientConfig, S3StorageConfig) = {

+    (

+      convertConfToInstance(conf, classOf[AWSCredentialsConfig]),

+      convertConfToInstance(conf.dive("proxy"), classOf[AWSProxyConfig]),

+      convertConfToInstance(conf.dive("endpoint"), classOf[AWSEndpointConfig]),

+      convertConfToInstance(conf.dive("client"), classOf[AWSClientConfig]),

+      createS3StorageConfig(conf.dive(DruidConfigurationKeys.s3ServerSideEncryptionPrefix))

+    )

+  }

+

+  /**

+    * Build an S3StorageConfig whose ServerSideEncryption matches the encryption type named in CONF.

+    *

+    * The obvious approach would be to let Jackson construct the ServerSideEncryption:

+    *

+    * ```

+    * val kmsConfig = convertConfToInstance(conf.dive("kms"), classOf[S3SSEKmsConfig])

+    * caseInsensitiveMapper.setInjectableValues(new InjectableValues.Std().addValue(classOf[S3SSEKmsConfig], kmsConfig))

+    * val ser = caseInsensitiveMapper.writeValueAsString(Map[String, String]("type" -> "kms"))

+    * caseInsensitiveMapper.readValue[ServerSideEncryption](ser, new TypeReference[ServerSideEncryption] {})

+    * ```

+    *

+    * but that throws com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Invalid definition for property

+    * `config` (of type `org.apache.druid.storage.s3.KmsServerSideEncryption`): Could not find creator property with

+    * name 'config' (known Creator properties: []). The suspected (unconfirmed) root cause is that ServerSideEncryption

+    * is abstract. Deserializing straight to the concrete class is not possible either, because the implementations

+    * (e.g. KmsServerSideEncryption) are package-private. Since the config object is already in hand, the implementation

+    * class is instead looked up among the mapper's registered subtypes and its constructor is made accessible and

+    * invoked reflectively. The same approach covers the CustomServerSideEncryption case.

+    *

+    * There is probably a more elegant way to do this that would transparently support new sse types.

+    *

+    * @param conf The Configuration holding the server side encryption type and any type-specific properties.

+    * @return An S3StorageConfig wrapping the requested ServerSideEncryption, or a NoopServerSideEncryption when the

+    *         type is neither "s3", "kms" nor "custom".

+    */

+  def createS3StorageConfig(conf: Configuration): S3StorageConfig = {

+    val requestedType = conf.get(DruidConfigurationKeys.s3ServerSideEncryptionTypeKey)

+

+    // The registered subtypes are the only handle we have on the package-private implementation classes.

+    val mapperConfig = caseInsensitiveMapper.getDeserializationConfig

+    val annotatedBase = AnnotatedClass.constructWithoutSuperTypes(classOf[ServerSideEncryption], mapperConfig)

+    val registeredSubtypes =

+      caseInsensitiveMapper.getSubtypeResolver.collectAndResolveSubtypesByClass(mapperConfig, annotatedBase)

+

+    // Find the implementation registered under NAME and invoke its constructor with the given signature.

+    def instantiate(name: String, parameterTypes: Seq[Class[_]], arguments: Seq[AnyRef]): ServerSideEncryption = {

+      val implementation = registeredSubtypes.asScala.filter(_.getName == name).head.getType

+      val constructor = implementation.getDeclaredConstructor(parameterTypes: _*)

+      constructor.setAccessible(true)

+      constructor.newInstance(arguments: _*).asInstanceOf[ServerSideEncryption]

+    }

+

+    val serverSideEncryption: ServerSideEncryption = requestedType match {

+      case Some("s3") =>

+        instantiate("s3", Seq.empty, Seq.empty)

+      case Some("kms") =>

+        val kmsConfig = convertConfToInstance(conf.dive("kms"), classOf[S3SSEKmsConfig])

+        instantiate("kms", Seq(classOf[S3SSEKmsConfig]), Seq(kmsConfig))

+      case Some("custom") =>

+        val customConfig = convertConfToInstance(conf.dive("custom"), classOf[S3SSECustomConfig])

+        instantiate("custom", Seq(classOf[S3SSECustomConfig]), Seq(customConfig))

+      case _ =>

+        new NoopServerSideEncryption

+    }

+    new S3StorageConfig(serverSideEncryption)

+  }

+

+  // GCS Storage Helpers

+

+  /** Convert the properties in CONF into a GoogleAccountConfig. */

+  def createGoogleAcountConfig(conf: Configuration): GoogleAccountConfig =

+    convertConfToInstance[GoogleAccountConfig](conf, classOf[GoogleAccountConfig])

+

+  /** Convert the properties in CONF into a GoogleInputDataConfig. */

+  def createGoogleInputDataConfig(conf: Configuration): GoogleInputDataConfig =

+    convertConfToInstance[GoogleInputDataConfig](conf, classOf[GoogleInputDataConfig])

+

+  /**

+    * Build a GoogleStorage using the provider methods on GcpModule and GoogleStorageDruidModule.

+    *

+    * @return The GoogleStorage returned by GoogleStorageDruidModule.getGoogleStorage.

+    */

+  def createGoogleStorage(): GoogleStorage = {

+    val gcp = new GcpModule

+    val googleStorageModule = new GoogleStorageDruidModule

+

+    val transport = gcp.getHttpTransport

+    val json = gcp.getJsonFactory

+    googleStorageModule.getGoogleStorage(transport, json, gcp.getHttpRequestInitializer(transport, json))

+  }

+

+  // Azure Storage Helpers

+

+  /** Convert the properties in CONF into an AzureDataSegmentConfig. */

+  def createAzureDataSegmentConfig(conf: Configuration): AzureDataSegmentConfig =

+    convertConfToInstance[AzureDataSegmentConfig](conf, classOf[AzureDataSegmentConfig])

+

+  /** Convert the properties in CONF into an AzureInputDataConfig. */

+  def createAzureInputDataConfig(conf: Configuration): AzureInputDataConfig =

+    convertConfToInstance[AzureInputDataConfig](conf, classOf[AzureInputDataConfig])

+

+  /** Convert the properties in CONF into an AzureAccountConfig. */

+  def createAzureAccountConfig(conf: Configuration): AzureAccountConfig =

+    convertConfToInstance[AzureAccountConfig](conf, classOf[AzureAccountConfig])

+

+  /**

+    * Build an AzureStorage from the account properties in CONF using the provider methods on

+    * AzureStorageDruidModule.

+    *

+    * @param conf The Configuration holding the Azure account properties.

+    * @return The AzureStorage returned by AzureStorageDruidModule.getAzureStorageContainer.

+    */

+  def createAzureStorage(conf: Configuration): AzureStorage = {

+    val account = createAzureAccountConfig(conf)

+    val storageModule = new AzureStorageDruidModule

+    storageModule.getAzureStorageContainer(storageModule.getCloudBlobClient(account))

+  }

+

+  /**

+    * Convert the key/value map backing CONF into an instance of CLAZZ using the case-insensitive mapper.

+    *

+    * @param conf  The Configuration whose properties should populate the new instance.

+    * @param clazz The class to instantiate.

+    * @return The instance of CLAZZ produced by the mapper's convertValue.

+    */

+  private def convertConfToInstance[T](conf: Configuration, clazz: Class[T]): T = {

+    val properties = conf.toMap

+    caseInsensitiveMapper.convertValue(properties, clazz)

+  }

+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/utils/FilterUtils.scala b/spark/src/main/scala/org/apache/druid/spark/utils/FilterUtils.scala
new file mode 100644
index 0000000..e6e195c
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/utils/FilterUtils.scala
@@ -0,0 +1,325 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.utils

+

+import org.apache.druid.java.util.common.{ISE, JodaUtils}

+import org.apache.druid.query.filter.{AndDimFilter, BoundDimFilter, DimFilter, FalseDimFilter,

+  InDimFilter, NotDimFilter, OrDimFilter, RegexDimFilter, SelectorDimFilter}

+import org.apache.druid.query.ordering.{StringComparator, StringComparators}

+import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, GreaterThan,

+  GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains,

+  StringEndsWith, StringStartsWith}

+import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, FloatType, IntegerType,

+  LongType, StringType, StructType, TimestampType}

+

+import scala.collection.JavaConverters.{seqAsJavaListConverter, setAsJavaSetConverter}

+

+/**

+  * Converters and utilities for working with Spark and Druid Filters.

+  */

+object FilterUtils {

+  /**

+    * Map an array of Spark filters FILTERS to a single Druid dim filter, or None if FILTERS is empty.

+    *

+    * A DimFilter is returned rather than a Filter, leaving callers to call .toFilter or .toOptimizedFilter

+    * themselves, because callers can't convert back to a DimFilter from a Filter.

+    *

+    * @param filters The Spark filters to map to a Druid filter.

+    * @param schema  The schema of the DataFrame being filtered; passed through to mapFilter.

+    * @return The optimized conjunction (AND) of the Druid filters corresponding to each entry in FILTERS.

+    */

+  def mapFilters(filters: Array[Filter], schema: StructType): Option[DimFilter] = {

+    if (filters.nonEmpty) {

+      val conjuncts = filters.map(mapFilter(_, schema)).toList.asJava

+      Some(new AndDimFilter(conjuncts).optimize())

+    } else {

+      None

+    }

+  }

+

+  /**

+    * Convert a Spark-style filter FILTER to a Druid-style filter.

+    *

+    * String matching filters (StringContains, StringStartsWith, StringEndsWith) are translated to RegexDimFilters.

+    * The value Spark hands us is a literal string, not a pattern, so it is quoted with java.util.regex.Pattern.quote

+    * before being embedded in the regex. Without quoting, metacharacters in the value (e.g. '.', '*', '(') would

+    * either change which rows match or produce a pattern that fails to compile.

+    *

+    * Range filters are translated to BoundDimFilters whose ordering is chosen from the type of the filtered field in

+    * SCHEMA. Null literals in an In filter are dropped, and EqualTo against null maps to a filter that matches

+    * nothing.

+    *

+    * @param filter The Spark filter to map to a Druid filter.

+    * @param schema The schema of the DataFrame being filtered, used to pick the ordering for range filters.

+    * @return The Druid filter corresponding to the filter condition described by FILTER.

+    * @throws scala.MatchError if FILTER is not one of the Spark filter types handled here.

+    */

+  def mapFilter(filter: Filter, schema: StructType): DimFilter = { // scalastyle:ignore method.length

+    // scalastyle:off null

+    filter match {

+      case And(left, right) =>

+        new AndDimFilter(List(mapFilter(left, schema), mapFilter(right, schema)).asJava)

+      case Or(left, right) =>

+        new OrDimFilter(List(mapFilter(left, schema), mapFilter(right, schema)).asJava)

+      case Not(condition) =>

+        new NotDimFilter(mapFilter(condition, schema))

+      case IsNull(field) =>

+        new SelectorDimFilter(field, null, null, null)

+      case IsNotNull(field) =>

+        new NotDimFilter(new SelectorDimFilter(field, null, null, null))

+      case In(field, values) =>

+        new InDimFilter(field, values.filter(_ != null).map(_.toString).toSet.asJava, null, null)

+      case StringContains(field, value) =>

+        // An unanchored pattern; the value is quoted so it is matched literally.

+        // Could also use a LikeDimFilter or SearchQueryDimFilter here; relative efficiency hasn't been compared.

+        new RegexDimFilter(field, java.util.regex.Pattern.quote(value), null, null)

+      case StringStartsWith(field, value) =>

+        new RegexDimFilter(field, "^" + java.util.regex.Pattern.quote(value), null, null)

+      case StringEndsWith(field, value) =>

+        new RegexDimFilter(field, java.util.regex.Pattern.quote(value) + "$", null, null)

+      case EqualTo(field, value) =>

+        if (value == null) {

+          FalseDimFilter.instance()

+        } else {

+          new SelectorDimFilter(field, value.toString, null, null)

+        }

+      case EqualNullSafe(field, value) =>

+        new SelectorDimFilter(field, Option(value).map(_.toString).orNull, null, null)

+      case LessThan(field, value) =>

+        boundFilter(field, null, value.toString, lowerStrict = false, upperStrict = true, schema)

+      case LessThanOrEqual(field, value) =>

+        boundFilter(field, null, value.toString, lowerStrict = false, upperStrict = false, schema)

+      case GreaterThan(field, value) =>

+        boundFilter(field, value.toString, null, lowerStrict = true, upperStrict = false, schema)

+      case GreaterThanOrEqual(field, value) =>

+        boundFilter(field, value.toString, null, lowerStrict = false, upperStrict = false, schema)

+    }

+    // scalastyle:on

+  }

+

+  /**

+    * Build a BoundDimFilter on FIELD with the given bounds, using the ordering that getOrderingFromDataType selects

+    * for FIELD's type in SCHEMA. A null LOWER or UPPER leaves that side unbounded.

+    */

+  private def boundFilter(

+      field: String,

+      lower: String,

+      upper: String,

+      lowerStrict: Boolean,

+      upperStrict: Boolean,

+      schema: StructType

+  ): DimFilter = {

+    // scalastyle:off null

+    new BoundDimFilter(

+      field,

+      lower,

+      upper,

+      lowerStrict,

+      upperStrict,

+      null,

+      null,

+      getOrderingFromDataType(schema(field).dataType),

+      null

+    )

+    // scalastyle:on

+  }

+

+  /**

+    * Choose the Druid StringComparator to use when bounding a column of Spark type DATATYPE.

+    *

+    * @param dataType The Spark type of the column being filtered.

+    * @return NUMERIC for long, integer, double and float columns; LEXICOGRAPHIC for string and string array columns.

+    * @throws ISE for any other type. isSupportedFilter is expected to keep such filters from being pushed down.

+    */

+  private[utils] def getOrderingFromDataType(dataType: DataType): StringComparator = {

+    val numericTypes: Set[DataType] = Set(LongType, IntegerType, DoubleType, FloatType)

+    dataType match {

+      case numeric if numericTypes.contains(numeric) => StringComparators.NUMERIC

+      case StringType => StringComparators.LEXICOGRAPHIC

+      case ArrayType(StringType, _) => StringComparators.LEXICOGRAPHIC

+      case _ =>

+        // Reaching this branch means a filter on an unsupported type was pushed down; something's gone wrong.

+        throw new ISE("This reader doesn't support filtering on complex types! Complex type " +

+          "filters should not be pushed down.")

+    }

+  }

+

+  /**

+    * Decide whether a Spark filter can be pushed down to the Druid InputPartitionReaders for a DataFrame with the

+    * given schema. Since segments are pulled from deep storage before being filtered, push down is not as useful as

+    * it could be, but it still saves time and resources.

+    *

+    * @param filter                The filter to evaluate for support.

+    * @param schema                The schema of the DataFrame to be filtered by FILTER.

+    * @param useSQLCompatibleNulls Whether SQL-compatible null handling is in use; null checks and comparisons against

+    *                              null are only supported when it is.

+    * @return True iff FILTER can be pushed down to the InputPartitionReaders.

+    */

+  private[spark] def isSupportedFilter(

+      filter: Filter,

+      schema: StructType,

+      useSQLCompatibleNulls: Boolean = false

+  ): Boolean = {

+    val knownColumns = schema.fieldNames

+    val onlyKnownColumns = filter.references.forall(knownColumns.contains(_))

+    val supported = (child: Filter) => isSupportedFilter(child, schema, useSQLCompatibleNulls)

+

+    // scalastyle:off null

+    filter match {

+      // If the filter references columns we don't know about, we can't push it down.

+      case _ if !onlyKnownColumns => false

+      case And(left, right) => supported(left) && supported(right)

+      case Or(left, right) => supported(left) && supported(right)

+      case Not(child) => supported(child)

+      // If we're using SQL-compatible nulls, we can filter for null.

+      // Otherwise, callers should explicitly filter for '' or 0 depending on the column type.

+      // If we ever support pushing down filters on complex types, we'll need to add handling here.

+      case _: IsNull | _: IsNotNull => useSQLCompatibleNulls

+      case In(_, values) =>

+        checkAllDataTypesSupported(filter, schema) && (useSQLCompatibleNulls || !values.contains(null))

+      case _: StringContains | _: StringStartsWith | _: StringEndsWith => checkStringsOnly(filter, schema)

+      // Hopefully Spark short-circuits foo = NULL comparisons itself; if not, they are only pushed down when

+      // SQL-compatible nulls are in use.

+      case EqualTo(_, value) =>

+        checkAllDataTypesSupported(filter, schema) && (useSQLCompatibleNulls || value != null)

+      case EqualNullSafe(_, value) =>

+        checkAllDataTypesSupported(filter, schema) && (useSQLCompatibleNulls || value != null)

+      case _: LessThan | _: LessThanOrEqual | _: GreaterThan | _: GreaterThanOrEqual =>

+        checkAllDataTypesSupported(filter, schema)

+      case _ => false

+    }

+    // scalastyle:on

+  }

+

+  /** True iff every column referenced by FILTER has a type listed in supportedDataTypesForFiltering. */

+  private def checkAllDataTypesSupported(filter: Filter, schema: StructType): Boolean = {

+    val referencedTypes = filter.references.map(name => schema.fields(schema.fieldIndex(name)).dataType)

+    referencedTypes.forall(dataType => supportedDataTypesForFiltering.contains(dataType))

+  }

+

+  /** True iff every column referenced by FILTER is a StringType column in SCHEMA. */

+  private def checkStringsOnly(filter: Filter, schema: StructType): Boolean = {

+    val referencedTypes = filter.references.map(name => schema.fields(schema.fieldIndex(name)).dataType)

+    referencedTypes.forall(_ == StringType)

+  }

+

+  // The Spark column types for which checkAllDataTypesSupported allows a filter to be pushed down.

+  // NOTE(review): TimestampType is listed here but getOrderingFromDataType has no case for it, so a range filter on

+  // a TimestampType column would pass isSupportedFilter and then throw in mapFilter - confirm whether that can occur.

+  private val supportedDataTypesForFiltering: Seq[DataType] = Seq(

+    IntegerType, LongType, FloatType, DoubleType, TimestampType, StringType

+  )

+

+  // Shared empty result returned by decomposeTimeFilters for filters that yield no bounds on __time.

+  private val emptyBoundSeq = Seq.empty[(Bound, Long)]

+

+  /**

+    * Given an array of Spark filters, return lower and upper bounds on the value of the __time column where bounds

+    * can be determined. Only filters that reference __time are considered; the largest lower bound and the smallest

+    * upper bound across those filters are returned.

+    *

+    * @param filters The array of filters to extract __time bounds from.

+    * @return A tuple containing an optional lower and an optional upper bound on the __time column.

+    */

+  def getTimeFilterBounds(filters: Array[Filter]): (Option[Long], Option[Long]) = {

+    val (lowers, uppers) = filters

+      .filter(_.references.contains("__time"))

+      .flatMap(FilterUtils.decomposeTimeFilters)

+      .partition(_._1 == FilterUtils.LOWER)

+    val tightestLower = lowers.map(_._2).reduceOption(_ max _)

+    val tightestUpper = uppers.map(_._2).reduceOption(_ min _)

+    (tightestLower, tightestUpper)

+  }

+

+  /**

+    * Decompose a Spark Filter into a sequence of bounds on the __time field if possible.

+    *

+    * The result contains at most one LOWER and at most one UPPER bound:

+    *  - And keeps the tightest bound on each side (largest lower, smallest upper).

+    *  - Or keeps a bound on a side only when BOTH branches constrain that side, and then keeps the loosest of the

+    *    two (smallest lower, largest upper). If either branch leaves a side unconstrained - including a branch that

+    *    doesn't reference __time at all - rows on that side can still match, so no bound may be reported.

+    *  - Not over a filter referencing __time is reported as the unbounded interval.

+    *  - Comparisons against __time are converted to inclusive bounds (strict comparisons are shifted by one milli).

+    *

+    * Comparison values on __time are cast to Long.

+    *

+    * @param filter The Spark filter to possibly extract time bounds from.

+    * @return A sequence of tuples containing either UPPER or LOWER bounds on the __time field, in

+    *         epoch millis.

+    */

+  private[spark] def decomposeTimeFilters(filter: Filter): Seq[(Bound, Long)] = { // scalastyle:ignore method.length

+    filter match {

+      case And(left, right) =>

+        // Both children must hold, so keep the tightest bound found on each side.

+        val (lowers, uppers) = Seq(left, right)

+          .filter(_.references.contains("__time"))

+          .flatMap(decomposeTimeFilters)

+          .partition(_._1 == LOWER)

+        Seq[Option[(Bound, Long)]](

+          lowers.map(_._2).reduceOption(_ max _).map(bound => (LOWER, bound)),

+          uppers.map(_._2).reduceOption(_ min _).map(bound => (UPPER, bound))

+        ).flatten

+      case Or(left, right) =>

+        // Either child may hold, so a side is only bounded if both children bound it. Children that don't reference

+        // __time decompose to no bounds, which correctly leaves the disjunction unbounded.

+        val leftBounds = decomposeTimeFilters(left)

+        val rightBounds = decomposeTimeFilters(right)

+        Seq[Option[(Bound, Long)]](

+          loosestSharedBound(LOWER, leftBounds, rightBounds, _ min _),

+          loosestSharedBound(UPPER, leftBounds, rightBounds, _ max _)

+        ).flatten

+      case Not(condition) =>

+        if (condition.references.contains("__time")) {

+          // Our quick and dirty bounds enum doesn't handle nots, so just return an unbounded interval

+          Seq[(Bound, Long)](

+            (LOWER, JodaUtils.MIN_INSTANT),

+            (UPPER, JodaUtils.MAX_INSTANT)

+          )

+        } else {

+          emptyBoundSeq

+        }

+      case EqualTo(field, value) =>

+        if (field == "__time") {

+          Seq(

+            (LOWER, value.asInstanceOf[Long]),

+            (UPPER, value.asInstanceOf[Long])

+          )

+        } else {

+          emptyBoundSeq

+        }

+      case LessThan(field, value) =>

+        if (field == "__time") {

+          Seq((UPPER, value.asInstanceOf[Long] - 1))

+        } else {

+          emptyBoundSeq

+        }

+      case LessThanOrEqual(field, value) =>

+        if (field == "__time") {

+          Seq((UPPER, value.asInstanceOf[Long]))

+        } else {

+          emptyBoundSeq

+        }

+      case GreaterThan(field, value) =>

+        if (field == "__time") {

+          Seq((LOWER, value.asInstanceOf[Long] + 1))

+        } else {

+          emptyBoundSeq

+        }

+      case GreaterThanOrEqual(field, value) =>

+        if (field == "__time") {

+          Seq((LOWER, value.asInstanceOf[Long]))

+        } else {

+          emptyBoundSeq

+        }

+      case _ => emptyBoundSeq

+    }

+  }

+

+  /**

+    * Combine the bounds on one SIDE of two disjuncts. Returns a bound only if both LEFT and RIGHT contain a bound on

+    * SIDE, in which case the LOOSEST function picks which of the two values to keep.

+    */

+  private def loosestSharedBound(

+      side: Bound,

+      left: Seq[(Bound, Long)],

+      right: Seq[(Bound, Long)],

+      loosest: (Long, Long) => Long

+  ): Option[(Bound, Long)] = {

+    def boundOn(bounds: Seq[(Bound, Long)]): Option[Long] = {

+      bounds.filter(_._1 == side).map(_._2).reduceOption(loosest)

+    }

+    for {

+      leftBound <- boundOn(left)

+      rightBound <- boundOn(right)

+    } yield (side, loosest(leftBound, rightBound))

+  }

+

+  // Tags each value produced by decomposeTimeFilters with the side of the __time range it constrains.

+  private[spark] sealed trait Bound

+  // Tag for a lower bound on __time.

+  case object LOWER extends Bound

+  // Tag for an upper bound on __time.

+  case object UPPER extends Bound

+

+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/utils/NullHandlingUtils.scala b/spark/src/main/scala/org/apache/druid/spark/utils/NullHandlingUtils.scala
new file mode 100644
index 0000000..63e9e5c
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/utils/NullHandlingUtils.scala
@@ -0,0 +1,40 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.utils

+

+import org.apache.druid.common.config.{NullHandling, NullValueHandlingConfig}

+

+/**

+  * Utility for initializing Druid's NullValueHandlingConfig from Spark code. In a Druid cluster this is handled via

+  * injection, and in unit tests default-value null handling is initialized via NullHandling.initializeForTests().

+  * Setting SQL-compatible null handling has to be done through reflection here.

+  */

+object NullHandlingUtils {

+  /**

+    * Initialize Druid's null handling.

+    *

+    * @param useDruidDefaultHandling If true, delegate to NullHandling.initializeForTests(). If false, reflectively

+    *                                set NullHandling's INSTANCE field to a NullValueHandlingConfig constructed with

+    *                                `false`.

+    */

+  def initializeDruidNullHandling(useDruidDefaultHandling: Boolean): Unit = {

+    if (!useDruidDefaultHandling) {

+      val sqlCompatibleConfig = new NullValueHandlingConfig(false)

+      val instance = classOf[NullHandling].getDeclaredField("INSTANCE")

+      instance.setAccessible(true)

+      // The field is set with a null receiver, i.e. it is treated as a static field.

+      instance.set(null, sqlCompatibleConfig) // scalastyle:ignore null

+    } else {

+      NullHandling.initializeForTests()

+    }

+  }

+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/utils/PartitionReaderSegmentLoader.scala b/spark/src/main/scala/org/apache/druid/spark/utils/PartitionReaderSegmentLoader.scala
new file mode 100644
index 0000000..e4faaa2
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/utils/PartitionReaderSegmentLoader.scala
@@ -0,0 +1,76 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.utils

+

+import org.apache.druid.java.util.common.{FileUtils, ISE, StringUtils}

+import org.apache.druid.segment.{IndexIO, QueryableIndexSegment, Segment, SegmentLazyLoadFailCallback}

+import org.apache.druid.segment.loading.SegmentLoader

+import org.apache.druid.spark.mixins.Logging

+import org.apache.druid.spark.registries.SegmentReaderRegistry

+import org.apache.druid.timeline.DataSegment

+import org.apache.hadoop.conf.{Configuration => HConf}

+

+import java.io.File

+import scala.collection.mutable.{HashSet => MHashSet}

+

+/**

+  * A SegmentLoader to manage loading segment files for a partition reader. For now, a segment loader is created per

+  * PartitionReader, which means it will only ever load a single segment. This is slightly wasteful, but avoids needing

+  * to manage the SegmentLoader's life cycle outside of a PartitionReader. If the input partition planning logic ever

+  * gets smarter than just assigning each segment to a partition, this design decision should be revisited.

+  *

+  * @param tmpDir     The directory under which one sub-directory per fetched segment is created.

+  * @param hadoopConf The Hadoop configuration handed to SegmentReaderRegistry.load when fetching segment files.

+  * @param indexIO    The IndexIO used to load a fetched segment directory as a queryable index.

+  */

+class PartitionReaderSegmentLoader(

+                                  val tmpDir: File,

+                                  val hadoopConf: HConf,

+                                  val indexIO: IndexIO

+                                  ) extends SegmentLoader with Logging {

+  // Segments whose files this loader has fetched and not yet cleaned up.

+  private val loadedSegments = new MHashSet[DataSegment]

+

+  /** True iff this loader fetched SEGMENT's files and has not cleaned them up since. */

+  override def isSegmentLoaded(segment: DataSegment): Boolean = loadedSegments.contains(segment)

+

+  /**

+    * Fetch SEGMENT's files if necessary and load them as a QueryableIndexSegment.

+    *

+    * @param segment    The segment to load.

+    * @param `lazy`     Passed through to IndexIO.loadIndex.

+    * @param loadFailed Passed through to IndexIO.loadIndex.

+    * @return A QueryableIndexSegment over the loaded index.

+    */

+  override def getSegment(segment: DataSegment, `lazy`: Boolean, loadFailed: SegmentLazyLoadFailCallback): Segment = {

+    val segmentFile = getSegmentFiles(segment)

+    val index = indexIO.loadIndex(segmentFile, `lazy`, loadFailed)

+    logInfo(s"Loaded segment [${segment.getId}].")

+    new QueryableIndexSegment(index, segment.getId)

+  }

+

+  /**

+    * Return the directory holding SEGMENT's files, fetching them via SegmentReaderRegistry if the directory does not

+    * exist yet.

+    *

+    * If fetching fails, the freshly created directory is removed again before the failure is rethrown. Otherwise a

+    * later call would find the directory, skip the fetch and hand back a partially populated directory, and cleanup

+    * would never delete it because the segment was never recorded as loaded.

+    *

+    * @param segment The segment whose files are needed.

+    * @return The directory under tmpDir containing the segment's files.

+    * @throws ISE if the segment directory cannot be created.

+    */

+  override def getSegmentFiles(segment: DataSegment): File = {

+    val segmentDir = new File(tmpDir, segment.getId.toString)

+    if (!segmentDir.exists) {

+      logInfo(

+        StringUtils.format(

+          "Fetching segment [%s] to [%s].", segment.getId, segmentDir

+        )

+      )

+      if (!segmentDir.mkdir) throw new ISE("Failed to make directory[%s]", segmentDir)

+      try {

+        SegmentReaderRegistry.load(segment.getLoadSpec, segmentDir, hadoopConf)

+      } catch {

+        case scala.util.control.NonFatal(e) =>

+          // Best-effort removal: a failure to delete must not mask the original load failure.

+          scala.util.Try(FileUtils.deleteDirectory(segmentDir))

+          throw e

+      }

+      loadedSegments += segment

+    }

+    segmentDir

+  }

+

+  /** Forget SEGMENT and delete its directory under tmpDir, if this loader fetched it. */

+  override def cleanup(segment: DataSegment): Unit = {

+    if (isSegmentLoaded(segment)) {

+      loadedSegments -= segment

+      FileUtils.deleteDirectory(new File(tmpDir, segment.getId.toString))

+    }

+  }

+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/utils/SchemaUtils.scala b/spark/src/main/scala/org/apache/druid/spark/utils/SchemaUtils.scala
new file mode 100644
index 0000000..bee0d0b
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/utils/SchemaUtils.scala
@@ -0,0 +1,307 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.utils

+

+import org.apache.druid.data.input.InputRow

+import org.apache.druid.data.input.impl.{DimensionSchema, DimensionsSpec, DoubleDimensionSchema,

+  FloatDimensionSchema, LongDimensionSchema, StringDimensionSchema}

+import org.apache.druid.java.util.common.IAE

+import org.apache.druid.segment.QueryableIndex

+import org.apache.druid.segment.column.{RowSignature, ValueType}

+import org.apache.druid.spark.registries.ComplexTypeRegistry

+import org.apache.spark.sql.catalyst.InternalRow

+import org.apache.spark.sql.catalyst.util.ArrayData

+import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, DoubleType, FloatType,

+  IntegerType, LongType, StringType, StructField, StructType, TimestampType}

+import org.apache.spark.unsafe.types.UTF8String

+

+import java.util.{Collection => JCollection}

+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter,

+  iterableAsScalaIterableConverter, mapAsScalaMapConverter, seqAsJavaListConverter}

+

+/**

+  * Converters and utilities for working with Spark and Druid schemas.

+  */

+object SchemaUtils {

+  /**

+    * Build a Spark StructType from a Druid column map keyed by column name, where each value is the pair

+    * (Druid type name, whether the column holds multiple values).

+    *

+    * LONG, STRING, DOUBLE, FLOAT, and TIMESTAMP map onto the Spark type of the same name; any type name registered

+    * with ComplexTypeRegistry maps onto BinaryType. Multi-valued columns become an ArrayType of the mapped type

+    * with containsNull set to false.

+    *

+    * @param columnMap The Druid schema to convert into a corresponding Spark StructType.

+    * @return The StructType equivalent of the Druid schema described by COLUMNMAP.

+    * @throws IAE If a column's type is neither one of the types above nor a registered complex type.

+    */

+  def convertDruidSchemaToSparkSchema(columnMap: Map[String, (String, Boolean)]): StructType = {

+    val fields = columnMap.toSeq.map { case (name, (colType, hasMultipleValues)) =>

+      val elementType: DataType = colType match {

+        case "LONG" => LongType

+        case "STRING" => StringType

+        case "DOUBLE" => DoubleType

+        case "FLOAT" => FloatType

+        case "TIMESTAMP" => TimestampType

+        case complexType: String if ComplexTypeRegistry.getRegisteredMetricNames.contains(complexType) =>

+          BinaryType

+        // Add other supported types later

+        case _ => throw new IAE(s"Unrecognized type $colType!")

+      }

+      val fieldType = if (hasMultipleValues) new ArrayType(elementType, false) else elementType

+      StructField(name, fieldType)

+    }

+    StructType(fields)

+  }

+

+  /**

+    * Convert a Druid INPUTROW into a Spark InternalRow with schema SCHEMA.

+    *

+    * The `__time` column is filled from the row's epoch timestamp. Every other column is read with getRaw and

+    * converted with parseToScala; columns whose Spark type is an array are always emitted as ArrayData, wrapping a

+    * bare value in a single-element array when Druid did not return a collection. Missing values become null, or a

+    * type-appropriate default when USEDEFAULTNULLHANDLING is set.

+    *

+    * @param inputRow The Druid InputRow to convert into a Spark Row for loading into a dataframe.

+    * @param schema The schema to map INPUTROW into.

+    * @param useDefaultNullHandling Whether to use the Druid defaults for null values or actual nulls.

+    * @return A Spark InternalRow with schema SCHEMA and values parsed from INPUTROW.

+    */

+  def convertInputRowToSparkRow(

+                                 inputRow: InputRow,

+                                 schema: StructType,

+                                 useDefaultNullHandling: Boolean

+                               ): InternalRow = {

+    InternalRow.fromSeq(schema.fieldNames.map { colName =>

+      if (colName == "__time") {

+        inputRow.getTimestampFromEpoch

+      } else {

+        val col = inputRow.getRaw(colName)

+        // Resolve the column's Spark type once rather than looking it up by name in every branch.

+        val dataType = schema(colName).dataType

+        if (col != null) {

+          dataType match {

+            case ArrayType(baseType, _) =>

+              col match {

+                case collection: JCollection[_] =>

+                  ArrayData.toArrayData(collection.asScala.map { elem =>

+                    parseToScala(elem, baseType)

+                  })

+                case _ =>

+                  // Single-element arrays won't be wrapped when read from Druid; need to do it here

+                  ArrayData.toArrayData(List(parseToScala(col, baseType)))

+              }

+            case _ =>

+              // This is slightly inefficient since some objects will already be the correct type

+              parseToScala(col, dataType)

+          }

+        } else if (useDefaultNullHandling) {

+          dataType match {

+            // NOTE(review): an array-typed column probably needs an ArrayData default rather than a bare

+            // UTF8String; left unchanged until the intended default for multi-value columns is confirmed.

+            case StringType | ArrayType(StringType, _) => UTF8String.EMPTY_UTF8

+            // Each default must be boxed as the column's own primitive type: Spark's typed getters unbox with a

+            // cast, so an Int 0 stored in a long, float, or double column fails with a ClassCastException.

+            case LongType | TimestampType => 0L

+            case IntegerType => 0

+            case FloatType => 0.0F

+            case DoubleType => 0.0D

+            case _ => null // scalastyle:ignore null

+          }

+        } else {

+          null // scalastyle:ignore null

+        }

+      }

+    })

+  }

+

+  /**

+    * Convert an object COL to the appropriate scala type for the given Spark DataType DT.

+    *

+    * Strings become UTF8Strings. Long, Float, and Double columns accept either the matching boxed value or a String

+    * to parse. Timestamps are passed through untouched. Binary columns accept a byte array, or an instance of a

+    * class registered with ComplexTypeRegistry, which is deserialized through the registry.

+    *

+    * @param col The object to convert to a suitable type. Must not be null.

+    * @param dt The Spark DataType COL should be made compatible with.

+    * @return COL parsed into a type compatible with DT.

+    * @throws IllegalArgumentException If DT is unsupported or COL's class cannot be converted to DT.

+    */

+  def parseToScala(col: Any, dt: DataType): Any = {

+    dt match {

+      case StringType => UTF8String.fromString(col.toString)

+      // Values typed as Any are always boxed, so matching the java.lang wrapper covers Scala primitives too. (A

+      // bare `Long` pattern would compare against the companion object and never match a value.)

+      case LongType => col match {

+        case boxed: java.lang.Long => boxed

+        case str: String => str.toLong

+        case _ => throw new IllegalArgumentException(

+          s"Unsure how to parse ${col.getClass.toString} into a Long!"

+        )

+      }

+      case TimestampType => col // Timestamps should always come back from Druid as DateTimes

+      case FloatType => col match {

+        case boxed: java.lang.Float => boxed

+        case str: String => str.toFloat

+        case _ => throw new IllegalArgumentException(

+          s"Unsure how to parse ${col.getClass.toString} into a Float!"

+        )

+      }

+      case DoubleType => col match {

+        case boxed: java.lang.Double => boxed

+        case str: String => str.toDouble

+        case _ => throw new IllegalArgumentException(

+          s"Unsure how to parse ${col.getClass.toString} into a Double!"

+        )

+      }

+      case BinaryType =>

+        // Registered complex types take precedence over the raw byte array pass-through.

+        if (ComplexTypeRegistry.getRegisteredSerializedClasses.contains(col.getClass)) {

+          ComplexTypeRegistry.deserialize(col)

+        } else {

+          col match {

+            case arr: Array[Byte] =>

+              arr

+            case _ => throw new IllegalArgumentException(

+              s"Unsure how to parse ${col.getClass.toString} into a ByteArray!"

+            )

+          }

+        }

+      case _ => throw new IllegalArgumentException(

+        s"$dt currently unsupported!"

+      )

+    }

+  }

+

+  /**

+    * Build Druid DimensionSchemas for the fields of SCHEMA whose names appear in DIMENSIONS. The result follows the

+    * field order of SCHEMA; names in DIMENSIONS with no matching field are ignored.

+    *

+    * @param dimensions A list of column names to construct Druid DimensionSchemas for.

+    * @param schema The Spark schema to use to determine the types of the Druid DimensionSchemas created.

+    * @return A list of DimensionSchemas generated from the type information in SCHEMA for each dimension in DIMENSIONS.

+    * @throws IAE If a selected field has a type other than long, integer, float, double, string, or string array.

+    */

+  def convertStructTypeToDruidDimensionSchema(

+                                               dimensions: Seq[String],

+                                               schema: StructType

+                                             ): Seq[DimensionSchema] = {

+    // Translate a single Spark field into the Druid dimension schema matching its data type.

+    def toDimensionSchema(field: StructField): DimensionSchema = {

+      field.dataType match {

+        case LongType | IntegerType => new LongDimensionSchema(field.name)

+        case FloatType => new FloatDimensionSchema(field.name)

+        case DoubleType => new DoubleDimensionSchema(field.name)

+        case StringType | ArrayType(StringType, _) => new StringDimensionSchema(field.name)

+        case _ => throw new IAE(

+          "Unsure how to create dimension from column [%s] with data type [%s]",

+          field.name,

+          field.dataType

+        )

+      }

+    }

+

+    val selectedFields = schema.filter(field => dimensions.contains(field.name))

+    selectedFields.map(toDimensionSchema)

+  }

+

+  /**

+    * Validates that the given list of DimensionSchemas align with the given Spark schema. This validation is done to

+    * fail fast if the user-provided dimensions have different data types than the data in the source data frame to be

+    * written. Dimensions with no field of the same name in SCHEMA are not checked.

+    *

+    * @param dimensions The list of DimensionSchemas to validate against SCHEMA.

+    * @param schema The source-of-truth Spark schema to ensure compatibility with.

+    * @return True if every checked dimension is compatible; incompatibilities are reported by throwing.

+    * @throws IAE If the data types in DIMENSIONS do not align with the data types in SCHEMA, including the case where

+    *             a dimension names a Spark column whose type cannot be used as a dimension at all.

+    */

+  def validateDimensionSpecAgainstSparkSchema(dimensions: Seq[DimensionSchema], schema: StructType): Boolean = {

+    val incompatibilities = dimensions.flatMap{dim =>

+      if (schema.fieldNames.contains(dim.getName)) {

+        val sparkType = schema(schema.fieldIndex(dim.getName)).dataType

+        // The one Druid dimension type each supported Spark type may be written as.

+        val expectedTypeName = sparkType match {

+          case LongType | IntegerType => Some(DimensionSchema.LONG_TYPE_NAME)

+          case FloatType => Some(DimensionSchema.FLOAT_TYPE_NAME)

+          case DoubleType => Some(DimensionSchema.DOUBLE_TYPE_NAME)

+          case StringType | ArrayType(StringType, _) => Some(DimensionSchema.STRING_TYPE_NAME)

+          case _ => None

+        }

+        expectedTypeName match {

+          case Some(expected) if dim.getTypeName != expected =>

+            Some(s"${dim.getName}: expected type $expected but was ${dim.getTypeName}!")

+          case Some(_) =>

+            None

+          case None =>

+            // Report unsupported column types with the other problems instead of dying with a MatchError.

+            Some(s"${dim.getName}: Spark type $sparkType cannot be used as a Druid dimension!")

+        }

+      } else {

+        None

+      }

+    }

+    if (incompatibilities.nonEmpty) {

+      throw new IAE(s"Incompatible dimensions spec provided! Offending columns: ${incompatibilities.mkString("; ")}")

+    }

+    // For now, the return type could just be Unit, but leaving the stubs in place for future improvement

+    true

+  }

+

+  /**

+    * Build a Druid RowSignature with one column per field of SCHEMA, in field order. Each column's type is obtained

+    * from getDruidValueTypeForDataType.

+    *

+    * @param schema The Spark schema to generate a RowSignature from.

+    * @return A RowSignature corresponding to SCHEMA.

+    */

+  def generateRowSignatureFromSparkSchema(schema: StructType): RowSignature = {

+    val signatureBuilder = RowSignature.builder()

+    for (field <- schema) {

+      val druidType = getDruidValueTypeForDataType(field.dataType)

+      signatureBuilder.add(field.name, druidType)

+    }

+    signatureBuilder.build()

+  }

+

+  /**

+    * Map a Spark DataType onto a Druid ValueType. Double, float, long, and string map onto the matching scalar

+    * type; arrays of double, long, or string map onto the matching array type; everything else (including arrays

+    * of any other element type) maps onto COMPLEX.

+    *

+    * @param dtype The Spark DataType

+    * @return The ValueType corresponding to DTYPE.

+    */

+  def getDruidValueTypeForDataType(dtype: DataType): ValueType = {

+    dtype match {

+      case ArrayType(elementType, _) =>

+        elementType match {

+          case DoubleType => ValueType.DOUBLE_ARRAY

+          case LongType => ValueType.LONG_ARRAY

+          case StringType => ValueType.STRING_ARRAY

+          case _ => ValueType.COMPLEX

+        }

+      case StringType => ValueType.STRING

+      case LongType => ValueType.LONG

+      case FloatType => ValueType.FLOAT

+      case DoubleType => ValueType.DOUBLE

+      case _ => ValueType.COMPLEX

+    }

+  }

+

+  /**

+    * Build a DimensionsSpec describing every available dimension of QUERYABLEINDEX. Numeric dimensions only carry

+    * their name; string dimensions also carry the multi-value handling of their dimension handler and whether the

+    * column has bitmap indexes.

+    *

+    * @param queryableIndex The index to read dimension names, handlers, and column capabilities from.

+    * @return A DimensionsSpec with one DimensionSchema per available dimension.

+    * @throws IAE If a dimension's type is an array type or COMPLEX.

+    */

+  def getDimensionsSpecFromIndex(queryableIndex: QueryableIndex): DimensionsSpec = {

+    val handlersByDimension = queryableIndex.getDimensionHandlers.asScala

+    val dimensionNames = queryableIndex.getAvailableDimensions.asScala.toSeq

+

+    val dimensionSchemas: Seq[DimensionSchema] = dimensionNames.map { dim =>

+      val columnHolder = queryableIndex.getColumnHolder(dim)

+      val handler = handlersByDimension(dim)

+      columnHolder.getCapabilities.getType match {

+        case ValueType.DOUBLE => new DoubleDimensionSchema(dim)

+        case ValueType.FLOAT => new FloatDimensionSchema(dim)

+        case ValueType.LONG => new LongDimensionSchema(dim)

+        case ValueType.STRING =>

+          new StringDimensionSchema(dim, handler.getMultivalueHandling, columnHolder.getCapabilities.hasBitmapIndexes)

+        case ValueType.DOUBLE_ARRAY | ValueType.LONG_ARRAY | ValueType.STRING_ARRAY | ValueType.COMPLEX =>

+          throw new IAE(

+            s"This reader cannot process dimension [$dim] with type [${columnHolder.getCapabilities.getType}]!"

+          )

+      }

+    }

+

+    new DimensionsSpec(dimensionSchemas.toList.asJava)

+  }

+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/v2/DruidDataSourceV2.scala b/spark/src/main/scala/org/apache/druid/spark/v2/DruidDataSourceV2.scala
new file mode 100644
index 0000000..76d70e2
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/v2/DruidDataSourceV2.scala
@@ -0,0 +1,58 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.v2

+

+import org.apache.druid.spark.v2.reader.DruidDataSourceReader

+import org.apache.spark.internal.Logging

+import org.apache.spark.sql.sources.DataSourceRegister

+import org.apache.spark.sql.sources.v2.reader.DataSourceReader

+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}

+import org.apache.spark.sql.types.StructType

+

+/**

+  * Spark DataSource V2 entry point for Druid. It is registered under the short name DruidDataSourceV2ShortName and

+  * only mixes in ReadSupport; both createReader overloads delegate to DruidDataSourceReader.

+  */

+class DruidDataSourceV2 extends DataSourceV2 with ReadSupport with DataSourceRegister

+  with Logging {

+

+  /**

+    * @return The short name users can pass to `.format` to select this data source.

+    */

+  override def shortName(): String = DruidDataSourceV2ShortName

+

+  /**

+    * Create a DataSourceReader to read data in from Druid, configured via DATASOURCEOPTIONS.

+    *

+    * @param dataSourceOptions A wrapper around the properties map specified via `.option` or `.options` calls on the

+    *                          DataFrameReader.

+    * @return A DataSourceReader capable of reading data from Druid as configured via DATASOURCEOPTIONS.

+    */

+  override def createReader(dataSourceOptions: DataSourceOptions): DataSourceReader = {

+    DruidDataSourceReader(dataSourceOptions)

+  }

+

+  /**

+    * Create a DataSourceReader to read data in from Druid, configured via DATASOURCEOPTIONS. The provided schema will

+    * be used instead of making calls to the broker.

+    *

+    * @param schema The schema to use when reading data. Specified via the `.schema` method of a DataFrameReader.

+    * @param dataSourceOptions A wrapper around the properties map specified via `.option` or `.options` calls on the

+    *                          DataFrameReader.

+    * @return A DataSourceReader capable of reading data from Druid as configured via DATASOURCEOPTIONS.

+    */

+  override def createReader(schema: StructType,

+                            dataSourceOptions: DataSourceOptions): DataSourceReader = {

+    DruidDataSourceReader(schema, dataSourceOptions)

+  }

+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/v2/package.scala b/spark/src/main/scala/org/apache/druid/spark/v2/package.scala
new file mode 100644
index 0000000..5847f4e
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/v2/package.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark
+
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory
+import org.apache.druid.segment.{IndexIO, IndexMergerV9}
+
+package object v2 { // scalastyle:ignore package.object.name
+  // Short name the Druid data source is registered under (returned by DruidDataSourceV2.shortName()).
+  val DruidDataSourceV2ShortName = "druid"
+
+  // Shared IndexIO for code in this package, built on the connector-wide MAPPER.
+  // NOTE(review): the lambda presumably implements Druid's ColumnConfig, making 1000000 the column cache size in
+  // bytes - confirm against the IndexIO constructor.
+  private[v2] val INDEX_IO = new IndexIO(
+    MAPPER,
+    () => 1000000
+  )
+
+  // Shared IndexMergerV9 for code in this package; it reuses INDEX_IO and the on-heap write-out medium factory.
+  private[v2] val INDEX_MERGER_V9 = new IndexMergerV9(
+    MAPPER,
+    INDEX_IO,
+    OnHeapMemorySegmentWriteOutMediumFactory.instance()
+  )
+}
diff --git a/spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidBaseInputPartitionReader.scala b/spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidBaseInputPartitionReader.scala
new file mode 100644
index 0000000..23e9571
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidBaseInputPartitionReader.scala
@@ -0,0 +1,78 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.v2.reader

+

+import com.fasterxml.jackson.core.`type`.TypeReference

+import org.apache.druid.java.util.common.{FileUtils, ISE, StringUtils}

+import org.apache.druid.segment.{QueryableIndex, Segment, SegmentLazyLoadFailCallback}

+import org.apache.druid.segment.loading.SegmentLoader

+import org.apache.druid.spark.MAPPER

+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys, SerializableHadoopConfiguration}

+import org.apache.druid.spark.mixins.Logging

+import org.apache.druid.spark.registries.{ComplexTypeRegistry, SegmentReaderRegistry}

+import org.apache.druid.spark.utils.{NullHandlingUtils, PartitionReaderSegmentLoader}

+import org.apache.druid.spark.v2.INDEX_IO

+import org.apache.druid.timeline.DataSegment

+import org.apache.spark.broadcast.Broadcast

+

+import java.io.File

+import java.util.concurrent.atomic.AtomicBoolean

+

+/**

+  * Shared set-up for Druid input partition readers. Constructing an instance initializes Druid null handling,

+  * registers complex types and their serdes, optionally initializes the deep storage segment reader, and parses

+  * the serialized segment. The segment itself is only fetched and loaded when `segment` is first accessed.

+  *

+  * @param segmentStr The DataSegment to read, serialized with MAPPER.

+  * @param columnTypes Names of the complex types to register; when empty the registry defaults are used.

+  * @param broadcastedHadoopConf The Hadoop configuration handed to the segment loader.

+  * @param conf Connector configuration; its deep storage section is read when initializing deep storage here.

+  * @param useSparkConfForDeepStorage When true, no deep storage initialization is performed by this class.

+  * @param useCompactSketches Passed to ComplexTypeRegistry.registerByName for each entry of COLUMNTYPES.

+  * @param useDefaultNullHandling Whether Druid should use default values instead of nulls.

+  */

+class DruidBaseInputPartitionReader(

+                                     segmentStr: String,

+                                     columnTypes: Option[Set[String]],

+                                     broadcastedHadoopConf: Broadcast[SerializableHadoopConfiguration],

+                                     conf: Configuration,

+                                     useSparkConfForDeepStorage: Boolean,

+                                     useCompactSketches: Boolean,

+                                     useDefaultNullHandling: Boolean

+                               ) extends Logging {

+  // Need to initialize Druid's internal null handling as well for filters etc.

+  NullHandlingUtils.initializeDruidNullHandling(useDefaultNullHandling)

+

+  columnTypes match {

+    case Some(typeNames) =>

+      // Callers will need to explicitly register any complex types not known to ComplexTypeRegistry by default

+      typeNames.foreach { typeName =>

+        ComplexTypeRegistry.registerByName(typeName, useCompactSketches)

+      }

+    case None =>

+      ComplexTypeRegistry.initializeDefaults()

+  }

+  ComplexTypeRegistry.registerSerdes()

+

+  // If there are mixed deep storage types, callers will need to handle initialization themselves.

+  if (!useSparkConfForDeepStorage) {

+    // Only the first reader to flip the shared flag performs the initialization.

+    // NOTE(review): the flag stays set even if the initialization below throws, so later readers would skip it.

+    if (DruidBaseInputPartitionReader.initialized.compareAndSet(false, true)) {

+      val deepStorageType = conf.get(DruidConfigurationKeys.deepStorageTypeDefaultKey)

+      SegmentReaderRegistry.registerInitializerByType(deepStorageType)

+      SegmentReaderRegistry.initialize(deepStorageType, conf.dive(deepStorageType))

+    }

+  }

+

+  private[reader] val dataSegment =

+    MAPPER.readValue[DataSegment](segmentStr, new TypeReference[DataSegment] {})

+  // Everything below is lazy, so nothing is fetched or written to disk until the segment is actually needed.

+  private[reader] lazy val hadoopConf = broadcastedHadoopConf.value.value

+  private[reader] lazy val tmpDir: File = FileUtils.createTempDir

+  private[reader] lazy val segmentLoader: SegmentLoader =

+    new PartitionReaderSegmentLoader(tmpDir, hadoopConf, INDEX_IO)

+  private[reader] lazy val segment: Segment =

+    segmentLoader.getSegment(dataSegment, false, SegmentLazyLoadFailCallback.NOOP)

+}

+

+private[reader] object DruidBaseInputPartitionReader {

+  // Flag shared by all readers that guards the deep storage initialization in the class body.

+  private val initialized = new AtomicBoolean(false)

+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidColumnarInputPartition.scala b/spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidColumnarInputPartition.scala
new file mode 100644
index 0000000..91dd848
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidColumnarInputPartition.scala
@@ -0,0 +1,69 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.v2.reader

+

+import org.apache.druid.query.filter.DimFilter

+import org.apache.druid.spark.MAPPER

+import org.apache.druid.spark.configuration.{Configuration, SerializableHadoopConfiguration}

+import org.apache.druid.timeline.DataSegment

+import org.apache.spark.sql.SparkSession

+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}

+import org.apache.spark.sql.types.StructType

+import org.apache.spark.sql.vectorized.ColumnarBatch

+

+/**

+  * Defines a single partition in the dataframe's underlying RDD. This object is generated in the driver and then

+  * serialized to the executors, where it is responsible for creating the [[InputPartitionReader]] that does the

+  * actual reading.

+  *

+  * On construction the active session's Hadoop configuration is broadcast and SEGMENT is serialized to a string

+  * with MAPPER; both are handed to the reader together with the remaining constructor arguments.

+  */

+class DruidColumnarInputPartition(

+                                   segment: DataSegment,

+                                   schema: StructType,

+                                   filter: Option[DimFilter],

+                                   columnTypes: Option[Set[String]],

+                                   conf: Configuration,

+                                   useSparkConfForDeepStorage: Boolean,

+                                   useCompactSketches: Boolean,

+                                   useDefaultNullHandling: Boolean,

+                                   batchSize: Int

+                            ) extends InputPartition[ColumnarBatch] {

+  // There's probably a better way to do this

+  // Throws NoSuchElementException if there is no active session when this partition is constructed.

+  private val session = SparkSession.getActiveSession.get // We're running on the driver, it exists

+  // Broadcast a serializable wrapper around the driver's Hadoop configuration for use by the reader.

+  private val broadcastConf =

+    session.sparkContext.broadcast(

+      new SerializableHadoopConfiguration(session.sparkContext.hadoopConfiguration)

+    )

+  // The segment is carried as a string and handed to the reader in that form.

+  private val serializedSegment: String = MAPPER.writeValueAsString(segment)

+

+  /**

+    * @return A new DruidColumnarInputPartitionReader over this partition's segment.

+    */

+  override def createPartitionReader(): InputPartitionReader[ColumnarBatch] = {

+    new DruidColumnarInputPartitionReader(

+      serializedSegment,

+      schema,

+      filter,

+      columnTypes,

+      broadcastConf,

+      conf,

+      useSparkConfForDeepStorage,

+      useCompactSketches,

+      useDefaultNullHandling,

+      batchSize

+    )

+  }

+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidColumnarInputPartitionReader.scala b/spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidColumnarInputPartitionReader.scala
new file mode 100644
index 0000000..4ac423b
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidColumnarInputPartitionReader.scala
@@ -0,0 +1,386 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.v2.reader

+

+import org.apache.druid.java.util.common.{FileUtils, IAE, ISE, StringUtils}

+import org.apache.druid.query.dimension.DefaultDimensionSpec

+import org.apache.druid.query.filter.DimFilter

+import org.apache.druid.segment.column.ValueType

+import org.apache.druid.segment.vector.{VectorColumnSelectorFactory, VectorCursor}

+import org.apache.druid.segment.VirtualColumns

+import org.apache.druid.spark.configuration.{Configuration, SerializableHadoopConfiguration}

+import org.apache.druid.spark.mixins.Logging

+import org.apache.druid.spark.registries.ComplexTypeRegistry

+import org.apache.spark.broadcast.Broadcast

+import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}

+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader

+import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, LongType, StringType,

+  StructField, StructType, TimestampType}

+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

+

+/**
+  * An [[InputPartitionReader]] that reads a Druid segment in batches of up to BATCHSIZE rows via a Druid
+  * [[VectorCursor]] and exposes each batch to Spark as a [[ColumnarBatch]] backed by on-heap column vectors.
+  *
+  * NOTE(review): `segment` and `tmpDir` are not declared in this class; presumably they are provided by
+  * DruidBaseInputPartitionReader - confirm against that class.
+  *
+  * @param segmentStr The segment to read, passed through to the base reader (assumed to be serialized JSON - confirm).
+  * @param schema The Spark schema of the columns to read; one column vector is allocated per field.
+  * @param filter An optional Druid filter applied when creating the vector cursor.
+  * @param columnTypes Passed through to the base reader.
+  * @param broadcastedHadoopConf Passed through to the base reader.
+  * @param conf Passed through to the base reader.
+  * @param useSparkConfForDeepStorage Passed through to the base reader.
+  * @param useCompactSketches Passed through to the base reader.
+  * @param useDefaultNullHandling Whether Druid nulls are replaced with default values (true) or kept as nulls (false).
+  * @param batchSize The vector size requested from Druid and the capacity of the Spark column vectors.
+  */

+class DruidColumnarInputPartitionReader(

+                                         segmentStr: String,

+                                         schema: StructType,

+                                         filter: Option[DimFilter],

+                                         columnTypes: Option[Set[String]],

+                                         broadcastedHadoopConf: Broadcast[SerializableHadoopConfiguration],

+                                         conf: Configuration,

+                                         useSparkConfForDeepStorage: Boolean,

+                                         useCompactSketches: Boolean,

+                                         useDefaultNullHandling: Boolean,

+                                         batchSize: Int

+                                       )

+  extends DruidBaseInputPartitionReader(

+    segmentStr,

+    columnTypes,

+    broadcastedHadoopConf,

+    conf,

+    useSparkConfForDeepStorage,

+    useCompactSketches,

+    useDefaultNullHandling

+  ) with InputPartitionReader[ColumnarBatch] with Logging {

+

+  // Vector cursor over the segment's whole data interval with no virtual columns; the pushed-down filter (if any)

+  // is applied by Druid. The remaining arguments are false, the batch size, and null.

+  private val cursor: VectorCursor = segment.asStorageAdapter().makeVectorCursor(

+    filter.map(_.toOptimizedFilter).orNull,

+    segment.getDataInterval,

+    VirtualColumns.EMPTY,

+    false,

+    batchSize,

+    null) // scalastyle:ignore null

+

+  // One on-heap vector per schema field; the same vectors are reset and refilled for every batch.

+  private val columnVectors: Array[OnHeapColumnVector] = OnHeapColumnVector.allocateColumns(batchSize, schema)

+  // The single batch instance handed back from get(); it wraps columnVectors.

+  private val resultBatch: ColumnarBatch = new ColumnarBatch(columnVectors.map(_.asInstanceOf[ColumnVector]))

+

+  /**
+    * Advances to the next batch if the cursor still has data, filling the column vectors as a side effect.
+    *
+    * @return True if a batch was loaded and can be retrieved via get(), false if the cursor is exhausted.
+    */
+  override def next(): Boolean = {
+    val hasMoreData = !cursor.isDone
+    if (hasMoreData) {
+      fillVectors()
+    }
+    hasMoreData
+  }

+

+  /**
+    * @return The shared result batch. The same instance is returned on every call; its contents are replaced each
+    *         time fillVectors runs.
+    */
+  override def get(): ColumnarBatch = resultBatch

+

+  /**
+    * Releases the result batch, column vectors, cursor and segment, and deletes the temporary directory.
+    *
+    * If any step fails, the temporary directory is still deleted on a best-effort basis and the original exception
+    * is rethrown. A failure of that best-effort deletion is attached to the original exception as a suppressed
+    * exception instead of replacing it.
+    */
+  override def close(): Unit = {
+    try {
+      resultBatch.close()
+      columnVectors.foreach(_.close())
+      if (Option(cursor).nonEmpty) {
+        cursor.close()
+      }
+      if (Option(segment).nonEmpty) {
+        segment.close()
+      }
+      if (Option(tmpDir).nonEmpty) {
+        FileUtils.deleteDirectory(tmpDir)
+      }
+    } catch {
+      case e: Exception =>
+        // Since we're just going to rethrow e and tearing down the JVM will clean up the result batch, column vectors,
+        // cursor, and segment even if we can't, the only leak we have to worry about is the temp file. Spark should
+        // clean up temp files as well, but rather than rely on that we'll try to take care of it ourselves.
+        logWarn("Encountered exception attempting to close a DruidColumnarInputPartitionReader!")
+        try {
+          if (Option(tmpDir).nonEmpty && tmpDir.exists()) {
+            FileUtils.deleteDirectory(tmpDir)
+          }
+        } catch {
+          // Don't let a failed cleanup hide the failure that got us here.
+          case cleanupFailure: Exception => e.addSuppressed(cleanupFailure)
+        }
+        throw e
+    }
+  }

+

+  // TODO: Maybe ColumnProcessors can help here? Need to investigate

+  /**
+    * Resets every Spark column vector and refills it from the cursor's current Druid vector, then records the
+    * number of rows in the result batch and advances the cursor.
+    *
+    * Columns for which the selector factory reports no capabilities are filled via fillNullVector; all other
+    * columns are dispatched on their Druid value type. Throws an IAE for value types other than FLOAT, LONG,
+    * DOUBLE, STRING and COMPLEX.
+    */
+  private[reader] def fillVectors(): Unit = {
+    columnVectors.foreach(_.reset())
+    val selectorFactory = cursor.getColumnSelectorFactory
+
+    for ((field, idx) <- schema.fields.zipWithIndex) {
+      val target = columnVectors(idx)
+      Option(selectorFactory.getColumnCapabilities(field.name)) match {
+        case None =>
+          fillNullVector(target, field)
+        case Some(capabilities) =>
+          capabilities.getType match {
+            case ValueType.FLOAT | ValueType.LONG | ValueType.DOUBLE =>
+              fillNumericVector(capabilities.getType, selectorFactory, target, field.name)
+            case ValueType.STRING =>
+              fillStringVector(selectorFactory, target, field, capabilities.hasMultipleValues.isMaybeTrue)
+            case ValueType.COMPLEX =>
+              fillComplexVector(selectorFactory, target, field)
+            case _ => throw new IAE(s"Unrecognized ValueType ${capabilities.getType}!")
+          }
+      }
+    }
+
+    // The row count has to be read before advancing, since advancing moves the cursor to the next vector.
+    resultBatch.setNumRows(cursor.getCurrentVectorSize)
+    cursor.advance()
+  }

+

+  /**
+    * Fill a Spark ColumnVector with the values from a Druid VectorSelector containing numeric rows.
+    * The general pattern is:
+    *   1) If there are no null values in the Druid data (the null vector is itself null), just copy the backing
+    *        array over.
+    *   2) If there are nulls, for each index in the Druid vector check if the source vector is null at that index
+    *        and if so insert the appropriate null value into the Spark vector (0 when using default null handling,
+    *        an actual null otherwise). Otherwise, copy over the value at that index from the Druid vector.
+    *
+    * @param valueType The ValueType of the Druid column to fill COLUMNVECTOR from.
+    * @param selectorFactory The Druid SelectorFactory backed by the data read in from segment files.
+    * @param columnVector The Spark ColumnVector to fill with the data from SELECTORFACTORY.
+    * @param name The name of the column in Druid to source data from.
+    * @throws IAE if VALUETYPE is not FLOAT, LONG, or DOUBLE.
+    */
+  private[reader] def fillNumericVector(
+                                       valueType: ValueType,
+                                       selectorFactory: VectorColumnSelectorFactory,
+                                       columnVector: WritableColumnVector,
+                                       name: String
+                                       ): Unit = {
+    val selector = selectorFactory.makeValueSelector(name)
+    val vectorLength = selector.getCurrentVectorSize
+    val nulls = selector.getNullVector
+
+    // Row-by-row copy for vectors that contain nulls. PUTVALUE copies the Druid value at an index, PUTDEFAULT
+    // writes the type-specific default value at an index. Only called when NULLS is non-null.
+    def copyWithNulls(putDefault: Int => Unit, putValue: Int => Unit): Unit = {
+      (0 until vectorLength).foreach { i =>
+        if (!nulls(i)) {
+          putValue(i)
+        } else if (useDefaultNullHandling) {
+          putDefault(i)
+        } else {
+          columnVector.putNull(i)
+        }
+      }
+    }
+
+    valueType match {
+      case ValueType.FLOAT =>
+        val vector = selector.getFloatVector
+        if (nulls == null) { // scalastyle:ignore null
+          columnVector.putFloats(0, vectorLength, vector, 0)
+        } else {
+          copyWithNulls(i => columnVector.putFloat(i, 0), i => columnVector.putFloat(i, vector(i)))
+        }
+      case ValueType.LONG =>
+        val vector = selector.getLongVector
+        if (nulls == null) { // scalastyle:ignore null
+          columnVector.putLongs(0, vectorLength, vector, 0)
+        } else {
+          copyWithNulls(i => columnVector.putLong(i, 0), i => columnVector.putLong(i, vector(i)))
+        }
+      case ValueType.DOUBLE =>
+        val vector = selector.getDoubleVector
+        if (nulls == null) { // scalastyle:ignore null
+          columnVector.putDoubles(0, vectorLength, vector, 0)
+        } else {
+          copyWithNulls(i => columnVector.putDouble(i, 0), i => columnVector.putDouble(i, vector(i)))
+        }
+      case _ => throw new IAE(s"Must call fillNumericVector with a numeric value type; called with $valueType!")
+    }
+  }

+

+  /**
+    * Fill a Spark ColumnVector with the values from a Druid VectorSelector containing string rows.
+    *
+    * In theory, we could define a ColumnVector implementation that handled single- and multi-valued strings
+    * intelligently while falling back to the existing behavior for other data types. Unfortunately, Spark marks
+    * OnHeapColumnVector as final so we'd need to copy the underlying logic and maintain it ourselves or abuse
+    * reflection. Additionally, Spark doesn't really do anything clever with columnar dataframes in 2.4. Specifically
+    * for multi-valued string columns this means that under the hood Spark will immediately convert each sub-array
+    * (e.g. row) into an Object[] and so we won't gain anything by maintaining the value dictionary. Instead, we define
+    * a SingleValueDimensionDictionary to handle the single-valued case and reify multi-valued dimensions ourselves to
+    * reduce complexity.
+    *
+    * When strings are reified (the multi-valued case and the single-valued case with an array-typed Spark column),
+    * a dictionary entry that resolves to null is written as an empty string under default null handling and as a
+    * null element otherwise, instead of failing with a NullPointerException while encoding it.
+    *
+    * There are also a couple of open questions to investigate for the dictionary-backed single-valued case:
+    *
+    * First, how does Spark expect nulls to be flagged from dictionaries? If dictionaries can happily return null, then
+    * we can just drop the row vector in the dictionary creation and be on our way. If Spark expects nulls to be flagged
+    * explicitly, then we'll need to figure out how the different Druid null handling strategies change both what gets
+    * stored on disk and what we read here from the SingleValueDimensionSelector. In this case, based on
+    * PossiblyNullDimensionSelector we'll likely need to iterate over the row vector returned by the selector and call
+    * either putNull if the value at the index is 0 or putInt otherwise.
+    *
+    * Second, can Druid dictionaries change between parts of the segment file (i.e in different smooshes)? If they can,
+    * we need to add checks for that case and fall back to putting byte arrays into the column vector directly for
+    * single-valued dimensions.
+    *
+    * @param selectorFactory The Druid SelectorFactory backed by the data read in from segment files.
+    * @param columnVector The Spark ColumnVector to fill with the data from SELECTORFACTORY.
+    * @param column The Spark column schema we're filling.
+    * @param maybeHasMultipleValues Whether or not the Druid column we're reading from may contain multiple values.
+    * @throws ISE if the column is read via a dictionary and the dictionary's cardinality is unknown.
+    */
+  private[reader] def fillStringVector(
+                                        selectorFactory: VectorColumnSelectorFactory,
+                                        columnVector: WritableColumnVector,
+                                        column: StructField,
+                                        maybeHasMultipleValues: Boolean
+                                      ): Unit = {
+    // Writes the dictionary value NAME into row ROW of TARGET. Dictionary lookups may return null, and encoding a
+    // null string throws, so nulls are mapped according to the configured null handling mode instead.
+    def putDictionaryValue(target: WritableColumnVector, row: Int, name: String): Unit = {
+      if (name != null) { // scalastyle:ignore null
+        target.putByteArray(row, StringUtils.toUtf8(name))
+      } else if (useDefaultNullHandling) {
+        target.putByteArray(row, Array.emptyByteArray)
+      } else {
+        target.putNull(row)
+      }
+    }
+
+    if (maybeHasMultipleValues) {
+      // Multi-valued string dimension that may contain multiple values in this batch
+      val selector = selectorFactory.makeMultiValueDimensionSelector(DefaultDimensionSpec.of(column.name))
+      val vector = selector.getRowVector
+      val vectorLength = selector.getCurrentVectorSize
+
+      // This will store repeated strings multiple times. CPU should be more important than storage here, but
+      // if the repeated strings are a problem and reducing the batch size doesn't help, we could implement our
+      // own ColumnVector that tracks the row for each string in the lookup dict and then stores arrays of rowIds.
+      // We'd need two vectors (the main ColumnVector, which would store an array of ints for each actual row id
+      // and an arrayData column vector, which would store strings at each internal row id.) When we read in an
+      // array of IndexedInts, we'd check to see if we'd already stored the corresponding string in arrayData and
+      // if so just use the existing internal row. The ints in the main vector would point to the internal row ids
+      // and we'd override ColumnVector#getArray(rowId: Int) to follow the logic on read. This would preserve the
+      // space savings of the dictionary-encoding at the cost of possibly more CPU at read.
+
+      val arrayData = columnVector.arrayData()
+      // Note that offsets here are in rows, not bytes
+      var columnVectorOffset = 0
+      var arrayDataOffset = 0
+
+      // Iterating over the populated elements of vector twice is faster than reserving additional capacity as
+      // each new row is processed since reserving more capacity means copying arrays. Null rows contribute no
+      // values; they are guarded here the same way the fill loop below guards them.
+      val numberOfValuesInBatch = (0 until vectorLength).map { i =>
+        val row = vector(i)
+        if (row == null) 0 else row.size() // scalastyle:ignore null
+      }.sum
+      arrayData.reserve(numberOfValuesInBatch)
+
+      (0 until vectorLength).foreach { i =>
+        val arr = vector(i)
+        if (arr == null) { // scalastyle:ignore null
+          // TODO: Is this possible? Need to test
+          columnVector.putNull(i)
+        } else {
+          val numberOfValuesInRow = arr.size() // Number of values in this row
+          (0 until numberOfValuesInRow).foreach { idx =>
+            putDictionaryValue(arrayData, arrayDataOffset, selector.lookupName(arr.get(idx)))
+            arrayDataOffset += 1
+          }
+          columnVector.putArray(i, columnVectorOffset, numberOfValuesInRow)
+          columnVectorOffset += numberOfValuesInRow
+        }
+      }
+    } else {
+      // The Druid column does not report multiple values, so every row holds a single dictionary id. The Spark
+      // column may still be array-typed, in which case each value is wrapped in a one-element array.
+      val selector = selectorFactory.makeSingleValueDimensionSelector(DefaultDimensionSpec.of(column.name))
+      val vector = selector.getRowVector
+      val vectorLength = selector.getCurrentVectorSize
+
+      if (column.dataType.isInstanceOf[ArrayType]) {
+        // need to handle as if it were multi-dimensional so results are properly wrapped in arrays in spark
+        val arrayData = columnVector.arrayData()
+
+        (0 until vectorLength).foreach { i =>
+          putDictionaryValue(arrayData, i, selector.lookupName(vector(i)))
+          // Row i of the array column is the single element stored at row i of arrayData.
+          columnVector.putArray(i, i, 1)
+        }
+      } else {
+        // Single-valued string dimension
+        // TODO: There's got to be a better way to extract the lookups, but for now YOLO
+        // TODO: Work out null handling (see SingleValueDimensionDictionary as well)
+        // NOTE(review): the full id -> name map is rebuilt for every batch; consider caching it if dictionaries
+        // are stable for the lifetime of the cursor (see the open questions in the method documentation).
+        val cardinality = selector.getValueCardinality
+        if (cardinality == -1) {
+          throw new ISE("Encountered dictionary with unknown cardinality, vectorized reading not supported!")
+        }
+        val lookupMap = (0 until cardinality).map { id =>
+          id -> selector.lookupName(id)
+        }.toMap
+        val colDict = new SingleValueDimensionDictionary(lookupMap)
+
+        val dictionaryIds = columnVector.reserveDictionaryIds(vectorLength)
+        dictionaryIds.appendInts(vectorLength, vector, 0)
+        columnVector.setDictionary(colDict)
+      }
+    }
+  }

+

+  /**
+    * Fill a Spark ColumnVector with the values from a Druid VectorObjectSelector containing complex rows.
+    *
+    * Null objects become nulls in the Spark vector. Objects whose class is registered with the ComplexTypeRegistry
+    * are converted to bytes via the registry; byte arrays are stored as-is. Anything else cannot be represented.
+    *
+    * @param selectorFactory The Druid SelectorFactory backed by the data read in from segment files.
+    * @param columnVector The Spark ColumnVector to fill with the data from SELECTORFACTORY.
+    * @param column The Spark column schema we're filling.
+    * @throws IllegalArgumentException if a value is neither of a registered class nor a byte array.
+    */
+  private[reader] def fillComplexVector(
+                                         selectorFactory: VectorColumnSelectorFactory,
+                                         columnVector: WritableColumnVector,
+                                         column: StructField
+                                       ): Unit = {
+    val selector = selectorFactory.makeObjectSelector(column.name)
+    val vector = selector.getObjectVector
+    val vectorLength = selector.getCurrentVectorSize
+
+    (0 until vectorLength).foreach{i =>
+      val obj = vector(i)
+      if (obj == null) { // scalastyle:ignore null
+        columnVector.putNull(i)
+      } else if (ComplexTypeRegistry.getRegisteredSerializedClasses.contains(obj.getClass)) {
+        val bytes = ComplexTypeRegistry.deserialize(obj)
+        columnVector.putByteArray(i, bytes)
+      } else {
+        obj match {
+          case arr: Array[Byte] =>
+            columnVector.putByteArray(i, arr)
+          // Report the class of the value we failed on; the class of COLUMN is always StructField.
+          case _ => throw new IllegalArgumentException(
+            s"Unable to parse ${obj.getClass.toString} into a ByteArray! Try registering a Complex Type Plugin."
+          )
+        }
+      }
+    }
+  }

+

+  /**
+    * Fill a Spark ColumnVector for a column the current Druid vector has no capabilities for.
+    *
+    * Without default null handling every row is set to null. With default null handling, numeric and timestamp
+    * columns are filled with 0, string columns with empty strings, string-array columns with one-element arrays
+    * holding an empty string, and every other type with nulls.
+    *
+    * @param columnVector The Spark ColumnVector to fill.
+    * @param column The Spark column schema we're filling.
+    */
+  private[reader] def fillNullVector(columnVector: WritableColumnVector, column: StructField): Unit = {
+    val vectorLength = cursor.getCurrentVectorSize
+    if (!useDefaultNullHandling) {
+      columnVector.putNulls(0, vectorLength)
+    } else {
+      column.dataType match {
+        case FloatType =>
+          columnVector.putFloats(0, vectorLength, 0)
+        case LongType | TimestampType =>
+          columnVector.putLongs(0, vectorLength, 0)
+        case DoubleType =>
+          columnVector.putDoubles(0, vectorLength, 0)
+        case StringType =>
+          for (row <- 0 until vectorLength) {
+            columnVector.putByteArray(row, Array.emptyByteArray)
+          }
+        case ArrayType(StringType, _) =>
+          val elements = columnVector.arrayData()
+          for (row <- 0 until vectorLength) {
+            elements.putByteArray(row, Array.emptyByteArray)
+            columnVector.putArray(row, row, 1)
+          }
+        case _ => // Complex Types use nulls regardless of null handling mode. Also nulling unknown types.
+          columnVector.putNulls(0, vectorLength)
+      }
+    }
+  }

+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidDataSourceReader.scala b/spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidDataSourceReader.scala
new file mode 100644
index 0000000..badfba5
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidDataSourceReader.scala
@@ -0,0 +1,289 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.v2.reader

+

+import com.fasterxml.jackson.core.`type`.TypeReference

+import org.apache.druid.java.util.common.{Intervals, JodaUtils}

+import org.apache.druid.spark.MAPPER

+import org.apache.druid.spark.clients.{DruidClient, DruidMetadataClient}

+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}

+import org.apache.druid.spark.mixins.Logging

+import org.apache.druid.spark.utils.{FilterUtils, SchemaUtils}

+import org.apache.druid.timeline.DataSegment

+import org.apache.spark.sql.catalyst.InternalRow

+import org.apache.spark.sql.sources.v2.DataSourceOptions

+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition,

+  SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsScanColumnarBatch}

+import org.apache.spark.sql.sources.Filter

+import org.apache.spark.sql.types.StructType

+import org.apache.spark.sql.vectorized.ColumnarBatch

+import org.joda.time.Interval

+

+import java.util.{List => JList}

+import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListConverter}

+

+/**

+  * A DruidDataSourceReader handles the actual work of reading data from Druid. It does this by querying to determine

+  * where Druid segments live in deep storage and then reading those segments into memory in order to avoid straining

+  * the Druid cluster. In general, users should not directly instantiate instances of this class but instead use

+  * sparkSession.read.format("druid").options(Map(...)).load(). If the schema of the data in Druid is known, overhead

+  * can be further reduced by providing it directly (e.g. sparkSession.read.format("druid").schema(schema).options...)

+  *

+  * To aid comprehensibility, some idiomatic Scala has been somewhat java-fied.

+  */

+class DruidDataSourceReader(

+                             var schema: Option[StructType] = None,

+                             conf: Configuration

+                           ) extends DataSourceReader

+  with SupportsPushDownRequiredColumns with SupportsPushDownFilters with SupportsScanColumnarBatch with Logging {

+  // Both clients are created lazily from conf on first use.

+  private lazy val metadataClient =

+    DruidDataSourceReader.createDruidMetaDataClient(conf)

+  private lazy val druidClient = DruidDataSourceReader.createDruidClient(conf)

+

+  // The filters accepted for push down; assigned in pushFilters.

+  private var filters: Array[Filter] = Array.empty

+  // The Druid column types seen when the schema was fetched from Druid; stays empty when a schema was provided.

+  private var druidColumnTypes: Option[Set[String]] = Option.empty

+

+  /**
+    * Returns the schema of this reader. If no schema is set yet, it is fetched from Druid for the configured
+    * table (bounded by any time filters currently held), converted to a Spark schema and remembered together
+    * with the Druid column types.
+    *
+    * @return The known or newly determined Spark schema.
+    */
+  override def readSchema(): StructType = {
+    schema.getOrElse {
+      require(conf.isPresent(DruidConfigurationKeys.tableKey),
+        s"Must set ${DruidConfigurationKeys.tableKey}!")
+      // TODO: Optionally accept a granularity so that if lowerBound to upperBound spans more than
+      //  twice the granularity duration, we can send a list with two disjoint intervals and
+      //  minimize the load on the broker from having to merge large numbers of segments
+      val (lowerBound, upperBound) = FilterUtils.getTimeFilterBounds(filters)
+      val dataSource = conf.getString(DruidConfigurationKeys.tableKey)
+      // Missing bounds fall back to the widest interval Druid supports.
+      val queryInterval = Intervals.utc(
+        lowerBound.getOrElse(JodaUtils.MIN_INSTANT),
+        upperBound.getOrElse(JodaUtils.MAX_INSTANT)
+      )
+      val columnMap = druidClient.getSchema(dataSource, Some(List[Interval](queryInterval)))
+
+      schema = Option(SchemaUtils.convertDruidSchemaToSparkSchema(columnMap))
+      druidColumnTypes = Option(columnMap.map(_._2._1).toSet)
+      schema.get
+    }
+  }

+

+  /**
+    * Plans the row-based read: one [[DruidInputPartition]] per Druid segment.
+    *
+    * The segments are taken from the reader configuration if a hard-coded list is present there and are looked
+    * up via getSegments otherwise.
+    *
+    * @return One input partition per segment to read.
+    */
+  override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
+    // For now, one partition for each Druid segment partition
+    // Future improvements can use information from SegmentAnalyzer results to do smart things
+    if (schema.isEmpty) {
+      readSchema()
+    }
+    val readerConf = conf.dive(DruidConfigurationKeys.readerPrefix)
+    val filter = FilterUtils.mapFilters(filters, schema.get).map(_.optimize())
+    val useSparkConfForDeepStorage = readerConf.getBoolean(DruidConfigurationKeys.useSparkConfForDeepStorageDefaultKey)
+    // NOTE(review): unlike the other flags this only checks that the key is present, so an explicit "false" would
+    // still enable compact sketches - confirm this is intended.
+    val useCompactSketches = readerConf.isPresent(DruidConfigurationKeys.useCompactSketchesKey)
+    val useDefaultNullHandling = readerConf.getBoolean(DruidConfigurationKeys.useDefaultValueForNullDefaultKey)
+
+    val segmentsToRead: Seq[DataSegment] =
+      if (readerConf.isPresent(DruidConfigurationKeys.segmentsKey)) {
+        // Allow passing hard-coded list of segments to load
+        val hardCodedSegments: JList[DataSegment] = MAPPER.readValue(
+          readerConf.getString(DruidConfigurationKeys.segmentsKey),
+          new TypeReference[JList[DataSegment]]() {}
+        )
+        hardCodedSegments.asScala
+      } else {
+        getSegments
+      }
+
+    segmentsToRead
+      .map(segment =>
+        new DruidInputPartition(
+          segment,
+          schema.get,
+          filter,
+          druidColumnTypes,
+          conf,
+          useSparkConfForDeepStorage,
+          useCompactSketches,
+          useDefaultNullHandling
+        ): InputPartition[InternalRow]
+      ).asJava
+  }

+

+  /**
+    * Replaces this reader's schema with STRUCTTYPE. The stored Druid column types are left untouched.
+    *
+    * @param structType The schema to use from now on.
+    */

+  override def pruneColumns(structType: StructType): Unit = {

+    schema = Option(structType)

+  }

+

+  /**
+    * Given an array of Spark filters FILTERS, stores the filters this reader can push down in this.filters
+    * (replacing any previously stored filters) and returns an array containing the filters in FILTERS this reader
+    * cannot support.
+    *
+    * Whether a filter is supported depends on the null handling mode, which is read from the reader section of
+    * the configuration so that it matches the mode the partitions are planned with.
+    *
+    * @param filters An array of filters to evaluate for predicate pushdown support.
+    * @return The filters in FILTERS this reader does not support.
+    */
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    // Make sure the schema is available; it is needed to decide which filters are supported.
+    readSchema()
+    val readerConf = conf.dive(DruidConfigurationKeys.readerPrefix)
+    val useSqlCompatibleNullHandling = !readerConf.getBoolean(DruidConfigurationKeys.useDefaultValueForNullDefaultKey)
+    filters.partition(FilterUtils.isSupportedFilter(_, schema.get, useSqlCompatibleNullHandling)) match {
+      case (supported, unsupported) =>
+        this.filters = supported
+        unsupported
+    }
+  }

+

+  // Returns the filters stored by the most recent call to pushFilters (empty until then).

+  override def pushedFilters(): Array[Filter] = filters

+

+  /**
+    * Looks up the segments to read for the configured table via the metadata client, restricted to the time
+    * bounds found in the pushed-down filters (if any).
+    *
+    * @return The segments of the configured table that fall within the time bounds.
+    */
+  private[reader] def getSegments: Seq[DataSegment] = {
+    require(conf.isPresent(DruidConfigurationKeys.tableKey),
+      s"Must set ${DruidConfigurationKeys.tableKey}!")
+
+    // Check filters for any bounds on __time
+    // Otherwise, we'd need to full scan the segments table
+    val (lowerTimeBound, upperTimeBound) = FilterUtils.getTimeFilterBounds(filters)
+
+    val dataSource = conf.getString(DruidConfigurationKeys.tableKey)
+    // NOTE(review): this flag is read from the top-level conf, not from conf.dive(readerPrefix) like the flags in
+    // the plan methods - confirm which namespace the key lives in.
+    val allowIncompletePartitions = conf.getBoolean(DruidConfigurationKeys.allowIncompletePartitionsDefaultKey)
+
+    metadataClient.getSegmentPayloads(dataSource, lowerTimeBound, upperTimeBound, allowIncompletePartitions)
+  }

+

+  /**
+    * Plans the vectorized read: one [[DruidColumnarInputPartition]] per Druid segment, each producing batches of
+    * the configured batch size.
+    *
+    * The segments are taken from the reader configuration if a hard-coded list is present there and are looked
+    * up via getSegments otherwise.
+    *
+    * @return One columnar input partition per segment to read.
+    */
+  override def planBatchInputPartitions(): JList[InputPartition[ColumnarBatch]] = {
+    if (schema.isEmpty) {
+      readSchema()
+    }
+    val readerConf = conf.dive(DruidConfigurationKeys.readerPrefix)
+    val filter = FilterUtils.mapFilters(filters, schema.get).map(_.optimize())
+    val useSparkConfForDeepStorage = readerConf.getBoolean(DruidConfigurationKeys.useSparkConfForDeepStorageDefaultKey)
+    // NOTE(review): unlike the other flags this only checks that the key is present, so an explicit "false" would
+    // still enable compact sketches - confirm this is intended.
+    val useCompactSketches = readerConf.isPresent(DruidConfigurationKeys.useCompactSketchesKey)
+    val useDefaultNullHandling = readerConf.getBoolean(DruidConfigurationKeys.useDefaultValueForNullDefaultKey)
+    val batchSize = readerConf.getInt(DruidConfigurationKeys.batchSizeDefaultKey)
+
+    val segmentsToRead: Seq[DataSegment] =
+      if (readerConf.isPresent(DruidConfigurationKeys.segmentsKey)) {
+        // Allow passing hard-coded list of segments to load
+        val hardCodedSegments: JList[DataSegment] = MAPPER.readValue(
+          readerConf.getString(DruidConfigurationKeys.segmentsKey),
+          new TypeReference[JList[DataSegment]]() {}
+        )
+        hardCodedSegments.asScala
+      } else {
+        getSegments
+      }
+
+    segmentsToRead
+      .map(segment =>
+        new DruidColumnarInputPartition(
+          segment,
+          schema.get,
+          filter,
+          druidColumnTypes,
+          conf,
+          useSparkConfForDeepStorage,
+          useCompactSketches,
+          useDefaultNullHandling,
+          batchSize
+        ): InputPartition[ColumnarBatch]
+      ).asJava
+  }

+

+  /**
+    * Decides whether Spark should read columnar batches instead of rows.
+    *
+    * @return True if vectorization is enabled in the reader configuration and the pushed-down filters (if any) can
+    *         be vectorized against the reader's schema; false otherwise.
+    */
+  override def enableBatchRead(): Boolean = {
+    val vectorizeRequested =
+      conf.dive(DruidConfigurationKeys.readerPrefix).getBoolean(DruidConfigurationKeys.vectorizeDefaultKey)
+
+    // Fail fast: the schema is only resolved when vectorization was actually requested.
+    vectorizeRequested && {
+      if (schema.isEmpty) {
+        readSchema()
+      }
+      FilterUtils.mapFilters(filters, schema.get) match {
+        case None =>
+          // Nothing was pushed down, so there is no filter that could prevent vectorization.
+          true
+        case Some(filter) =>
+          val rowSignature = SchemaUtils.generateRowSignatureFromSparkSchema(schema.get)
+          val canVectorize = filter.toOptimizedFilter.canVectorizeMatcher(rowSignature)
+          if (!canVectorize) {
+            logWarn("Vectorization enabled in config but pushed-down filters are not vectorizable! Reading rows.")
+          }
+          canVectorize
+      }
+    }
+  }

+}

+

+/**
+  * Factory methods for [[DruidDataSourceReader]] and for the clients it uses to talk to Druid.
+  */

+object DruidDataSourceReader {

+  /**
+    * Creates a reader with a known schema.
+    *
+    * @param schema The Spark schema of the data to read.
+    * @param dataSourceOptions The options to build the reader's configuration from.
+    */

+  def apply(schema: StructType, dataSourceOptions: DataSourceOptions): DruidDataSourceReader = {

+    new DruidDataSourceReader(Option(schema), Configuration(dataSourceOptions))

+  }

+

+  /**
+    * Creates a reader without a schema; the reader determines it from Druid when it is first needed.
+    *
+    * @param dataSourceOptions The options to build the reader's configuration from.
+    */

+  def apply(dataSourceOptions: DataSourceOptions): DruidDataSourceReader = {

+    new DruidDataSourceReader(None, Configuration(dataSourceOptions))

+  }

+

+  /* Unfortunately, there's no single method of interacting with a Druid cluster that provides all

+   * three operations we need: get segment locations, get dataSource schemata, and publish segments.

+   *

+   * Segment locations can be determined either via direct interaction with the metadata server or

+   * via the coordinator API, but not via querying the `sys.segments` table served from a cluster

+   * since the `sys.segments` table prunes load specs.

+   *

+   * Data source schemata can be determined via querying the `INFORMATION_SCHEMA.COLUMNS` table, via

+   * SegmentMetadataQueries, or via pulling segments into memory and analyzing them.

+   * SegmentMetadataQueries can be expensive and time-consuming for large numbers of segments. This

+   * could be worked around by only checking the first and last segments for an interval, which

+   * would catch schema evolution that spans the interval to query, but not schema evolution within

+   * the interval and would prevent determining accurate statistics. Likewise, pulling segments into

+   * memory on the driver to check their schema is expensive and inefficient and has the same schema

+   * evolution and accurate statistics problem.

+   * The `INFORMATION_SCHEMA.COLUMNS` table does not contain information about whether or not a column

+   * could contain multiple values and does not know the actual metric type for complex types. Less

+   * relevantly, we wouldn't have access to possibly useful statistics about the segments

+   * that could be used to perform more efficient reading, and the Druid cluster to read from would

+   * need to have sql querying initialized and be running a version of Druid >= 0.14. Since we're

+   * not currently doing any intelligent partitioning for reads, this doesn't really matter.

+   *

+   * Publishing segments can only be done via direct interaction with the metadata server.

+   *

+   * Since there's no way to satisfy these constraints with a single method of interaction, we will

+   * need to use a metadata client and a druid client. The metadata client can fetch segment

+   * locations and publish segments, and the druid client will issue SegmentMetadata queries to determine

+   * datasource schemata.

+   */

+

+  /**
+    * Creates the client used to fetch segment locations (see the comment above for why two clients are needed).
+    *
+    * @param conf The configuration to create the client from.
+    */

+  def createDruidMetaDataClient(conf: Configuration): DruidMetadataClient = {

+    DruidMetadataClient(conf)

+  }

+

+  /**
+    * Creates the client used to determine data source schemata (see the comment above).
+    *
+    * @param conf The configuration to create the client from.
+    */

+  def createDruidClient(conf: Configuration): DruidClient = {

+    DruidClient(conf)

+  }

+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidInputPartition.scala b/spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidInputPartition.scala
new file mode 100644
index 0000000..9f4ab75
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidInputPartition.scala
@@ -0,0 +1,67 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.v2.reader

+

+import org.apache.druid.query.filter.DimFilter

+import org.apache.druid.spark.MAPPER

+import org.apache.druid.spark.configuration.{Configuration, SerializableHadoopConfiguration}

+import org.apache.druid.timeline.DataSegment

+import org.apache.spark.sql.SparkSession

+import org.apache.spark.sql.catalyst.InternalRow

+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}

+import org.apache.spark.sql.types.StructType

+

+/**
+  * Defines a single partition in the dataframe's underlying RDD. This object is generated in the driver and then
+  * serialized to the executors, where [[createPartitionReader]] builds the [[InputPartitionReader]] that does the
+  * actual reading.
+  *
+  * At construction time the segment is serialized to a JSON string with MAPPER and the driver's Hadoop
+  * configuration is wrapped in a [[SerializableHadoopConfiguration]] and broadcast.
+  *
+  * @param segment The Druid segment this partition reads; only its serialized form is kept.
+  * @param schema Passed through unchanged to the partition reader.
+  * @param filter Passed through unchanged to the partition reader.
+  * @param columnTypes Passed through unchanged to the partition reader.
+  * @param conf Passed through unchanged to the partition reader.
+  * @param useSparkConfForDeepStorage Passed through unchanged to the partition reader.
+  * @param useCompactSketches Passed through unchanged to the partition reader.
+  * @param useDefaultNullHandling Passed through unchanged to the partition reader.
+  */
+class DruidInputPartition(
+                           segment: DataSegment,
+                           schema: StructType,
+                           filter: Option[DimFilter],
+                           columnTypes: Option[Set[String]],
+                           conf: Configuration,
+                           useSparkConfForDeepStorage: Boolean,
+                           useCompactSketches: Boolean,
+                           useDefaultNullHandling: Boolean
+                         ) extends InputPartition[InternalRow] {
+  // The session is only needed while building the broadcast, so it is kept local to this initializer instead of
+  // being stored as a field (a field would be serialized along with this partition when it is sent to executors).
+  private val broadcastConf = {
+    // Prefer the thread's active session; fall back to the default session so construction from a thread that
+    // never set an active session still works. Fail with a descriptive message rather than a bare Option.get.
+    val session = SparkSession.getActiveSession
+      .orElse(SparkSession.getDefaultSession)
+      .getOrElse(
+        throw new IllegalStateException(
+          "No active or default SparkSession found; DruidInputPartition must be constructed on the driver."
+        )
+      )
+    session.sparkContext.broadcast(
+      new SerializableHadoopConfiguration(session.sparkContext.hadoopConfiguration)
+    )
+  }
+  private val serializedSegment: String = MAPPER.writeValueAsString(segment)
+
+  /**
+    * Creates the reader for this partition from the serialized segment, the broadcast Hadoop configuration and the
+    * constructor arguments.
+    *
+    * @return A new [[DruidInputPartitionReader]].
+    */
+  override def createPartitionReader(): InputPartitionReader[InternalRow] = {
+    new DruidInputPartitionReader(
+      serializedSegment,
+      schema,
+      filter,
+      columnTypes,
+      broadcastConf,
+      conf,
+      useSparkConfForDeepStorage,
+      useCompactSketches,
+      useDefaultNullHandling
+    )
+  }
+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidInputPartitionReader.scala b/spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidInputPartitionReader.scala
new file mode 100644
index 0000000..89e926e
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidInputPartitionReader.scala
@@ -0,0 +1,138 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.v2.reader

+

+import org.apache.druid.data.input.impl.{DimensionsSpec, TimestampSpec}

+import org.apache.druid.data.input.{ColumnsFilter, InputEntityReader, InputRowSchema}

+import org.apache.druid.indexing.input.{DruidSegmentInputEntity, DruidSegmentInputFormat}

+import org.apache.druid.java.util.common.FileUtils

+import org.apache.druid.query.filter.DimFilter

+import org.apache.druid.segment.loading.SegmentLoader

+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys, SerializableHadoopConfiguration}

+import org.apache.druid.spark.mixins.Logging

+import org.apache.druid.spark.utils.SchemaUtils

+import org.apache.druid.spark.v2.INDEX_IO

+import org.apache.druid.timeline.DataSegment

+import org.apache.spark.broadcast.Broadcast

+import org.apache.spark.sql.catalyst.InternalRow

+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader

+import org.apache.spark.sql.types.StructType

+

+import java.io.File

+import scala.collection.JavaConverters.setAsJavaSetConverter

+

+/**
+  * Row-at-a-time [[InputPartitionReader]] over a single Druid segment. Rows are produced by a Druid
+  * [[InputEntityReader]] and converted to Spark rows with [[SchemaUtils.convertInputRowToSparkRow]].
+  *
+  * `segment`, `dataSegment`, `segmentLoader` and `tmpDir` are not defined here; presumably they are provided by
+  * [[DruidBaseInputPartitionReader]] - confirm against that class.
+  *
+  * @param segmentStr Forwarded to the base reader.
+  * @param schema The Spark schema rows are converted to; also determines which columns are requested.
+  * @param filter Optional Druid filter applied while reading; None means no filter.
+  * @param columnTypes Forwarded to the base reader.
+  * @param broadcastedHadoopConf Forwarded to the base reader.
+  * @param conf Supplies the timestamp column name and format; also forwarded to the base reader.
+  * @param useSparkConfForDeepStorage Forwarded to the base reader.
+  * @param useCompactSketches Forwarded to the base reader.
+  * @param useDefaultNullHandling Forwarded to the base reader and to row conversion.
+  */
+class DruidInputPartitionReader(
+                                 segmentStr: String,
+                                 schema: StructType,
+                                 filter: Option[DimFilter],
+                                 columnTypes: Option[Set[String]],
+                                 broadcastedHadoopConf: Broadcast[SerializableHadoopConfiguration],
+                                 conf: Configuration,
+                                 useSparkConfForDeepStorage: Boolean,
+                                 useCompactSketches: Boolean,
+                                 useDefaultNullHandling: Boolean
+                               )
+  extends DruidBaseInputPartitionReader(
+    segmentStr,
+    columnTypes,
+    broadcastedHadoopConf,
+    conf,
+    useSparkConfForDeepStorage,
+    useCompactSketches,
+    useDefaultNullHandling
+  ) with InputPartitionReader[InternalRow] with Logging {
+
+  // Exact set of readable column names: the segment's columns plus the configured timestamp column. This must be a
+  // real set: using `+` between the Java column list and a String concatenates them into one String, which turns
+  // the membership test below into a substring match (e.g. a requested "dim" would match a segment column "dim1").
+  private val availableColumns: java.util.Set[String] = {
+    val names = new java.util.HashSet[String](segment.asQueryableIndex().getColumnNames)
+    names.add(conf.get(DruidConfigurationKeys.timestampColumnDefaultReaderKey))
+    names
+  }
+
+  // Only schema fields that actually exist in the segment are requested from the reader.
+  private val inputEntityReaderRows = DruidInputPartitionReader.makeInputFormat(
+    dataSegment,
+    segmentLoader,
+    tmpDir,
+    filter.orNull,
+    conf.get(DruidConfigurationKeys.timestampColumnDefaultReaderKey),
+    conf.get(DruidConfigurationKeys.timestampFormatDefaultReaderKey),
+    SchemaUtils.getDimensionsSpecFromIndex(segment.asQueryableIndex()),
+    schema.fieldNames.toList.filter(name => availableColumns.contains(name))
+  ).read()
+
+  /**
+    * @return true if another row is available. The underlying iterator is not advanced here.
+    */
+  override def next(): Boolean = {
+    inputEntityReaderRows.hasNext
+  }
+
+  /**
+    * Advances the underlying iterator and converts the row it yields.
+    *
+    * NOTE(review): because the iterator is advanced here rather than in next(), this relies on the caller invoking
+    * get() exactly once per successful next().
+    *
+    * @return The next segment row converted to a Spark [[InternalRow]] for `schema`.
+    */
+  override def get(): InternalRow = {
+    SchemaUtils.convertInputRowToSparkRow(inputEntityReaderRows.next(), schema, useDefaultNullHandling)
+  }
+
+  /**
+    * Releases the row iterator and the segment, then deletes the temporary directory. If any step throws, a warning
+    * is logged, deletion of the temporary directory is attempted once more, and the exception is rethrown.
+    */
+  override def close(): Unit = {
+    try {
+      // The row iterator is closeable and holds reader resources; release it before the segment it reads from.
+      if (Option(inputEntityReaderRows).nonEmpty) {
+        inputEntityReaderRows.close()
+      }
+      if (Option(segment).nonEmpty) {
+        segment.close()
+      }
+      if (Option(tmpDir).nonEmpty) {
+        FileUtils.deleteDirectory(tmpDir)
+      }
+    } catch {
+      case e: Exception =>
+        // Since we're just going to rethrow e and tearing down the JVM will clean up the segment even if we can't, the
+        // only leak we have to worry about is the temp file. Spark should clean up temp files as well, but rather than
+        // rely on that we'll try to take care of it ourselves.
+        logWarn("Encountered exception attempting to close a DruidInputPartitionReader!")
+        if (Option(tmpDir).nonEmpty && tmpDir.exists()) {
+          FileUtils.deleteDirectory(tmpDir)
+        }
+        throw e
+    }
+  }
+}

+

+private[v2] object DruidInputPartitionReader {
+  /**
+    * Builds an [[InputEntityReader]] for one segment using a [[DruidSegmentInputFormat]].
+    *
+    * @param segment The segment to read; its own interval is used as the interval of the input entity.
+    * @param segmentLoader Loader handed to the segment input entity.
+    * @param loadDir Directory passed to `createReader`.
+    * @param filter Druid filter given to the input format; may be null.
+    * @param timestampColumnName Column name for the [[TimestampSpec]].
+    * @param timestampColumnFormat Format for the [[TimestampSpec]].
+    * @param dimensionsSpec Dimensions spec for the row schema.
+    * @param columns Names of the columns to include; de-duplicated into an inclusion-based [[ColumnsFilter]].
+    * @return The reader created by the input format.
+    */
+  private def makeInputFormat(
+                               segment: DataSegment,
+                               segmentLoader: SegmentLoader,
+                               loadDir: File,
+                               filter: DimFilter,
+                               timestampColumnName: String,
+                               timestampColumnFormat: String,
+                               dimensionsSpec: DimensionsSpec,
+                               columns: Seq[String]
+                             ): InputEntityReader = {
+    val segmentFormat = new DruidSegmentInputFormat(INDEX_IO, filter)
+
+    // No missing-value default is supplied for the timestamp, hence the explicit null.
+    val rowSchema = new InputRowSchema(
+      new TimestampSpec(timestampColumnName, timestampColumnFormat, null), // scalastyle:ignore null
+      dimensionsSpec,
+      ColumnsFilter.inclusionBased(columns.toSet.asJava)
+    )
+
+    val segmentEntity = new DruidSegmentInputEntity(segmentLoader, segment, segment.getInterval)
+
+    segmentFormat.createReader(rowSchema, segmentEntity, loadDir)
+  }
+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/v2/reader/SingleValueDimensionDictionary.scala b/spark/src/main/scala/org/apache/druid/spark/v2/reader/SingleValueDimensionDictionary.scala
new file mode 100644
index 0000000..f5b5336
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/v2/reader/SingleValueDimensionDictionary.scala
@@ -0,0 +1,52 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.v2.reader

+

+import org.apache.druid.java.util.common.StringUtils

+import org.apache.spark.sql.execution.vectorized.Dictionary

+

+/**
+  * Spark [[Dictionary]] implementation that backs single-valued dimension vectors (e.g. columns of strings).
+  * Only binary decoding is supported; the int, long, float and double decoders always throw.
+  *
+  * @param lookups Map from dictionary id to the String it stands for.
+  */
+class SingleValueDimensionDictionary(val lookups: Map[Int, String]) extends Dictionary {
+  /** Not supported; always throws [[UnsupportedOperationException]]. */
+  override def decodeToInt(id: Int): Int =
+    throw new UnsupportedOperationException("SingleValueDimensionDictionary encoding does not support ints!")
+
+  /** Not supported; always throws [[UnsupportedOperationException]]. */
+  override def decodeToLong(id: Int): Long =
+    throw new UnsupportedOperationException("SingleValueDimensionDictionary encoding does not support longs!")
+
+  /** Not supported; always throws [[UnsupportedOperationException]]. */
+  override def decodeToFloat(id: Int): Float =
+    throw new UnsupportedOperationException("SingleValueDimensionDictionary encoding does not support floats!")
+
+  /** Not supported; always throws [[UnsupportedOperationException]]. */
+  override def decodeToDouble(id: Int): Double =
+    throw new UnsupportedOperationException("SingleValueDimensionDictionary encoding does not support doubles!")
+
+  /**
+    * Looks up the String for `id` and returns it as UTF-8 bytes via `StringUtils.toUtf8Nullable`.
+    *
+    * @param id Dictionary id; must be a key of `lookups`, otherwise the map lookup throws.
+    * @return The UTF-8 encoding produced by `toUtf8Nullable` for the mapped value.
+    */
+  override def decodeToBinary(id: Int): Array[Byte] = {
+    // TODO: Figure out if we want toUtf8Nullable or toUtf8WithNullToEmpty. (This is likely related to adding support
+    //  for Druid's two null-handling modes)
+    val decoded: String = lookups.apply(id)
+    StringUtils.toUtf8Nullable(decoded)
+  }
+}

diff --git a/spark/src/test/resources/log4j2.xml b/spark/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..c878543
--- /dev/null
+++ b/spark/src/test/resources/log4j2.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8" ?>

+<!--

+  ~ Licensed to the Apache Software Foundation (ASF) under one

+  ~ or more contributor license agreements.  See the NOTICE file

+  ~ distributed with this work for additional information

+  ~ regarding copyright ownership.  The ASF licenses this file

+  ~ to you under the Apache License, Version 2.0 (the

+  ~ "License"); you may not use this file except in compliance

+  ~ with the License.  You may obtain a copy of the License at

+  ~

+  ~   http://www.apache.org/licenses/LICENSE-2.0

+  ~

+  ~ Unless required by applicable law or agreed to in writing,

+  ~ software distributed under the License is distributed on an

+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+  ~ KIND, either express or implied.  See the License for the

+  ~ specific language governing permissions and limitations

+  ~ under the License.

+  -->

+

+<Configuration status="WARN">

+    <!-- Logging configuration for the spark module's tests: one console appender shared by every logger below. -->

+    <Appenders>

+        <!-- Writes to standard out; each line has an ISO8601 timestamp, level, thread, logger name and message. -->

+        <Console name="Console" target="SYSTEM_OUT">

+            <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>

+        </Console>

+    </Appenders>

+    <Loggers>

+        <!-- Default for everything not matched by a more specific logger below. -->

+        <Root level="info">

+            <AppenderRef ref="Console"/>

+        </Root>

+        <!-- Spark's own logging is limited to warn and above. additivity="false" keeps these events from also -->

+        <!-- being handed to the root logger, which would print them a second time. -->

+        <Logger level="warn" name="org.apache.spark" additivity="false">

+            <AppenderRef ref="Console"/>

+        </Logger>

+        <!-- Suppress allocation warning during local tests -->

+        <Logger level="error" name="org.apache.druid.java.util.common.io.NativeIO" additivity="false">

+            <AppenderRef ref="Console"/>

+        </Logger>

+        <!-- Suppressing Reference Counter warnings during local tests -->

+        <Logger level="error" name="org.apache.druid.collections.StupidPool" additivity="false">

+            <AppenderRef ref="Console"/>

+        </Logger>

+        <!-- Suppressing default registration messages (anything below warn) during local tests -->

+        <Logger level="warn" name="org.apache.druid.spark.registries.ComplexTypeRegistry" additivity="false">

+            <AppenderRef ref="Console"/>

+        </Logger>

+    </Loggers>

+</Configuration>

diff --git a/spark/src/test/resources/segments/spark_druid_test/2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z/0/0/index.zip b/spark/src/test/resources/segments/spark_druid_test/2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z/0/0/index.zip
new file mode 100644
index 0000000..86682de
--- /dev/null
+++ b/spark/src/test/resources/segments/spark_druid_test/2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z/0/0/index.zip
Binary files differ
diff --git a/spark/src/test/resources/segments/spark_druid_test/2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z/0/1/index.zip b/spark/src/test/resources/segments/spark_druid_test/2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z/0/1/index.zip
new file mode 100644
index 0000000..9ae6c6e
--- /dev/null
+++ b/spark/src/test/resources/segments/spark_druid_test/2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z/0/1/index.zip
Binary files differ
diff --git "a/spark/src/test/resources/segments/spark_druid_test/2020-01-01T00\357\200\27200\357\200\27200.000Z_2020-01-02T00\357\200\27200\357\200\27200.000Z/0/0/index.zip" "b/spark/src/test/resources/segments/spark_druid_test/2020-01-01T00\357\200\27200\357\200\27200.000Z_2020-01-02T00\357\200\27200\357\200\27200.000Z/0/0/index.zip"
new file mode 100644
index 0000000..86682de
--- /dev/null
+++ "b/spark/src/test/resources/segments/spark_druid_test/2020-01-01T00\357\200\27200\357\200\27200.000Z_2020-01-02T00\357\200\27200\357\200\27200.000Z/0/0/index.zip"
Binary files differ
diff --git "a/spark/src/test/resources/segments/spark_druid_test/2020-01-01T00\357\200\27200\357\200\27200.000Z_2020-01-02T00\357\200\27200\357\200\27200.000Z/0/1/index.zip" "b/spark/src/test/resources/segments/spark_druid_test/2020-01-01T00\357\200\27200\357\200\27200.000Z_2020-01-02T00\357\200\27200\357\200\27200.000Z/0/1/index.zip"
new file mode 100644
index 0000000..9ae6c6e
--- /dev/null
+++ "b/spark/src/test/resources/segments/spark_druid_test/2020-01-01T00\357\200\27200\357\200\27200.000Z_2020-01-02T00\357\200\27200\357\200\27200.000Z/0/1/index.zip"
Binary files differ
diff --git a/spark/src/test/resources/segments/spark_druid_test/2020-01-02T00:00:00.000Z_2020-01-03T00:00:00.000Z/0/0/index.zip b/spark/src/test/resources/segments/spark_druid_test/2020-01-02T00:00:00.000Z_2020-01-03T00:00:00.000Z/0/0/index.zip
new file mode 100644
index 0000000..41f23f0
--- /dev/null
+++ b/spark/src/test/resources/segments/spark_druid_test/2020-01-02T00:00:00.000Z_2020-01-03T00:00:00.000Z/0/0/index.zip
Binary files differ
diff --git "a/spark/src/test/resources/segments/spark_druid_test/2020-01-02T00\357\200\27200\357\200\27200.000Z_2020-01-03T00\357\200\27200\357\200\27200.000Z/0/0/index.zip" "b/spark/src/test/resources/segments/spark_druid_test/2020-01-02T00\357\200\27200\357\200\27200.000Z_2020-01-03T00\357\200\27200\357\200\27200.000Z/0/0/index.zip"
new file mode 100644
index 0000000..41f23f0
--- /dev/null
+++ "b/spark/src/test/resources/segments/spark_druid_test/2020-01-02T00\357\200\27200\357\200\27200.000Z_2020-01-03T00\357\200\27200\357\200\27200.000Z/0/0/index.zip"
Binary files differ
diff --git a/spark/src/test/scala/org/apache/druid/spark/SparkFunSuite.scala b/spark/src/test/scala/org/apache/druid/spark/SparkFunSuite.scala
new file mode 100644
index 0000000..e7f8865
--- /dev/null
+++ b/spark/src/test/scala/org/apache/druid/spark/SparkFunSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark
+
+import org.apache.druid.java.util.common.FileUtils
+import org.apache.druid.query.aggregation.datasketches.theta.SketchModule
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.funsuite.AnyFunSuite
+
+import java.io.File
+import java.util.UUID
+import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListConverter}
+
+/**
+  * Base class for suites that need a local Spark environment. A fresh [[SparkContext]] and [[SparkSession]] are
+  * created before every test and torn down, together with the temporary `spark.local.dir`, after every test.
+  */
+class SparkFunSuite extends AnyFunSuite with BeforeAndAfterEach {
+  private val localSparkContext = new ThreadLocal[SparkContext]
+  private val localSparkSession = new ThreadLocal[SparkSession]
+
+  /** The context created for the current test, or null outside of a test. */
+  def sparkContext: SparkContext = localSparkContext.get()
+
+  /** The session created for the current test, or null outside of a test. */
+  def sparkSession: SparkSession = localSparkSession.get()
+
+  /**
+    * Creates a local-mode context (UI disabled, parallelism and shuffle partitions set to 1, a fresh temp
+    * directory as `spark.local.dir`) and a session, and stores both in this thread's slots.
+    */
+  private def setupSparkContextAndSession(): Unit = {
+    val config = Map(
+      "spark.master" -> "local[*]",
+      "spark.driver.allowMultipleContexts" -> "true",
+      "spark.ui.enabled" -> "false",
+      "spark.local.dir" -> FileUtils.createTempDir("spark-tests").getAbsolutePath,
+      "spark.default.parallelism" -> "1",
+      "spark.sql.shuffle.partitions" -> "1"
+    )
+
+    val sparkConf = new SparkConf(loadDefaults = true)
+    sparkConf.setAppName(UUID.randomUUID.toString)
+    sparkConf.setAll(config)
+
+    localSparkContext.set(new SparkContext(sparkConf))
+    localSparkSession.set(SparkSession.builder.getOrCreate())
+  }
+
+  override def beforeEach(): Unit = {
+    setupSparkContextAndSession()
+
+    // This isn't necessary for any test to work, but it suppresses log spam when loading segment
+    // metadata while reading data
+    val jacksonModules = Seq(new SketchModule)
+    MAPPER.registerModules(jacksonModules.flatMap(_.getJacksonModules.asScala.toList).asJava)
+    super.beforeEach()
+  }
+
+  /**
+    * Stops the context and removes its temp directory. The cleanup runs in `finally` blocks so that a failure in
+    * `super.afterEach()` or in `stop()` cannot leak the context, the thread-local references or the directory.
+    */
+  override def afterEach(): Unit = {
+    try {
+      super.afterEach()
+    } finally {
+      val context = sparkContext
+      // The context is null if setup failed before it was created; there is nothing to tear down in that case.
+      if (Option(context).nonEmpty) {
+        // Resolve the directory before stopping so the lookup never depends on a stopped context.
+        val localDir = new File(context.getConf.get("spark.local.dir")).getCanonicalFile
+        try {
+          context.stop()
+        } finally {
+          localSparkContext.remove()
+          localSparkSession.remove()
+          // Drop the static session references so the next test cannot pick up a session tied to this context.
+          SparkSession.clearActiveSession()
+          SparkSession.clearDefaultSession()
+          FileUtils.deleteDirectory(localDir)
+        }
+      }
+    }
+  }
+}
diff --git a/spark/src/test/scala/org/apache/druid/spark/clients/DruidMetadataClientSuite.scala b/spark/src/test/scala/org/apache/druid/spark/clients/DruidMetadataClientSuite.scala
new file mode 100644
index 0000000..2b37dbc
--- /dev/null
+++ b/spark/src/test/scala/org/apache/druid/spark/clients/DruidMetadataClientSuite.scala
@@ -0,0 +1,209 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.clients

+

+import org.apache.druid.java.util.common.StringUtils

+import org.apache.druid.spark.MAPPER

+import org.apache.druid.spark.configuration.Configuration

+import org.apache.druid.spark.mixins.TryWithResources

+import org.apache.druid.spark.v2.DruidDataSourceV2TestUtils

+import org.apache.druid.timeline.DataSegment

+import org.apache.druid.timeline.partition.NumberedShardSpec

+import org.scalatest.BeforeAndAfterEach

+import org.scalatest.funsuite.AnyFunSuite

+import org.scalatest.matchers.should.Matchers

+

+import scala.collection.JavaConverters.{asScalaBufferConverter, collectionAsScalaIterableConverter,

+  mapAsJavaMapConverter, seqAsJavaListConverter}

+

+/**
+  * Tests for [[DruidMetadataClient]]. Each test runs against its own test database, created in beforeEach from a
+  * unique URI and torn down in afterEach.
+  */
+class DruidMetadataClientSuite extends AnyFunSuite with Matchers with DruidDataSourceV2TestUtils
+  with BeforeAndAfterEach with TryWithResources {
+  private var uri: String = _
+
+  private val differentDataSourceSegment: DataSegment = new DataSegment(
+    "differentDataSource",
+    interval,
+    version,
+    loadSpec(makePath(segmentsDir.getCanonicalPath, firstSegmentPath)),
+    dimensions,
+    metrics,
+    new NumberedShardSpec(0, 0),
+    binaryVersion,
+    3278L
+  )
+  private val differentDataSourceSegmentString: String = MAPPER.writeValueAsString(differentDataSourceSegment)
+
+  /**
+    * Builds the named-parameter bindings for one `druid_segments` row. Every row is partitioned, uses the suite's
+    * `version`, and gets the same well-formed ISO-8601 created_date.
+    *
+    * @param id Segment id stored in the row.
+    * @param rowDataSource Datasource stored in the row.
+    * @param start Interval start stored in the row.
+    * @param end Interval end stored in the row.
+    * @param used Whether the row is marked as used.
+    * @param payload Serialized segment; stored as its UTF-8 bytes.
+    * @return Bindings keyed by the parameter names of the INSERT statement in insertSegmentRows.
+    */
+  private def segmentRowArgs(
+                              id: String,
+                              rowDataSource: String,
+                              start: String,
+                              end: String,
+                              used: Boolean,
+                              payload: String
+                            ): Map[String, Any] = {
+    Map[String, Any](
+      "id" -> id,
+      "datasource" -> rowDataSource,
+      "created_date" -> "2020-01-01T00:00:00.000Z",
+      "start" -> start,
+      "end" -> end,
+      "partitioned" -> true,
+      "version" -> version,
+      "used" -> used,
+      "payload" -> payload.getBytes(StringUtils.UTF8_STRING)
+    )
+  }
+
+  /**
+    * Inserts the given rows into `druid_segments` of the current test database, in order. The metadata tables
+    * must already exist.
+    *
+    * @param rows Bindings as produced by segmentRowArgs.
+    */
+  private def insertSegmentRows(rows: Seq[Map[String, Any]]): Unit = {
+    tryWithResources(openDbiToTestDb(uri)) {
+      handle =>
+        val updateSql = """
+        |INSERT INTO druid_segments(
+        |  id, datasource, created_date, start, \"end\", partitioned, version, used, payload
+        |) VALUES
+        |(:id, :datasource, :created_date, :start, :end, :partitioned, :version, :used, :payload)
+        |""".stripMargin
+
+        rows.foreach { row =>
+          handle.createStatement(updateSql).bindFromMap(row.asJava).execute()
+        }
+    }
+  }
+
+  test("getSegmentPayloads should retrieve selected DataSegment payloads from the metadata store") {
+    val metadataClient = DruidMetadataClient(Configuration(metadataClientProps(uri)))
+    // Need to exercise the underlying connector to create the metadata SQL tables in the test DB
+    metadataClient.checkIfDataSourceExists(dataSource)
+
+    insertSegmentRows(Seq[Map[String, Any]](
+      // Used segment in the first day: returned by both queries.
+      segmentRowArgs(
+        firstSegment.getId.toString,
+        dataSource,
+        "2020-01-01T00:00:00.000Z",
+        "2020-01-02T00:00:00.000Z",
+        used = true,
+        firstSegmentString
+      ),
+      // Unused row for the same datasource and interval: must never be returned.
+      segmentRowArgs(
+        firstSegment.withVersion("test").getId.toString,
+        dataSource,
+        "2020-01-01T00:00:00.000Z",
+        "2020-01-02T00:00:00.000Z",
+        used = false,
+        firstSegmentString
+      ),
+      // Used segment of another datasource: must never be returned.
+      segmentRowArgs(
+        differentDataSourceSegment.getId.toString,
+        "differentDataSource",
+        "2020-01-01T00:00:00.000Z",
+        "2020-01-02T00:00:00.000Z",
+        used = true,
+        differentDataSourceSegmentString
+      ),
+      // Used segment in the second day: returned only by the unbounded query.
+      segmentRowArgs(
+        thirdSegment.getId.toString,
+        dataSource,
+        "2020-01-02T00:00:00.000Z",
+        "2020-01-03T00:00:00.000Z",
+        used = true,
+        thirdSegmentString
+      )
+    ))
+
+    val usedSegments = metadataClient.getSegmentPayloads(dataSource, None, None)
+    // Interval is 2020-01-01T00:00:00.000Z/2020-01-02T00:00:00.000Z
+    val segmentsByDate =
+      metadataClient.getSegmentPayloads(dataSource, Some(1577836800000L), Some(1577923200000L))
+
+    val expectedUsedSegments = Seq[DataSegment](firstSegment, thirdSegment)
+    val expectedSegmentsByDate = Seq[DataSegment](firstSegment)
+
+    usedSegments should contain theSameElementsInOrderAs expectedUsedSegments
+    segmentsByDate should contain theSameElementsInOrderAs expectedSegmentsByDate
+  }
+
+  test("checkIfDataSourceExists should return true iff the specified dataSource exists") {
+    val metadataClient = DruidMetadataClient(Configuration(metadataClientProps(uri)))
+    // Need to exercise the underlying connector to create the metadata SQL tables in the test DB
+    metadataClient.checkIfDataSourceExists(dataSource)
+
+    insertSegmentRows(Seq[Map[String, Any]](
+      segmentRowArgs(
+        firstSegment.getId.toString,
+        dataSource,
+        "2020-01-01T00:00:00.000Z",
+        "2020-01-02T00:00:00.000Z",
+        used = true,
+        firstSegmentString
+      )
+    ))
+
+    metadataClient.checkIfDataSourceExists(dataSource) should be(true)
+    metadataClient.checkIfDataSourceExists("differentDataSource") should be(false)
+  }
+
+  test("publishSegments") {
+    val metadataClient = DruidMetadataClient(Configuration(metadataClientProps(uri)))
+    metadataClient.publishSegments(List(firstSegment, thirdSegment).asJava)
+
+    tryWithResources(openDbiToTestDb(uri)) {
+      handle =>
+        // Rows are compared to `expected` by position below, so the row order must be pinned explicitly.
+        val res =
+          handle.createQuery(
+            "SELECT DATASOURCE, START, \"end\", PARTITIONED, VERSION, USED FROM druid_segments ORDER BY START"
+          ).list().asScala.map(m => m.values().asScala.map(_.toString).toSeq)
+        val expected = Seq[Seq[String]](
+          Seq(
+            dataSource,
+            "2020-01-01T00:00:00.000Z",
+            "2020-01-02T00:00:00.000Z",
+            "true",
+            version,
+            "true"
+          ), Seq(
+            dataSource,
+            "2020-01-02T00:00:00.000Z",
+            "2020-01-03T00:00:00.000Z",
+            "true",
+            version,
+            "true"
+          )
+        )
+        res.size should equal(expected.size)
+        expected.zipWithIndex.foreach{
+          // The results from the query are stored in an unordered map, so we can't rely on a simple should equal
+          case (s, index) => s should contain theSameElementsAs res(index)
+        }
+    }
+  }
+
+  override def beforeEach(): Unit = {
+    uri = generateUniqueTestUri()
+    createTestDb(uri)
+    registerEmbeddedDerbySQLConnector()
+    super.beforeEach()
+  }
+
+  /**
+    * Tears down the test database and the working directory. Each step runs even if the previous one throws, and
+    * super.afterEach() is always invoked so stacked traits still get their cleanup.
+    */
+  override def afterEach(): Unit = {
+    try {
+      tearDownTestDb(uri)
+    } finally {
+      try {
+        cleanUpWorkingDirectory()
+      } finally {
+        super.afterEach()
+      }
+    }
+  }
+}

diff --git a/spark/src/test/scala/org/apache/druid/spark/utils/DeepStorageConstructorHelpersSuite.scala b/spark/src/test/scala/org/apache/druid/spark/utils/DeepStorageConstructorHelpersSuite.scala
new file mode 100644
index 0000000..b6906dc
--- /dev/null
+++ b/spark/src/test/scala/org/apache/druid/spark/utils/DeepStorageConstructorHelpersSuite.scala
@@ -0,0 +1,189 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.utils

+

+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}

+import org.scalatest.funsuite.AnyFunSuite

+import org.scalatest.matchers.should.Matchers

+

+import java.io.File

+

+class DeepStorageConstructorHelpersSuite extends AnyFunSuite with Matchers {

+  private val sampleLocalConf: Configuration = Configuration(Map[String, String]( // input for the local test

+    "deepStorageType" -> "local",

+    "local.storageDirectory" -> "/tmp/working/directory"

+  ))

+

+  private val sampleHdfsConf: Configuration = Configuration(Map[String, String]( // input for the hdfs test

+    "deepStorageType" -> "hdfs",

+    "hdfs.storageDirectory" -> "/tmp/working/directory"

+  ))

+

+  private val sampleS3Conf: Configuration = Configuration(Map[String, String]( // shared by the S3 tests

+    "deepStorageType" -> "s3",

+    "s3.bucket" -> "testBucket",

+    "s3.baseKey" -> "prefix/to/druid",

+    "s3.disableAcl" -> "true",

+    "s3.maxListingLength" -> "999",

+    "s3.useS3aSchema" -> "false",

+    "s3.client.protocol" -> "https",

+    "s3.accessKey" -> "my access key",

+    "s3.secretKey" -> "my secret key",

+    "s3.proxy.host" -> "proxy.host",

+    "s3.proxy.port" -> "1234",

+    "s3.proxy.username" -> "druid",

+    "s3.proxy.password" -> "swordfish",

+    "s3.client.disableChunkedEncoding" -> "true",

+    "s3.endpoint.signingRegion" -> "us-west-1",

+    "s3.sse.type" -> "kms",

+    "s3.sse.kms.keyId" -> "key"

+  ))

+

+  private val sampleGoogleConfig: Configuration = Configuration(Map[String, String]( // shared by the google tests

+    "deepStorageType" -> "google",

+    "google.bucket" -> "testBucket",

+    "google.prefix" -> "prefix/to/druid",

+    "google.maxListingLength" -> "1023"

+  ))

+

+  private val sampleAzureConfig: Configuration = Configuration(Map[String, String]( // shared by the azure tests

+    "deepStorageType" -> "azure",

+    "azure.account" -> "testAccount",

+    "azure.key" -> "12345ABCDEF",

+    "azure.container" -> "testContainer",

+    "azure.prefix" -> "prefix/to/druid",

+    "azure.maxListingLength" -> "1001"

+  ))

+

+  test("createLocalDataSegmentPusherConfig should construct a LocalDataSegmentPusherConfig") {

+    // Sub-configuration obtained by diving on the local deep storage type key.

+    val localProperties = sampleLocalConf.dive(DruidConfigurationKeys.localDeepStorageTypeKey)

+    val localPusherConfig = DeepStorageConstructorHelpers.createLocalDataSegmentPusherConfig(localProperties)

+    localPusherConfig.getStorageDirectory should equal(new File("/tmp/working/directory"))

+  }

+

+  test("createHdfsDataSegmentPusherConfig should construct an HdfsDataSegmentPusherConfig") {

+    // Sub-configuration obtained by diving on the hdfs deep storage type key.

+    val hdfsProperties = sampleHdfsConf.dive(DruidConfigurationKeys.hdfsDeepStorageTypeKey)

+    val hdfsPusherConfig = DeepStorageConstructorHelpers.createHdfsDataSegmentPusherConfig(hdfsProperties)

+    hdfsPusherConfig.getStorageDirectory should equal("/tmp/working/directory")

+  }

+

+  test("createS3DataSegmentPusherConfig should construct an S3DataSegmentPusherConfig from a Configuration") {

+    // Sub-configuration obtained by diving on the S3 deep storage type key; used for both halves of the test.

+    val s3Properties = sampleS3Conf.dive(DruidConfigurationKeys.s3DeepStorageTypeKey)

+    val fullPusherConfig = DeepStorageConstructorHelpers.createS3DataSegmentPusherConfig(s3Properties)

+

+    fullPusherConfig.getBucket should equal("testBucket")

+    fullPusherConfig.getBaseKey should equal("prefix/to/druid")

+    fullPusherConfig.getDisableAcl should be(true)

+    fullPusherConfig.getMaxListingLength should equal(999)

+    fullPusherConfig.isUseS3aSchema should be(false)

+    // With the two keys removed, the expected values flip (presumably to the helper's defaults - confirm).

+    val withoutOverrides = s3Properties.toMap - "uses3aschema" - "disableacl"

+    val defaultedPusherConfig = DeepStorageConstructorHelpers.createS3DataSegmentPusherConfig(Configuration(withoutOverrides))

+    defaultedPusherConfig.getDisableAcl should be(false)

+    defaultedPusherConfig.isUseS3aSchema should be(true)

+  }

+

+  test("createS3InputDataConfig should construct an S3InputDataConfig from a Configuration") {

+    // Sub-configuration obtained by diving on the S3 deep storage type key.

+    val s3Properties = sampleS3Conf.dive(DruidConfigurationKeys.s3DeepStorageTypeKey)

+    val s3InputConfig = DeepStorageConstructorHelpers.createS3InputDataConfig(s3Properties)

+

+    s3InputConfig.getMaxListingLength should equal(999)

+  }

+

+  test("createConfigsForServerSideEncryptingAmazonS3 should correctly parse a Configuration") {

+    val (credentialsConfig, proxyConfig, endpointConfig, clientConfig, _) =

+      DeepStorageConstructorHelpers.createConfigsForServerSideEncryptingAmazonS3(

+        sampleS3Conf.dive(DruidConfigurationKeys.s3DeepStorageTypeKey)

+      )

+    // The sample configuration sets a signing region but no endpoint URL, so getUrl is expected to be null.

+    credentialsConfig.getAccessKey.getPassword should equal("my access key")

+    proxyConfig.getPort should equal(1234)

+    endpointConfig.getUrl should equal(null) // scalastyle:ignore null

+    clientConfig.getProtocol should equal("https")

+  }

+

+  test("createS3StorageConfig should create objects of the correct type") {

+    val noopSSEConfig = Configuration(Map[String, String]()) // no "type" key at all

+    val s3SSEConfig = Configuration(Map[String, String]("type" -> "s3"))

+    val kmsSSEConfig = Configuration(Map[String, String]("type" -> "kms", "keyId" -> "key"))

+    val customSSEConfig = Configuration(Map[String, String](

+      "type" -> "custom",

+      "custom.base64EncodedKey" -> "0123456789abcdef"

+    ))

+    // Build one storage config per server-side-encryption variant.

+    val noopStorageConfig = DeepStorageConstructorHelpers.createS3StorageConfig(noopSSEConfig)

+    val s3StorageConfig = DeepStorageConstructorHelpers.createS3StorageConfig(s3SSEConfig)

+    val kmsStorageConfig = DeepStorageConstructorHelpers.createS3StorageConfig(kmsSSEConfig)

+    val customStorageConfig = DeepStorageConstructorHelpers.createS3StorageConfig(customSSEConfig)

+

+    // Only the class names are compared because the encryption classes are package-private and not visible here.

+    noopStorageConfig.getServerSideEncryption.getClass.getName should

+      equal("org.apache.druid.storage.s3.NoopServerSideEncryption")

+    s3StorageConfig.getServerSideEncryption.getClass.getName should

+      equal("org.apache.druid.storage.s3.S3ServerSideEncryption")

+    kmsStorageConfig.getServerSideEncryption.getClass.getName should

+      equal("org.apache.druid.storage.s3.KmsServerSideEncryption")

+    customStorageConfig.getServerSideEncryption.getClass.getName should

+      equal("org.apache.druid.storage.s3.CustomServerSideEncryption")

+  }

+

+  test("createGoogleAcountConfig should correctly parse a Configuration") {

+    // NOTE(review): "Acount" (sic) matches the helper's name as called below; renaming needs a production change.

+    val googleProperties = sampleGoogleConfig.dive(DruidConfigurationKeys.googleDeepStorageTypeKey)

+    val googleAccountConfig = DeepStorageConstructorHelpers.createGoogleAcountConfig(googleProperties)

+    googleAccountConfig.getBucket should equal("testBucket")

+    googleAccountConfig.getPrefix should equal("prefix/to/druid")

+  }

+

+  test("createGoogleInputDataConfig should correctly parse a Configuration") {

+    // Sub-configuration obtained by diving on the google deep storage type key.

+    val googleProperties = sampleGoogleConfig.dive(DruidConfigurationKeys.googleDeepStorageTypeKey)

+    val googleInputConfig = DeepStorageConstructorHelpers.createGoogleInputDataConfig(googleProperties)

+    googleInputConfig.getMaxListingLength should equal(1023)

+  }

+

+  test("createAzureDataSegmentConfig should correctly parse a Configuration") {

+    // Sub-configuration obtained by diving on the azure deep storage type key.

+    val azureProperties = sampleAzureConfig.dive(DruidConfigurationKeys.azureDeepStorageTypeKey)

+    val azureSegmentConfig = DeepStorageConstructorHelpers.createAzureDataSegmentConfig(azureProperties)

+    azureSegmentConfig.getContainer should equal("testContainer")

+    azureSegmentConfig.getPrefix should equal("prefix/to/druid")

+  }

+

+  test("createAzureInputDataConfig should correctly parse a Configuration") {

+    // Sub-configuration obtained by diving on the azure deep storage type key.

+    val azureProperties = sampleAzureConfig.dive(DruidConfigurationKeys.azureDeepStorageTypeKey)

+    val azureInputConfig = DeepStorageConstructorHelpers.createAzureInputDataConfig(azureProperties)

+    azureInputConfig.getMaxListingLength should equal(1001)

+  }

+

+  test("createAzureAccountConfig should correctly parse a Configuration") {

+    // The sample sets account and key only; the "https" protocol asserted below is not part of the sample.

+    val azureProperties = sampleAzureConfig.dive(DruidConfigurationKeys.azureDeepStorageTypeKey)

+    val azureAccountConfig = DeepStorageConstructorHelpers.createAzureAccountConfig(azureProperties)

+    azureAccountConfig.getKey should equal("12345ABCDEF")

+    azureAccountConfig.getAccount should equal("testAccount")

+    azureAccountConfig.getProtocol should equal("https")

+  }

+}

diff --git a/spark/src/test/scala/org/apache/druid/spark/utils/FilterUtilsSuite.scala b/spark/src/test/scala/org/apache/druid/spark/utils/FilterUtilsSuite.scala
new file mode 100644
index 0000000..cc6a23c
--- /dev/null
+++ b/spark/src/test/scala/org/apache/druid/spark/utils/FilterUtilsSuite.scala
@@ -0,0 +1,287 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.utils

+

+import org.apache.druid.query.filter.{AndDimFilter, BoundDimFilter, DimFilter, FalseDimFilter,

+  InDimFilter, NotDimFilter, RegexDimFilter, SelectorDimFilter}

+import org.apache.druid.query.ordering.StringComparators

+import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, GreaterThan, GreaterThanOrEqual,

+  In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains, StringEndsWith,

+  StringStartsWith, Filter => SparkFilter}

+import org.apache.spark.sql.types.{BinaryType, LongType, StringType, StructField, StructType}

+import org.scalatest.funsuite.AnyFunSuite

+import org.scalatest.matchers.should.Matchers

+

+import scala.collection.JavaConverters.{asScalaSetConverter, seqAsJavaListConverter, setAsJavaSetConverter}

+

+class FilterUtilsSuite extends AnyFunSuite with Matchers {

+  private val testSchema = StructType(Seq[StructField]( // shared schema: one long column, one string column

+    StructField("count", LongType),

+    StructField("name", StringType)

+  ))

+

+  test("mapFilters should convert a Spark filter into an equivalent Druid filter") {

+    val countOnlySchema = StructType(Seq[StructField](

+      StructField("count", LongType)

+    ))

+

+    val druidFilter = FilterUtils.mapFilters(Array[SparkFilter](GreaterThan("count", 5)), countOnlySchema).get

+    druidFilter.getRequiredColumns.asScala should contain theSameElementsAs Seq("count")

+    // GreaterThan("count", 5) is expected to become a strict lower bound of "5" with numeric ordering.

+    // scalastyle:off null

+    val expected = new BoundDimFilter(

+      "count",

+      "5",

+      null,

+      true,

+      null,

+      null,

+      null,

+      StringComparators.NUMERIC,

+      null)

+    // scalastyle:on

+    druidFilter should equal(expected)

+  }

+

+  test("mapFilters should map multiple Spark filters into an equivalent Druid filter") {

+    val druidFilter = FilterUtils.mapFilters(

+      Array[SparkFilter](GreaterThan("count", 5), LessThanOrEqual("name", "foo")),

+      testSchema

+    ).get

+    druidFilter.getRequiredColumns.asScala should contain theSameElementsAs Seq("count", "name")

+    // Multiple top-level Spark filters are expected to be combined into a single AndDimFilter.

+    // scalastyle:off null

+    val expected = new AndDimFilter(

+      List[DimFilter](

+        new BoundDimFilter(

+          "count",

+          "5",

+          null,

+          true,

+          null,

+          null,

+          null,

+          StringComparators.NUMERIC,

+          null),

+        new BoundDimFilter(

+          "name",

+          null,

+          "foo",

+          null,

+          false,

+          null,

+          null,

+          StringComparators.LEXICOGRAPHIC,

+          null)

+      ).asJava

+    )

+    // scalastyle:on

+    druidFilter should equal(expected)

+  }

+

+  test("mapFilters should map a complex Spark filter into an equivalent Druid filter") {

+    val druidFilter = FilterUtils.mapFilters(

+      Array[SparkFilter](And(GreaterThan("count", 5), StringStartsWith("name", "abc"))),

+      testSchema

+    ).get

+    druidFilter.getRequiredColumns.asScala should contain theSameElementsAs Seq("count", "name")

+

+    // scalastyle:off null

+    // StringStartsWith("name", "abc") is expected to map to a regex filter with the anchored pattern "^abc".

+    val expected = new AndDimFilter(

+      List[DimFilter](

+        new BoundDimFilter(

+          "count",

+          "5",

+          null,

+          true,

+          null,

+          null,

+          null,

+          StringComparators.NUMERIC,

+          null),

+        new RegexDimFilter("name", "^abc", null, null)

+      ).asJava

+    )

+    // scalastyle:on

+    druidFilter should equal(expected)

+  }

+

+  test("mapFilters should correctly map a Spark IsNull filter into an equivalent Druid filter") {

+    NullHandlingUtils.initializeDruidNullHandling(false) // false selects SQL-compatible null handling

+

+    val druidFilter = FilterUtils.mapFilters(

+      Array[SparkFilter](IsNull("name")), testSchema

+    ).get

+    druidFilter.getRequiredColumns.asScala should contain theSameElementsAs Seq("name")

+

+    // scalastyle:off null

+    val expected = new SelectorDimFilter("name", null, null, null)

+    // scalastyle:on

+

+    druidFilter should equal(expected)

+  }

+

+  test("mapFilters should correctly map a Spark IsNotNull filter into an equivalent Druid filter") {

+    NullHandlingUtils.initializeDruidNullHandling(false) // false selects SQL-compatible null handling

+

+    val druidFilter = FilterUtils.mapFilters(

+      Array[SparkFilter](IsNotNull("name")), testSchema

+    ).get

+    druidFilter.getRequiredColumns.asScala should contain theSameElementsAs Seq("name")

+

+    // scalastyle:off null

+    val expected = new NotDimFilter(

+      new SelectorDimFilter("name", null, null, null)

+    )

+    // scalastyle:on

+

+    druidFilter should equal(expected)

+  }

+

+  test("mapFilters should correctly map a Spark In filter with null into an equivalent Druid filter") {

+    NullHandlingUtils.initializeDruidNullHandling(false) // false selects SQL-compatible null handling

+

+    val druidFilter = FilterUtils.mapFilters(

+      Array[SparkFilter](In("name", Array("a", "b", null))), testSchema // scalastyle:ignore null

+    ).get

+    druidFilter.getRequiredColumns.asScala should contain theSameElementsAs Seq("name")

+    // The null entry is expected to be dropped from the value set, leaving only "a" and "b".

+    // scalastyle:off null

+    val expected = new InDimFilter("name", Set[String]("a", "b").asJava, null, null)

+    // scalastyle:on

+

+    druidFilter should equal(expected)

+  }

+

+  test("mapFilters should correctly map a Spark In filter with only null into a short-circuit filter") {

+    NullHandlingUtils.initializeDruidNullHandling(false) // false selects SQL-compatible null handling

+

+    val druidFilter = FilterUtils.mapFilters(

+      Array[SparkFilter](In("name", Array(null))), testSchema // scalastyle:ignore null

+    ).get

+    // An In filter whose only value is null is expected to short-circuit to FalseDimFilter.

+    druidFilter should equal(FalseDimFilter.instance())

+  }

+

+  test("mapFilters should correctly map a Spark EqualNullSafe null filter into an equivalent Druid filter") {

+    NullHandlingUtils.initializeDruidNullHandling(false) // false selects SQL-compatible null handling

+

+    val druidFilter = FilterUtils.mapFilters(

+      Array[SparkFilter](EqualNullSafe("name", null)), testSchema // scalastyle:ignore null

+    ).get

+    druidFilter.getRequiredColumns.asScala should contain theSameElementsAs Seq("name")

+    // EqualNullSafe against null is expected to produce the same selector-on-null filter as IsNull.

+    // scalastyle:off null

+    val expected = new SelectorDimFilter("name", null, null, null)

+    // scalastyle:on null

+

+    druidFilter should equal(expected)

+  }

+

+  test("mapFilters should correctly map a Spark EqualTo null filter into a short-circuit filter") {

+    NullHandlingUtils.initializeDruidNullHandling(false) // false selects SQL-compatible null handling

+

+    val druidFilter = FilterUtils.mapFilters(

+      Array[SparkFilter](EqualTo("name", null)), testSchema // scalastyle:ignore null

+    ).get

+    // Unlike EqualNullSafe, EqualTo against null is expected to short-circuit to FalseDimFilter.

+    druidFilter should equal(FalseDimFilter.instance())

+  }

+

+  test("isSupportedFilter should correctly identify supported and unsupported filters") {

+    FilterUtils.isSupportedFilter(And(EqualTo("count", 1), LessThan("name", "abc")), testSchema) shouldBe true

+    FilterUtils.isSupportedFilter(And(EqualTo("count", 1), IsNull("name")), testSchema) shouldBe false

+    // Or follows the same expectations as And: an IsNull branch makes the whole filter unsupported here.

+    FilterUtils.isSupportedFilter(Or(EqualTo("count", 1), LessThan("name", "abc")), testSchema) shouldBe true

+    FilterUtils.isSupportedFilter(Or(EqualTo("count", 1), IsNull("name")), testSchema) shouldBe false

+    // Not: supported around a comparison, unsupported around an IsNull.

+    FilterUtils.isSupportedFilter(Not(GreaterThan("count", 5)), testSchema) shouldBe true

+    FilterUtils.isSupportedFilter(Not(IsNull("count")), testSchema) shouldBe false

+    // Bare null checks are expected to be unsupported when the third argument is omitted.

+    FilterUtils.isSupportedFilter(IsNull("count"), testSchema) shouldBe false

+    FilterUtils.isSupportedFilter(IsNotNull("count"), testSchema) shouldBe false

+    // In: supported for string values, unsupported when the value list holds only null.

+    FilterUtils.isSupportedFilter(In("name", Array[Any]("foo", "bar")), testSchema) shouldBe true

+    FilterUtils.isSupportedFilter(In("count", Array[Any](null)), testSchema) shouldBe false // scalastyle:ignore null

+    // String-matching filters are expected to be supported on the string column but not on the long column.

+    FilterUtils.isSupportedFilter(StringContains("count", "foo"), testSchema) shouldBe false

+    FilterUtils.isSupportedFilter(StringContains("name", "foo"), testSchema) shouldBe true

+    FilterUtils.isSupportedFilter(StringStartsWith("name", "foo"), testSchema) shouldBe true

+    FilterUtils.isSupportedFilter(StringEndsWith("name", "foo"), testSchema) shouldBe true

+    // Equality: supported against a concrete value, unsupported against null.

+    FilterUtils.isSupportedFilter(EqualTo("count", 5), testSchema) shouldBe true

+    FilterUtils.isSupportedFilter(EqualTo("name", null), testSchema) shouldBe false // scalastyle:ignore null

+    FilterUtils.isSupportedFilter(EqualNullSafe("count", 5), testSchema) shouldBe true

+    FilterUtils.isSupportedFilter(EqualNullSafe("name", null), testSchema) shouldBe false // scalastyle:ignore null

+    // Range comparisons are expected to be supported on both the string and the long column.

+    FilterUtils.isSupportedFilter(LessThan("name", "foo"), testSchema) shouldBe true

+    FilterUtils.isSupportedFilter(LessThanOrEqual("count", 17), testSchema) shouldBe true

+    FilterUtils.isSupportedFilter(GreaterThan("name", "bar"), testSchema) shouldBe true

+    FilterUtils.isSupportedFilter(GreaterThanOrEqual("count", -8), testSchema) shouldBe true

+  }

+

+  test("isSupportedFilter should correctly identify IsNull and IsNotNull filters as supported when using " +

+    "SQL-compatible null handling") {

+    // true is passed as the third argument; per this test's title it selects SQL-compatible null handling.

+    Seq[SparkFilter](

+      And(EqualTo("count", 1), IsNull("name")), Or(EqualTo("count", 1), IsNull("name")),

+      Not(IsNull("count")), IsNull("count"), IsNotNull("count")

+    ).foreach(filter => FilterUtils.isSupportedFilter(filter, testSchema, true) shouldBe true)

+  }

+

+  test("isSupportedFilter should correctly identify In, EqualNullSafe, and EqualTo filters on null as " +

+    "supported when using SQL-compatible null handling") {

+    // scalastyle:off null

+    Seq[SparkFilter]( // each null-bearing filter below is expected to be accepted when the flag is true

+      In("name", Array[Any]("a", "b", null)), In("name", Array[Any](null)),

+      EqualNullSafe("name", null), EqualTo("name", null)

+    ).foreach(filter => FilterUtils.isSupportedFilter(filter, testSchema, true) shouldBe true)

+    // scalastyle:on

+  }

+

+  test("isSupportedFilter should correctly identify filters that are unsupported due to the data type of " +

+    "the column they're filtering") {

+    val schemaWithBinary = StructType(Seq[StructField](

+      StructField("count", LongType),

+      StructField("name", StringType),

+      StructField("complex_field", BinaryType)

+    ))

+    // A comparison on the binary column and a string-prefix match on the long column should both be rejected.

+    FilterUtils.isSupportedFilter(GreaterThan("complex_field", 5), schemaWithBinary) shouldBe false

+    FilterUtils.isSupportedFilter(StringStartsWith("count", "any"), schemaWithBinary) shouldBe false

+  }

+

+  test("getTimeFilterBounds should handle upper and lower bounds") {

+    // GreaterThan 2500 is exclusive, so the expected lower bound is 2501; the inclusive upper bound stays 5000.

+    val timeFilters = Array[SparkFilter](LessThanOrEqual("__time", 5000L), GreaterThan("__time", 2500L))

+

+    val bounds = FilterUtils.getTimeFilterBounds(timeFilters)

+    bounds should equal((Some(2501L), Some(5000L)))

+  }

+

+  test("getTimeFilterBounds should handle empty or multiple filters for a bound") {

+    // No lower-bound filter is given; of the two upper bounds the tighter one (LessThan 2500 => 2499) is expected.

+    val timeFilters = Array[SparkFilter](LessThanOrEqual("__time", 5000L), LessThan("__time", 2500L))

+

+    val bounds = FilterUtils.getTimeFilterBounds(timeFilters)

+    bounds should equal((None, Some(2499L)))

+  }

+}

diff --git a/spark/src/test/scala/org/apache/druid/spark/utils/NullHandlingUtilsSuite.scala b/spark/src/test/scala/org/apache/druid/spark/utils/NullHandlingUtilsSuite.scala
new file mode 100644
index 0000000..32f54e6
--- /dev/null
+++ b/spark/src/test/scala/org/apache/druid/spark/utils/NullHandlingUtilsSuite.scala
@@ -0,0 +1,36 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.utils

+

+import org.apache.druid.common.config.NullHandling

+import org.scalatest.funsuite.AnyFunSuite

+import org.scalatest.matchers.should.Matchers

+

+class NullHandlingUtilsSuite extends AnyFunSuite with Matchers {

+  test("NullHandlingUtils.initializeDruidNullHandling(true) should use default values for null") {

+    NullHandlingUtils.initializeDruidNullHandling(true) // true: nulls are replaced with default values

+    NullHandling.replaceWithDefault() shouldBe true

+  }

+  // NOTE(review): NullHandling is queried statically, so the setting presumably outlives each test - confirm.

+  test("NullHandlingUtils.initializeDruidNullHandling(false) should use sql-compatible null handling") {

+    NullHandlingUtils.initializeDruidNullHandling(false) // false: SQL-compatible null handling

+    NullHandling.sqlCompatible() shouldBe true

+  }

+}

diff --git a/spark/src/test/scala/org/apache/druid/spark/utils/SchemaUtilsSuite.scala b/spark/src/test/scala/org/apache/druid/spark/utils/SchemaUtilsSuite.scala
new file mode 100644
index 0000000..60a7608
--- /dev/null
+++ b/spark/src/test/scala/org/apache/druid/spark/utils/SchemaUtilsSuite.scala
@@ -0,0 +1,246 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.utils

+

+import org.apache.druid.data.input.MapBasedInputRow

+import org.apache.druid.data.input.impl.{DimensionSchema, DoubleDimensionSchema,

+  FloatDimensionSchema, LongDimensionSchema, StringDimensionSchema}

+import org.apache.druid.java.util.common.IAE

+import org.apache.druid.spark.registries.ComplexTypeRegistry

+import org.apache.druid.spark.v2.DruidDataSourceV2TestUtils

+import org.apache.spark.sql.catalyst.InternalRow

+import org.apache.spark.sql.catalyst.util.ArrayData

+import org.apache.spark.sql.types.{ArrayType, BinaryType, DoubleType, FloatType, IntegerType,

+  LongType, StringType, StructField, StructType, TimestampType}

+import org.apache.spark.unsafe.types.UTF8String

+import org.scalatest.funsuite.AnyFunSuite

+import org.scalatest.matchers.should.Matchers

+

+import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsJavaMapConverter,

+  seqAsJavaListConverter}

+

+class SchemaUtilsSuite extends AnyFunSuite with Matchers with DruidDataSourceV2TestUtils {

+  private val expectedBaseDimensions = Seq[DimensionSchema]( // expected result for the unmodified `dimensions`

+    new StringDimensionSchema("dim1"),

+    new StringDimensionSchema("dim2"),

+    new StringDimensionSchema("id1"),

+    new StringDimensionSchema("id2")

+  )

+

+  test("convertDruidSchemaToSparkSchema should convert a Druid schema") {

+    val columnMap = Map[String, (String, Boolean)]( // column name -> (Druid type name, flag); flag meaning: TODO confirm

+      "__time" -> ("LONG", false),

+      "dim1" -> ("STRING", true),

+      "dim2" -> ("STRING", false),

+      "id1" -> ("STRING", false),

+      "id2" -> ("STRING", false),

+      "count" -> ("LONG", false),

+      "sum_metric1" -> ("LONG", false),

+      "sum_metric2" -> ("LONG", false),

+      "sum_metric3" -> ("DOUBLE", false),

+      "sum_metric4" -> ("FLOAT", false),

+      "uniq_id1" -> ("thetaSketch", false)

+    )

+    // Register the thetaSketch complex type before converting; uniq_id1 above uses that type name.

+    ComplexTypeRegistry.registerByName("thetaSketch", false)

+    val actualSchema = SchemaUtils.convertDruidSchemaToSparkSchema(columnMap)

+    actualSchema.fields should contain theSameElementsAs schema.fields // `schema`: presumably from the mixed-in utils

+  }

+

+  test("convertInputRowToSparkRow should convert an InputRow to an InternalRow") {

+    val timestamp = 0L

+    val dimensions = List[String]("dim1", "dim2", "dim3", "dim4", "dim5", "dim6", "dim7", "dim8")

+    val event = Map[String, Any](

+      "dim1" -> 1L,

+      "dim2" -> "false",

+      "dim3" -> "str",

+      "dim4" -> List[String]("val1", "val2").asJava,

+      "dim5" -> 4.2,

+      "dim6" -> List[Long](12L, 26L).asJava,

+      "dim7" -> "12345", // string value; the schema below declares dim7 as LongType

+      "dim8" -> "12" // single string; the schema below declares dim8 as an array of longs

+    ).map{case(k, v) =>

+      // Gotta do our own auto-boxing in Scala

+      k -> v.asInstanceOf[AnyRef]

+    }

+    val inputRow: MapBasedInputRow =

+      new MapBasedInputRow(timestamp, dimensions.asJava, event.asJava)

+    val schema = StructType(Seq(

+      StructField("__time", TimestampType),

+      StructField("dim1", LongType),

+      StructField("dim2", StringType),

+      StructField("dim3", StringType),

+      StructField("dim4", ArrayType(StringType, false)),

+      StructField("dim5", DoubleType),

+      StructField("dim6", ArrayType(LongType, false)),

+      StructField("dim7", LongType),

+      StructField("dim8", ArrayType(LongType, false))

+    ))

+    val res = SchemaUtils.convertInputRowToSparkRow(inputRow, schema, false)

+    val expected = InternalRow.fromSeq( // values listed in schema field order

+      Seq(

+        0, // __time

+        1L,

+        UTF8String.fromString("false"),

+        UTF8String.fromString("str"),

+        ArrayData.toArrayData(Array(UTF8String.fromString("val1"), UTF8String.fromString("val2"))),

+        4.2,

+        ArrayData.toArrayData(Seq(12L, 26L)),

+        12345, // dim7: the string "12345" read as a number

+        ArrayData.toArrayData(Seq(12L)) // dim8: the string "12" read as a one-element long array

+      )

+    )

+    for (i <- 0 until schema.length) { // compare field by field, using each field's declared data type

+      res.get(i, schema(i).dataType) should equal(expected.get(i, schema(i).dataType))

+    }

+  }

+

+  test("convertInputRowToSparkRow should return null for missing dimensions") {

+    val timestamp = 0L

+    val dimensions = List[String]("dim1", "dim2", "dim3", "dim4", "dim5")

+    val event = Map[String, Any]( // dim4 and dim5 are listed as dimensions above but have no value here

+      "dim1" -> 1L,

+      "dim2" -> "false",

+      "dim3" -> "str"

+    ).map{case(k, v) =>

+      // Gotta do our own auto-boxing in Scala

+      k -> v.asInstanceOf[AnyRef]

+    }

+    val inputRow: MapBasedInputRow =

+      new MapBasedInputRow(timestamp, dimensions.asJava, event.asJava)

+    val schema = StructType(Seq(

+      StructField("__time", TimestampType),

+      StructField("dim1", LongType),

+      StructField("dim2", StringType),

+      StructField("dim3", StringType),

+      StructField("dim4", ArrayType(StringType, false)),

+      StructField("dim5", LongType)

+    ))

+    val defaultNullHandlingRes = SchemaUtils.convertInputRowToSparkRow(inputRow, schema, true)

+    val defaultNullExpected = InternalRow.fromSeq(

+      Seq(

+        0,

+        1L,

+        UTF8String.fromString("false"),

+        UTF8String.fromString("str"),

+        UTF8String.fromString(""), // dim4 (missing): empty string, although the schema declares an array type

+        0 // dim5 (missing): numeric default

+      )

+    )

+    for (i <- 0 until schema.length) {

+      defaultNullHandlingRes.get(i, schema(i).dataType) should equal(defaultNullExpected.get(i, schema(i).dataType))

+    }

+    // Same row again with false as the third argument: the missing dimensions are now expected to be null.

+

+    val sqlNullHandlingRes = SchemaUtils.convertInputRowToSparkRow(inputRow, schema, false)

+    val sqlNullExpected = InternalRow.fromSeq(

+      Seq(

+        0,

+        1L,

+        UTF8String.fromString("false"),

+        UTF8String.fromString("str"),

+        null, // scalastyle:ignore null

+        null // scalastyle:ignore null

+      )

+    )

+    for (i <- 0 until schema.length) {

+      sqlNullHandlingRes.get(i, schema(i).dataType) should equal(sqlNullExpected.get(i, schema(i).dataType))

+    }

+  }

+

+

+  test("convertStructTypeToDruidDimensionSchema should convert dimensions from a well-formed StructType") {

+    val updatedSchema = schema // `schema` and `dimensions` are not defined in this suite; presumably inherited fixtures

+      .add("id3", LongType)

+      .add("dim3", FloatType)

+      .add("dim4", DoubleType)

+      .add("dim5", IntegerType)

+    val updatedDimensions = dimensions.asScala ++ Seq("id3", "dim3", "dim4", "dim5")

+

+    val dimensionSchema =

+      SchemaUtils.convertStructTypeToDruidDimensionSchema(updatedDimensions, updatedSchema)

+    // IntegerType (dim5) is expected to map to a LongDimensionSchema, the same as LongType (id3).

+    val expectedDimensions = expectedBaseDimensions ++ Seq[DimensionSchema](

+      new LongDimensionSchema("id3"),

+      new FloatDimensionSchema("dim3"),

+      new DoubleDimensionSchema("dim4"),

+      new LongDimensionSchema("dim5")

+    )

+

+    dimensionSchema should contain theSameElementsInOrderAs expectedDimensions

+  }

+

+  test("convertStructTypeToDruidDimensionSchema should only process schema fields specified in dimensions") {

+    val updatedSchema = schema

+      .add("id3", LongType)

+      .add("dim3", FloatType)

+      .add("dim4", DoubleType)

+      .add("dim5", IntegerType)

+      .add("bin1", BinaryType) // Incompatible types are ok in the schema if they aren't dimensions

+    // Only the unmodified `dimensions` list is passed, so none of the fields added above should appear.

+    val dimensionSchema =

+      SchemaUtils.convertStructTypeToDruidDimensionSchema(dimensions.asScala, updatedSchema)

+

+    dimensionSchema should contain theSameElementsInOrderAs expectedBaseDimensions

+  }

+

+  test("convertStructTypeToDruidDimensionSchema should error when incompatible Spark types are present") {

+    val updatedSchema = schema

+      .add("bin1", BinaryType)

+    val updatedDimensions = dimensions.asScala :+ "bin1"

+    // Here the BinaryType field is named as a dimension, so the conversion is expected to throw an IAE.

+    an[IAE] should be thrownBy

+      SchemaUtils.convertStructTypeToDruidDimensionSchema(updatedDimensions, updatedSchema)

+  }

+

+  test("validateDimensionSpecAgainstSparkSchema should return true for valid dimension schemata") {

+    val dimensions = Seq[DimensionSchema](

+      new StringDimensionSchema("testStringDim"),

+      new LongDimensionSchema("testLongDim"),

+      new StringDimensionSchema("testStringDim2", DimensionSchema.MultiValueHandling.ARRAY, false),

+      new FloatDimensionSchema("testFloatDim"),

+      new DoubleDimensionSchema("testDoubleDim"),

+      new LongDimensionSchema("testLongDim2")

+    )

+    // Field names mirror the dimension names one-to-one so that every dimension/type pairing is checked.

+    val schema = StructType(Seq[StructField](

+      StructField("testStringDim", ArrayType(StringType)),

+      StructField("testLongDim", LongType),

+      StructField("testStringDim2", StringType),

+      StructField("testFloatDim", FloatType),

+      StructField("testDoubleDim", DoubleType),

+      StructField("testLongDim2", IntegerType)

+    ))

+

+    SchemaUtils.validateDimensionSpecAgainstSparkSchema(dimensions, schema) should be(true)

+  }

+

+  test("validateDimensionSpecAgainstSparkSchema should throw an IAE for an invalid set of dimensions") {

+    val dimensions = Seq[DimensionSchema](

+      new StringDimensionSchema("testStringDim")

+    )

+    // The same name is declared as a string dimension above and as a LongType field below: the mismatch under test.

+    val schema = StructType(Seq[StructField](

+      StructField("testStringDim", LongType)

+    ))

+

+    an[IAE] should be thrownBy SchemaUtils.validateDimensionSpecAgainstSparkSchema(dimensions, schema)

+  }

+}

diff --git a/spark/src/test/scala/org/apache/druid/spark/v2/DruidDataSourceV2Suite.scala b/spark/src/test/scala/org/apache/druid/spark/v2/DruidDataSourceV2Suite.scala
new file mode 100644
index 0000000..fc6cee7
--- /dev/null
+++ b/spark/src/test/scala/org/apache/druid/spark/v2/DruidDataSourceV2Suite.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2
+
+import org.apache.druid.java.util.common.StringUtils
+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
+import org.apache.druid.spark.{MAPPER, SparkFunSuite}
+import org.apache.druid.spark.mixins.TryWithResources
+import org.apache.druid.timeline.DataSegment
+import org.apache.spark.sql.{DataFrame, Row}
+import org.scalatest.matchers.should.Matchers
+
+import scala.collection.JavaConverters.seqAsJavaListConverter
+
+class DruidDataSourceV2Suite extends SparkFunSuite with Matchers
+  with DruidDataSourceV2TestUtils with TryWithResources {
+  test("sparkSession.read(\"druid\") should correctly read segments into a dataFrame") {
+    val expected = sparkSession.createDataFrame(Seq(
+      // Reading from segments will not sort the resulting dataframe by time across segments, only within it
+      Row.fromSeq(Seq(1577836800000L, List("dim1"), "1", "1", "2", 1L, 1L, 3L, 4.2, 1.7F, idOneSketch)),
+      Row.fromSeq(Seq(1577862000000L, List("dim2"), "1", "1", "2", 1L, 4L, 2L, 5.1, 8.9F, idOneSketch)),
+      Row.fromSeq(Seq(1577851200000L, List("dim1"), "1", "1", "2", 1L, 3L, 1L, 0.2, 0.0F, idOneSketch)),
+      Row.fromSeq(Seq(1577876400000L, List("dim2"), "2", "1", "2", 1L, 1L, 5L, 8.0, 4.15F, idOneSketch)),
+      Row.fromSeq(Seq(1577962800000L, List("dim1", "dim3"), "2", "3", "7", 1L, 2L, 4L, 11.17, 3.7F, idThreeSketch)),
+      Row.fromSeq(Seq(1577988000000L, List("dim2"), "3", "2", "1", 1L, 1L, 7L, 0.0, 19.0F, idTwoSketch))
+    ).asJava, schema)
+
+    val segmentsString = MAPPER.writeValueAsString(
+      List[DataSegment](firstSegment, secondSegment, thirdSegment).asJava
+    )
+
+    val df = sparkSession
+      .read
+      .format("druid")
+      .options(Map(
+        s"${DruidConfigurationKeys.readerPrefix}.${DruidConfigurationKeys.segmentsKey}" -> segmentsString,
+        s"${DruidConfigurationKeys.readerPrefix}.${DruidConfigurationKeys.useSparkConfForDeepStorageKey}" -> "true"
+      ))
+      .schema(schema)
+      .load()
+
+    matchDfs(df, expected)
+  }
+
+  /**
+    * Assert that DF and EXPECTED have the same schema, the same number of rows, and pairwise-matching rows.
+    * Binary cells are base64-encoded first so that byte arrays are compared by content.
+    *
+    * @param df The result DataFrame to match against EXPECTED.
+    * @param expected The expected DataFrame.
+    */
+  private def matchDfs(df: DataFrame, expected: DataFrame): Unit = {
+    df.schema should equal(expected.schema)
+
+    def comparableRows(frame: DataFrame): Array[Seq[Any]] = frame.collect().map(_.toSeq.map {
+      case v: Array[Byte] => StringUtils.encodeBase64String(v)
+      case x: Any => x
+    })
+
+    val actualRows = comparableRows(df)
+    val expectedRows = comparableRows(expected)
+    // zip silently drops unmatched rows, so compare the row counts explicitly first.
+    actualRows.length should equal(expectedRows.length)
+    actualRows.zip(expectedRows).foreach { case (actual, wanted) => actual should contain theSameElementsAs wanted }
+  }
+}
diff --git a/spark/src/test/scala/org/apache/druid/spark/v2/DruidDataSourceV2TestUtils.scala b/spark/src/test/scala/org/apache/druid/spark/v2/DruidDataSourceV2TestUtils.scala
new file mode 100644
index 0000000..a746a82
--- /dev/null
+++ b/spark/src/test/scala/org/apache/druid/spark/v2/DruidDataSourceV2TestUtils.scala
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2
+
+import com.google.common.base.{Supplier, Suppliers}
+import org.apache.commons.dbcp2.BasicDataSource
+import org.apache.druid.java.util.common.granularity.GranularityType
+import org.apache.druid.java.util.common.{FileUtils, Intervals, StringUtils}
+import org.apache.druid.metadata.{MetadataStorageConnectorConfig, MetadataStorageTablesConfig,
+  SQLMetadataConnector}
+import org.apache.druid.spark.MAPPER
+import org.apache.druid.spark.configuration.DruidConfigurationKeys
+import org.apache.druid.spark.registries.SQLConnectorRegistry
+import org.apache.druid.spark.utils.SchemaUtils
+import org.apache.druid.timeline.DataSegment
+import org.apache.druid.timeline.partition.NumberedShardSpec
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
+import org.apache.spark.sql.types.{ArrayType, BinaryType, DoubleType, FloatType, LongType,
+  StringType, StructField, StructType}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.unsafe.types.UTF8String
+import org.joda.time.Interval
+import org.skife.jdbi.v2.{DBI, Handle}
+import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException
+
+import java.io.File
+import java.util.{Properties, UUID, List => JList, Map => JMap}
+import scala.collection.JavaConverters.{asScalaIteratorConverter, mapAsJavaMapConverter,
+  seqAsJavaListConverter}
+import scala.collection.mutable.ArrayBuffer
+
+trait DruidDataSourceV2TestUtils {
+  val dataSource: String = "spark_druid_test" // data source name shared by all three test segments
+  val interval: Interval = Intervals.of("2020-01-01T00:00:00.000Z/2020-01-02T00:00:00.000Z") // first two segments
+  val secondInterval: Interval = Intervals.of("2020-01-02T00:00:00.000Z/2020-01-03T00:00:00.000Z") // third segment
+  val version: String = "0"
+  val segmentsDir: File = // canonical form of the relative path src/test/resources/segments
+    new File(makePath("src", "test", "resources", "segments")).getCanonicalFile
+  val firstSegmentPath: String = // the three paths below are joined onto segmentsDir to build load specs
+    makePath("spark_druid_test", "2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z", "0", "0", "index.zip")
+  val secondSegmentPath: String =
+    makePath("spark_druid_test", "2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z", "0", "1", "index.zip")
+  val thirdSegmentPath: String =
+    makePath("spark_druid_test", "2020-01-02T00:00:00.000Z_2020-01-03T00:00:00.000Z", "0", "0", "index.zip")
+  val loadSpec: String => JMap[String, AnyRef] = (path: String) => // builds a "local"-type load spec for PATH
+    Map[String, AnyRef]("type" -> "local", "path" -> path).asJava
+  val dimensions: JList[String] = List("dim1", "dim2", "id1", "id2").asJava
+  val metrics: JList[String] = List(
+    "count", "sum_metric1","sum_metric2","sum_metric3","sum_metric4","uniq_id1").asJava
+  val metricsSpec: String = // JSON array with one entry per name in `metrics`
+    """[
+      |  { "type": "count", "name": "count" },
+      |  { "type": "longSum", "name": "sum_metric1", "fieldName": "sum_metric1" },
+      |  { "type": "longSum", "name": "sum_metric2", "fieldName": "sum_metric2" },
+      |  { "type": "doubleSum", "name": "sum_metric3", "fieldName": "sum_metric3" },
+      |  { "type": "floatSum", "name": "sum_metric4", "fieldName": "sum_metric4" },
+      |  { "type": "thetaSketch", "name": "uniq_id1", "fieldName": "uniq_id1", "isInputThetaSketch": true }
+      |]""".stripMargin
+  val binaryVersion: Integer = 9
+  val timestampColumn: String = "__time"
+  val timestampFormat: String = "auto"
+  val segmentGranularity: String = GranularityType.DAY.name
+
+  val firstSegment: DataSegment = new DataSegment( // `interval`, NumberedShardSpec(0, 0)
+    dataSource,
+    interval,
+    version,
+    loadSpec(makePath(segmentsDir.getCanonicalPath, firstSegmentPath)),
+    dimensions,
+    metrics,
+    new NumberedShardSpec(0, 0),
+    binaryVersion,
+    3278L // NOTE(review): presumably the segment size in bytes - confirm against DataSegment's constructor
+  )
+  val secondSegment: DataSegment = new DataSegment( // `interval`, NumberedShardSpec(1, 0)
+    dataSource,
+    interval,
+    version,
+    loadSpec(makePath(segmentsDir.getCanonicalPath, secondSegmentPath)),
+    dimensions,
+    metrics,
+    new NumberedShardSpec(1, 0),
+    binaryVersion,
+    3299L
+  )
+  val thirdSegment: DataSegment = new DataSegment( // `secondInterval`, NumberedShardSpec(0, 0)
+    dataSource,
+    secondInterval,
+    version,
+    loadSpec(makePath(segmentsDir.getCanonicalPath, thirdSegmentPath)),
+    dimensions,
+    metrics,
+    new NumberedShardSpec(0, 0),
+    binaryVersion,
+    3409L
+  )
+  // Each segment above serialized to a string with MAPPER.
+  val firstSegmentString: String = MAPPER.writeValueAsString(firstSegment)
+  val secondSegmentString: String = MAPPER.writeValueAsString(secondSegment)
+  val thirdSegmentString: String = MAPPER.writeValueAsString(thirdSegment)
+
+  val idOneSketch: Array[Byte] = StringUtils.decodeBase64String("AQMDAAA6zJNV0wc7TCHDCQ==") // uniq_id1 sketch bytes
+  val idTwoSketch: Array[Byte] = StringUtils.decodeBase64String("AQMDAAA6zJNHlmybd5/laQ==")
+  val idThreeSketch: Array[Byte] = StringUtils.decodeBase64String("AQMDAAA6zJOppPrHQT61Dw==")
+  // NOTE(review): these look like the epoch-millisecond starts of `interval` and `secondInterval` - confirm.
+  val firstTimeBucket: Long = 1577836800000L
+  val secondTimeBucket: Long = 1577923200000L
+  // Spark schema used for rows read from the test segments: __time, then `dimensions`, then `metrics`.
+  val schema: StructType = StructType(Seq[StructField](
+    StructField("__time", LongType),
+    StructField("dim1", ArrayType(StringType, false)),
+    StructField("dim2", StringType),
+    StructField("id1", StringType),
+    StructField("id2", StringType),
+    StructField("count", LongType),
+    StructField("sum_metric1", LongType),
+    StructField("sum_metric2", LongType),
+    StructField("sum_metric3", DoubleType),
+    StructField("sum_metric4", FloatType),
+    StructField("uniq_id1", BinaryType)
+  ))
+
+  val columnTypes: Option[Set[String]] =
+    Option(Set("LONG", "STRING", "FLOAT", "DOUBLE", "thetaSketch"))
+
+  private val tempDirs: ArrayBuffer[String] = ArrayBuffer.empty[String] // deleted by cleanUpWorkingDirectory()
+  def testWorkingStorageDirectory: String = {
+    val createdPath = FileUtils.createTempDir("druid-spark-tests").getCanonicalPath
+    tempDirs.append(createdPath) // remember every directory handed out so it can be removed later
+    createdPath
+  }
+
+  private val testDbUri = "jdbc:derby:memory:TestDatabase"
+  def generateUniqueTestUri(): String = testDbUri + dbSafeUUID // base URI followed by a dash-free random UUID
+  // Metadata options selecting the "embedded_derby" db type and the given connect URI.
+  val metadataClientProps: String => Map[String, String] = (uri: String) => Map[String, String](
+    s"${DruidConfigurationKeys.metadataPrefix}.${DruidConfigurationKeys.metadataDbTypeKey}" -> "embedded_derby",
+    s"${DruidConfigurationKeys.metadataPrefix}.${DruidConfigurationKeys.metadataConnectUriKey}" -> uri
+  )
+
+  // Creates the database at URI by opening one connection with ";create=true" appended, then closing it.
+  def createTestDb(uri: String): Unit = new DBI(s"$uri;create=true").open().close()
+  // Returns an open JDBI handle to the database at URI.
+  def openDbiToTestDb(uri: String): Handle = new DBI(uri).open()
+  def tearDownTestDb(uri: String): Unit = {
+    // Closing an in-memory Derby database throws an expected exception. It bubbles up as an
+    // UnableToObtainConnectionException from skife's JDBI, so only that exception type is swallowed.
+    // TODO: Just open the connection directly and check the exception there
+    try new DBI(s"$uri;shutdown=true").open().close()
+    catch { case _: UnableToObtainConnectionException => }
+  }
+
+  def registerEmbeddedDerbySQLConnector(): Unit = {
+    // Registers an "embedded_derby" factory that builds a Derby SQLMetadataConnector and creates its segment table.
+    SQLConnectorRegistry.register("embedded_derby",
+      (connectorConfigSupplier: Supplier[MetadataStorageConnectorConfig],
+       metadataTableConfigSupplier: Supplier[MetadataStorageTablesConfig]) => {
+        val baseConfig = connectorConfigSupplier.get()
+        // Same settings as the supplied config, except that isCreateTables is always true.
+        val createTablesConfig = new MetadataStorageConnectorConfig {
+          override def isCreateTables: Boolean = true
+          override def getHost: String = baseConfig.getHost
+          override def getPort: Int = baseConfig.getPort
+          override def getConnectURI: String = baseConfig.getConnectURI
+          override def getUser: String = baseConfig.getUser
+          override def getPassword: String = baseConfig.getPassword
+          override def getDbcpProperties: Properties = baseConfig.getDbcpProperties
+        }
+
+        val connector: SQLMetadataConnector =
+          new SQLMetadataConnector(Suppliers.ofInstance(createTablesConfig), metadataTableConfigSupplier) {
+            val datasource: BasicDataSource = getDatasource
+            datasource.setDriverClassLoader(getClass.getClassLoader)
+            datasource.setDriverClassName("org.apache.derby.jdbc.EmbeddedDriver")
+            private val dbi = new DBI(connectorConfigSupplier.get().getConnectURI)
+            private val SERIAL_TYPE = "BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)"
+
+            override def getSerialType: String = SERIAL_TYPE
+
+            override def getStreamingFetchSize: Int = 1
+
+            override def getQuoteString: String = "\\\""
+
+            override def tableExists(handle: Handle, tableName: String): Boolean =
+              !handle.createQuery("select * from SYS.SYSTABLES where tablename = :tableName")
+                .bind("tableName", StringUtils.toUpperCase(tableName)).list().isEmpty
+
+            override def getDBI: DBI = dbi
+          }
+        connector.createSegmentTable()
+        connector
+      })
+  }
+
+  def cleanUpWorkingDirectory(): Unit = { // deletes every directory handed out by testWorkingStorageDirectory
+    for (tempDirPath <- tempDirs) FileUtils.deleteDirectory(new File(tempDirPath).getCanonicalFile)
+  }
+
+  def partitionReaderToSeq(reader: InputPartitionReader[InternalRow]): Seq[InternalRow] = {
+    val rows = new ArrayBuffer[InternalRow]()
+    // Drain the reader into a buffer; close it even if next()/get() throws so a failed read can't leak it.
+    try {
+      while (reader.next()) rows += reader.get()
+    } finally reader.close()
+    rows
+  }
+
+  def columnarPartitionReaderToSeq(reader: InputPartitionReader[ColumnarBatch]): Seq[InternalRow] = {
+    val res = new ArrayBuffer[InternalRow]()
+    // ColumnarBatches return MutableColumnarRows, so we need to copy them before we close.
+    // The finally block closes the reader even when reading or copying a batch throws.
+    try {
+      while (reader.next()) {
+        val batch = reader.get()
+        batch.rowIterator().asScala.foreach { row =>
+          // MutableColumnarRows don't support copying ArrayTypes, we can't use row.copy()
+          val finalizedRow = new GenericInternalRow(batch.numCols())
+          (0 until batch.numCols()).foreach { col =>
+            if (row.isNullAt(col)) {
+              finalizedRow.setNullAt(col)
+            } else {
+              batch.column(col).dataType() match {
+                case _: ArrayType =>
+                  // Druid only supports multiple values for Strings, hard-code that assumption here for now
+                  val finalizedArr = row.getArray(col).array.map(el => el.asInstanceOf[UTF8String].copy())
+                  finalizedRow.update(col, ArrayData.toArrayData(finalizedArr))
+                case otherType => finalizedRow.update(col, row.get(col, otherType))
+              }
+            }
+          }
+          res += finalizedRow
+        }
+      }
+    } finally reader.close()
+    res
+  }
+
+  def wrapSeqToInternalRow(seq: Seq[Any], schema: StructType): InternalRow = {
+    // Converts a non-null value for the column at INDEX. Values destined for array columns always end up
+    // wrapped in ArrayData, whether they arrive as a collection or as a single element.
+    def convert(value: Any, index: Int): Any = schema(index).dataType match {
+      case arrayType: ArrayType =>
+        val baseType = arrayType.elementType
+        value match {
+          case collection: Traversable[_] =>
+            ArrayData.toArrayData(collection.map(item => SchemaUtils.parseToScala(item, baseType)))
+          case single =>
+            // Single-element arrays
+            ArrayData.toArrayData(List(SchemaUtils.parseToScala(single, baseType)))
+        }
+      case otherType => SchemaUtils.parseToScala(value, otherType)
+    }
+
+    val converted = seq.zipWithIndex.map {
+      case (null, _) => null // scalastyle:ignore null
+      case (value, index) => convert(value, index)
+    }
+    InternalRow.fromSeq(converted)
+  }
+
+  /**
+    * Collect DF to the driver and bucket its rows by the Spark partition each row was read from.
+    *
+    * @param df The dataframe to extract partitions from.
+    * @return A Seq[Array[Row]] with one Array[Row] per partition of DF that produced at least one row. The
+    *         arrays come back in the grouping map's iteration order, not ordered by partition id.
+    */
+  def getDataFramePartitions(df: DataFrame): Seq[Array[Row]] = {
+    // Tag each row with the id of the partition whose task produced it before collecting.
+    val taggedRows = df.rdd.map(row => TaskContext.getPartitionId() -> row).collect()
+    val rowsByPartition = taggedRows.groupBy { case (partitionId, _) => partitionId }
+
+    // Drop the partition ids again, leaving one array of rows per partition.
+    rowsByPartition.values
+      .map(tagged => tagged.map { case (_, row) => row })
+      .toSeq
+  }
+
+  // Joins COMPONENTS with the platform file separator; nothing is normalised or resolved.
+  def makePath(components: String*): String = String.join(File.separator, components: _*)
+
+  // A random UUID rendered as a string with its dashes removed.
+  def dbSafeUUID: String = StringUtils.removeChar(UUID.randomUUID().toString, '-')
+}
diff --git a/spark/src/test/scala/org/apache/druid/spark/v2/reader/DruidColumnarInputPartitionReaderSuite.scala b/spark/src/test/scala/org/apache/druid/spark/v2/reader/DruidColumnarInputPartitionReaderSuite.scala
new file mode 100644
index 0000000..e02c4e0
--- /dev/null
+++ b/spark/src/test/scala/org/apache/druid/spark/v2/reader/DruidColumnarInputPartitionReaderSuite.scala
@@ -0,0 +1,57 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.v2.reader

+

+import org.apache.druid.spark.SparkFunSuite

+import org.apache.druid.spark.configuration.SerializableHadoopConfiguration

+import org.apache.spark.sql.vectorized.ColumnarBatch

+

+class DruidColumnarInputPartitionReaderSuite extends SparkFunSuite with InputPartitionReaderBehaviors[ColumnarBatch] {

+  // Applies the shared reader behaviours to DruidColumnarInputPartitionReader with batch sizes 1, 2 and 512.

+  private val hadoopConf =

+    () => sparkContext.broadcast(new SerializableHadoopConfiguration(sparkContext.hadoopConfiguration))

+  // Each testsFor call passes the same arguments apart from the description and the batchSize.

+  testsFor(

+    inputPartitionReader(

+      "DruidColumnarInputPartitionReader with batch size 1",

+      hadoopConf,

+      new DruidColumnarInputPartitionReader(_, _, _, _, _, _, _, _, _, batchSize = 1),

+      columnarPartitionReaderToSeq

+    )

+  )

+

+  testsFor(

+    inputPartitionReader(

+      "DruidColumnarInputPartitionReader with batch size 2",

+      hadoopConf,

+      new DruidColumnarInputPartitionReader(_, _, _, _, _, _, _, _, _, batchSize = 2),

+      columnarPartitionReaderToSeq

+    )

+  )

+

+  testsFor(

+    inputPartitionReader(

+      "DruidColumnarInputPartitionReader with batch size 512",

+      hadoopConf,

+      new DruidColumnarInputPartitionReader(_, _, _, _, _, _, _, _, _, batchSize = 512),

+      columnarPartitionReaderToSeq

+    )

+  )

+}

diff --git a/spark/src/test/scala/org/apache/druid/spark/v2/reader/DruidDataSourceReaderSuite.scala b/spark/src/test/scala/org/apache/druid/spark/v2/reader/DruidDataSourceReaderSuite.scala
new file mode 100644
index 0000000..17f2673
--- /dev/null
+++ b/spark/src/test/scala/org/apache/druid/spark/v2/reader/DruidDataSourceReaderSuite.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import org.apache.druid.spark.configuration.DruidConfigurationKeys
+import org.apache.druid.spark.v2.DruidDataSourceV2TestUtils
+import org.apache.druid.spark.{MAPPER, SparkFunSuite}
+import org.apache.druid.timeline.DataSegment
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.{Filter, GreaterThanOrEqual, LessThan}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.scalatest.matchers.should.Matchers
+
+import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsJavaMapConverter, seqAsJavaListConverter}
+
+class DruidDataSourceReaderSuite extends SparkFunSuite with Matchers
+  with DruidDataSourceV2TestUtils {
+  private val segmentsString = MAPPER.writeValueAsString(
+    List[DataSegment](firstSegment, secondSegment, thirdSegment).asJava
+  )
+
+  // Builds the fully-qualified option name for KEY under the reader prefix.
+  private def readerOption(key: String): String = s"${DruidConfigurationKeys.readerPrefix}.$key"
+
+  private val defaultExpected = Seq(
+    Seq(1577836800000L, List("dim1"), 1, 1, 2, 1L, 1L, 3L, 4.2, 1.7F, idOneSketch),
+    Seq(1577862000000L, List("dim2"), 1, 1, 2, 1L, 4L, 2L, 5.1, 8.9F, idOneSketch),
+    Seq(1577851200000L, List("dim1"), 1, 1, 2, 1L, 3L, 1L, 0.2, 0.0F, idOneSketch),
+    Seq(1577876400000L, List("dim2"), 2, 1, 2, 1L, 1L, 5L, 8.0, 4.15F, idOneSketch),
+    Seq(1577962800000L, List("dim1", "dim3"), 2, 3, 7, 1L, 2L, 4L, 11.17, 3.7F, idThreeSketch),
+    Seq(1577988000000L, List("dim2"), 3, 2, 1, 1L, 1L, 7L, 0.0, 19.0F, idTwoSketch)
+  ).map(wrapSeqToInternalRow(_, schema))
+
+  test("DruidDataSourceReader should correctly read directly specified segments") {
+    val options = Map(readerOption(DruidConfigurationKeys.segmentsKey) -> segmentsString)
+
+    readSpecifiedSegments(options, useVectorizedReads = false)
+  }
+
+  test("DruidDataSourceReader should correctly read directly specified segments with a filter") {
+    val expected = Seq(
+      Seq(1577862000000L, List("dim2"), 1, 1, 2, 1L, 4L, 2L, 5.1, 8.9F, idOneSketch),
+      Seq(1577876400000L, List("dim2"), 2, 1, 2, 1L, 1L, 5L, 8.0, 4.15F, idOneSketch),
+      Seq(1577962800000L, List("dim1", "dim3"), 2, 3, 7, 1L, 2L, 4L, 11.17, 3.7F, idThreeSketch),
+      Seq(1577988000000L, List("dim2"), 3, 2, 1, 1L, 1L, 7L, 0.0, 19.0F, idTwoSketch)
+    ).map(wrapSeqToInternalRow(_, schema))
+    val options = Map(readerOption(DruidConfigurationKeys.segmentsKey) -> segmentsString)
+    val filters = Array[Filter](GreaterThanOrEqual("sum_metric4", 2L))
+
+    readSpecifiedSegments(options, useVectorizedReads = false, expected = expected, filterOpt = Option(filters))
+  }
+
+  test("DruidDataSourceReader should correctly read directly specified segments with vectorize = true, " +
+    "batch size 1") {
+    val options = Map(
+      readerOption(DruidConfigurationKeys.segmentsKey) -> segmentsString,
+      readerOption(DruidConfigurationKeys.vectorizeKey) -> "true",
+      readerOption(DruidConfigurationKeys.batchSizeKey) -> "1"
+    )
+
+    readSpecifiedSegments(options, useVectorizedReads = true)
+  }
+
+  test("DruidDataSourceReader should correctly read directly specified segments with vectorize = true, " +
+    "batch size 2") {
+    val options = Map(
+      readerOption(DruidConfigurationKeys.segmentsKey) -> segmentsString,
+      readerOption(DruidConfigurationKeys.vectorizeKey) -> "true",
+      readerOption(DruidConfigurationKeys.batchSizeKey) -> "2"
+    )
+
+    readSpecifiedSegments(options, useVectorizedReads = true)
+  }
+
+  test("DruidDataSourceReader should correctly read directly specified segments with vectorize = true, " +
+    "default batch size") {
+    val options = Map(
+      readerOption(DruidConfigurationKeys.segmentsKey) -> segmentsString,
+      readerOption(DruidConfigurationKeys.vectorizeKey) -> "true"
+    )
+
+    readSpecifiedSegments(options, useVectorizedReads = true)
+  }
+
+  test("DruidDataSourceReader should correctly read directly specified segments with vectorize = true " +
+    "and a filter") {
+    val expected = Seq(
+      Seq(1577836800000L, List("dim1"), 1, 1, 2, 1L, 1L, 3L, 4.2, 1.7F, idOneSketch),
+      Seq(1577862000000L, List("dim2"), 1, 1, 2, 1L, 4L, 2L, 5.1, 8.9F, idOneSketch),
+      Seq(1577851200000L, List("dim1"), 1, 1, 2, 1L, 3L, 1L, 0.2, 0.0F, idOneSketch)
+    ).map(wrapSeqToInternalRow(_, schema))
+
+    val options = Map(
+      readerOption(DruidConfigurationKeys.segmentsKey) -> segmentsString,
+      readerOption(DruidConfigurationKeys.vectorizeKey) -> "true"
+    )
+    val filters = Array[Filter](LessThan("dim2", 2))
+    readSpecifiedSegments(options, useVectorizedReads = true, expected = expected, filterOpt = Option(filters))
+  }
+
+  test("DruidDataSourceReader should correctly read directly specified segments with " +
+    "useSparkConfForDeepStorage = true") {
+    val options = Map(
+      readerOption(DruidConfigurationKeys.segmentsKey) -> segmentsString,
+      readerOption(DruidConfigurationKeys.useSparkConfForDeepStorageKey) -> "true"
+    )
+
+    readSpecifiedSegments(options, useVectorizedReads = false)
+  }
+
+  test("DruidDataSourceReader should correctly report which filters it does not support") {
+    val options = Map(readerOption(DruidConfigurationKeys.segmentsKey) -> segmentsString)
+
+    // DruidDataSourceReader doesn't support pushing down filters on complex types
+    val supported = GreaterThanOrEqual("count", 5)
+    val unsupported = LessThan("uniq_id1", 4)
+    val reader = DruidDataSourceReader(schema, new DataSourceOptions(options.asJava))
+    val result = reader.pushFilters(Array[Filter](supported, unsupported))
+    Array[Filter](unsupported) should equal(result)
+    Array[Filter](supported) should equal(reader.pushedFilters())
+  }
+
+  /**
+    * Builds a DruidDataSourceReader from OPTIONSMAP, pushes any filters in FILTEROPT, checks that the reader's
+    * enableBatchRead() agrees with USEVECTORIZEDREADS, then reads every planned partition through the matching
+    * (columnar or row-based) path and asserts that the rows read equal EXPECTED.
+    */
+  def readSpecifiedSegments(
+                             optionsMap: Map[String, String],
+                             useVectorizedReads: Boolean,
+                             expected: Seq[InternalRow] = defaultExpected,
+                             filterOpt: Option[Array[Filter]] = None
+                           ): Unit = {
+    val reader = DruidDataSourceReader(schema, new DataSourceOptions(optionsMap.asJava))
+    filterOpt.foreach(filters => reader.pushFilters(filters))
+    useVectorizedReads should equal(reader.enableBatchRead())
+
+    val actual = if (useVectorizedReads) {
+      reader.planBatchInputPartitions().asScala
+        .flatMap(partition => columnarPartitionReaderToSeq(partition.createPartitionReader()))
+    } else {
+      reader.planInputPartitions().asScala
+        .flatMap(partition => partitionReaderToSeq(partition.createPartitionReader()))
+    }
+    actual should equal(expected)
+  }
+}
diff --git a/spark/src/test/scala/org/apache/druid/spark/v2/reader/DruidInputPartitionReaderSuite.scala b/spark/src/test/scala/org/apache/druid/spark/v2/reader/DruidInputPartitionReaderSuite.scala
new file mode 100644
index 0000000..f962048
--- /dev/null
+++ b/spark/src/test/scala/org/apache/druid/spark/v2/reader/DruidInputPartitionReaderSuite.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import org.apache.druid.spark.SparkFunSuite
+import org.apache.druid.spark.configuration.SerializableHadoopConfiguration
+import org.apache.spark.sql.catalyst.InternalRow
+
+class DruidInputPartitionReaderSuite extends SparkFunSuite with InputPartitionReaderBehaviors[InternalRow] {
+  // A function rather than a value, so the broadcast is only created when the function is invoked.
+  private val hadoopConf =
+    () => sparkContext.broadcast(new SerializableHadoopConfiguration(sparkContext.hadoopConfiguration))
+  private val sutName = "DruidInputPartitionReader"
+
+  // Register the shared InputPartitionReaderBehaviors tests for the InternalRow-based DruidInputPartitionReader
+  testsFor(inputPartitionReader(
+    sutName,
+    hadoopConf,
+    new DruidInputPartitionReader(_, _, _, _, _, _, _, _, _),
+    partitionReaderToSeq
+  ))
+}
diff --git a/spark/src/test/scala/org/apache/druid/spark/v2/reader/DruidInputPartitionSuite.scala b/spark/src/test/scala/org/apache/druid/spark/v2/reader/DruidInputPartitionSuite.scala
new file mode 100644
index 0000000..230efb4
--- /dev/null
+++ b/spark/src/test/scala/org/apache/druid/spark/v2/reader/DruidInputPartitionSuite.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import org.apache.druid.spark.SparkFunSuite
+import org.apache.druid.spark.configuration.Configuration
+import org.apache.druid.spark.v2.DruidDataSourceV2TestUtils
+import org.scalatest.matchers.should.Matchers
+
+
+class DruidInputPartitionSuite extends SparkFunSuite with Matchers with DruidDataSourceV2TestUtils {
+  test("DruidInputPartition should correctly serialize tasks") {
+    // Build a partition over the first test segment and check its reader yields at least one row.
+    val reader = new DruidInputPartition(
+      firstSegment, schema, None, None, Configuration.EMPTY_CONF, false, false, false
+    ).createPartitionReader()
+    try reader.next() shouldBe true
+    finally reader.close() // release the reader even when the assertion above fails
+  }
+}
diff --git a/spark/src/test/scala/org/apache/druid/spark/v2/reader/InputPartitionReaderBehaviors.scala b/spark/src/test/scala/org/apache/druid/spark/v2/reader/InputPartitionReaderBehaviors.scala
new file mode 100644
index 0000000..9933d06
--- /dev/null
+++ b/spark/src/test/scala/org/apache/druid/spark/v2/reader/InputPartitionReaderBehaviors.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import org.apache.druid.query.filter.DimFilter
+import org.apache.druid.spark.configuration.{Configuration, SerializableHadoopConfiguration}
+import org.apache.druid.spark.utils.FilterUtils
+import org.apache.druid.spark.v2.DruidDataSourceV2TestUtils
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
+import org.apache.spark.sql.sources.{Filter, GreaterThanOrEqual, LessThan}
+import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.matchers.should.Matchers
+
+/**
+  * Shared ScalaTest behaviors for [[InputPartitionReader]] implementations. Meant to be mixed into a suite (see the
+  * `AnyFunSuite` self-type) that calls `inputPartitionReader` to register these tests for a reader implementation.
+  *
+  * @tparam T Type parameter of the [[InputPartitionReader]] instances under test.
+  */
+trait InputPartitionReaderBehaviors[T] extends DruidDataSourceV2TestUtils with Matchers { self: AnyFunSuite =>
+  /**
+    * Registers the shared reader tests, prefixing every test name with `name`.
+    *
+    * @param name Label prepended to each registered test name.
+    * @param hadoopConf Supplies the broadcast Hadoop configuration; invoked once for each reader constructed.
+    * @param inputPartitionReaderConstructor Factory for the reader under test. Receives, in order: a segment string,
+    *                                        the read schema, an optional filter, `columnTypes`, the broadcast Hadoop
+    *                                        configuration, a `Configuration`, and three Boolean flags.
+    * @param internalRowFetcher Converts a constructed reader into the rows each test compares to its expectation.
+    */
+  def inputPartitionReader( // scalastyle:ignore method.length (Need to wrap the tests in a method to call as one)
+                            name: String,
+                            hadoopConf: () => Broadcast[SerializableHadoopConfiguration],
+                            inputPartitionReaderConstructor: (
+                                String,
+                                StructType,
+                                Option[DimFilter],
+                                Option[Set[String]],
+                                Broadcast[SerializableHadoopConfiguration],
+                                Configuration,
+                                Boolean,
+                                Boolean,
+                                Boolean
+                              ) => InputPartitionReader[T],
+                            internalRowFetcher: InputPartitionReader[T] => Seq[InternalRow]): Unit = {
+    // Constructs a reader over `segment` with the arguments every test shares and returns the rows fetched from it.
+    // NOTE(review): `useDefaultNulls` is forwarded as the constructor's last Boolean; the name comes from the only
+    // test that sets it ("default values for nulls") -- confirm against the reader implementations.
+    def readRows(
+        segment: String,
+        readSchema: StructType,
+        filter: Option[DimFilter] = None,
+        useDefaultNulls: Boolean = false): Seq[InternalRow] = {
+      val partitionReader = inputPartitionReaderConstructor(
+        segment,
+        readSchema,
+        filter,
+        columnTypes,
+        hadoopConf(),
+        Configuration.EMPTY_CONF,
+        false,
+        false,
+        useDefaultNulls
+      )
+      internalRowFetcher(partitionReader)
+    }
+
+    test(s"$name should read the specified segment") {
+      val expected = Seq(
+        Seq(1577836800000L, List("dim1"), 1, 1, 2, 1L, 1L, 3L, 4.2, 1.7F, idOneSketch),
+        Seq(1577862000000L, List("dim2"), 1, 1, 2, 1L, 4L, 2L, 5.1, 8.9F, idOneSketch)
+      ).map(wrapSeqToInternalRow(_, schema))
+
+      readRows(firstSegmentString, schema) should equal(expected)
+    }
+
+    test(s"$name should apply filters to string columns") {
+      val expected = Seq(
+        Seq(1577851200000L, List("dim1"), 1, 1, 2, 1L, 3L, 1L, 0.2, 0.0F, idOneSketch)
+      ).map(wrapSeqToInternalRow(_, schema))
+      val filter = FilterUtils.mapFilters(Array[Filter](LessThan("dim2", 2)), schema)
+
+      readRows(secondSegmentString, schema, filter) should equal(expected)
+    }
+
+    test(s"$name should apply filters to numeric columns") {
+      val expected = Seq(
+        Seq(1577876400000L, List("dim2"), 2, 1, 2, 1L, 1L, 5L, 8.0, 4.15F, idOneSketch)
+      ).map(wrapSeqToInternalRow(_, schema))
+      val filter = FilterUtils.mapFilters(Array[Filter](GreaterThanOrEqual("sum_metric4", 2L)), schema)
+
+      readRows(secondSegmentString, schema, filter) should equal(expected)
+    }
+
+    test(s"$name should handle multi-valued dimensions") {
+      val expected = Seq(
+        Seq(1577962800000L, List("dim1", "dim3"), 2, 3, 7, 1L, 2L, 4L, 11.17, 3.7F, idThreeSketch),
+        Seq(1577988000000L, List("dim2"), 3, 2, 1, 1L, 1L, 7L, 0.0, 19.0F, idTwoSketch)
+      ).map(wrapSeqToInternalRow(_, schema))
+
+      readRows(thirdSegmentString, schema) should equal(expected)
+    }
+
+    test(s"$name should handle missing columns using default values for nulls") {
+      val extendedSchema =
+        new StructType(schema.fields ++ Seq(StructField("newCol", LongType), StructField("newStringCol", StringType)))
+
+      val expected = Seq(
+        Seq(1577962800000L, List("dim1", "dim3"), 2, 3, 7, 1L, 2L, 4L, 11.17, 3.7F, idThreeSketch, 0L, ""),
+        Seq(1577988000000L, List("dim2"), 3, 2, 1, 1L, 1L, 7L, 0.0, 19.0F, idTwoSketch, 0L, "")
+      ).map(wrapSeqToInternalRow(_, extendedSchema))
+
+      readRows(thirdSegmentString, extendedSchema, useDefaultNulls = true) should equal(expected)
+    }
+
+    test(s"$name should handle missing columns using SQL-compatible null handling") {
+      val extendedSchema = new StructType(schema.fields :+ StructField("newCol", LongType))
+
+      // scalastyle:off null
+      val expected = Seq(
+        Seq(1577962800000L, List("dim1", "dim3"), 2, 3, 7, 1L, 2L, 4L, 11.17, 3.7F, idThreeSketch, null),
+        Seq(1577988000000L, List("dim2"), 3, 2, 1, 1L, 1L, 7L, 0.0, 19.0F, idTwoSketch, null)
+      ).map(wrapSeqToInternalRow(_, extendedSchema))
+      // scalastyle:on null
+
+      readRows(thirdSegmentString, extendedSchema) should equal(expected)
+    }
+  }
+}

diff --git a/website/.spelling b/website/.spelling
index a0a1025..ba9e03b 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1893,3 +1893,36 @@
 protobuf
 Golang
 multiValueHandling
+ - ../docs/operations/spark.md
+SaveMode
+SaveModes
+partitioners
+dataframe
+dataframes
+DataFrames
+DataFrameWriter
+DataSourceReader
+DataSourceWriter
+DynamicConfigProvider
+DynamicConfigProviders
+DynamicConfigProviderRegistry
+serializable
+serde
+deserializable
+deserializing
+Gradle
+DimensionSchema
+DimensionSchemas
+DataSegment
+LoadSpecs
+ComplexTypeRegistry
+SegmentWriterRegistry
+SegmentReaderRegistry
+SQLConnectorRegistry
+IllegalArgumentException
+keyId
+base-64
+DataSourceV2
+_or_
+ADCs
+compileOnly