Set up, confs, mixins, and clients. (#11474)

* Set up, confs, mixins, and clients.

Initial module set up (e.g. pom, checkstyle, exclusions, licenses, etc.), support classes (mix-ins traits for logging and the try-with-resources pattern & configuration helpers), and an HTTP client for talking to a Druid cluster.
diff --git a/.travis.yml b/.travis.yml
index 4211f18..4f8e68a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -74,7 +74,7 @@
       script: ${MVN} animal-sniffer:check --fail-at-end
 
     - name: "checkstyle"
-      script: ${MVN} checkstyle:checkstyle --fail-at-end
+      script: ${MVN} checkstyle:checkstyle --fail-at-end -pl '!spark' && ${MVN} scalastyle:check --fail-at-end -pl 'spark'
 
     - name: "enforcer checks"
       script: ${MVN} enforcer:enforce --fail-at-end
diff --git a/LICENSE b/LICENSE
index aa65ab9..7318c8d 100644
--- a/LICENSE
+++ b/LICENSE
@@ -279,6 +279,13 @@
     This product contains lpad and rpad methods adapted from Apache Flink.
       * core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
 
+    This product contains Scala logging and serializable Hadoop configuration utilities adapted from Apache Spark.
+      * spark/src/main/scala/org/apache/druid/spark/mixins/Logging
+
+    This product contains a Druid client wrapper adapted from Imply Data's druid-hadoop-input-format.
+      * spark/src/main/scala/org/apache/druid/spark/clients/DruidClient
+      * spark/src/main/scala/org/apache/druid/spark/clients/HttpClientHolder
+
 
 MIT License
 ================================
diff --git a/NOTICE b/NOTICE
index 7761a54..64e3a2b 100644
--- a/NOTICE
+++ b/NOTICE
@@ -102,3 +102,35 @@
 
 
 
+
+================= Apache Spark 2.4.7 =================
+
+Apache Spark
+Copyright 2014 and onwards The Apache Software Foundation.
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+
+Export Control Notice
+---------------------
+
+This distribution includes cryptographic software. The country in which you currently reside may have
+restrictions on the import, possession, use, and/or re-export to another country, of encryption software.
+BEFORE using any encryption software, please check your country's laws, regulations and policies concerning
+the import, possession, or use, and re-export of encryption software, to see if this is permitted. See
+<http://www.wassenaar.org/> for more information.
+
+The U.S. Government Department of Commerce, Bureau of Industry and Security (BIS), has classified this
+software as Export Commodity Control Number (ECCN) 5D002.C.1, which includes information security software
+using or performing cryptographic functions with asymmetric algorithms. The form and manner of this Apache
+Software Foundation distribution makes it eligible for export under the License Exception ENC Technology
+Software Unrestricted (TSU) exception (see the BIS Export Administration Regulations, Section 740.13) for
+both object code and source code.
+
+The following provides more details on the included cryptographic software:
+
+This software uses Apache Commons Crypto (https://commons.apache.org/proper/commons-crypto/) to
+support authentication, and encryption and decryption of data sent across the network between
+services.
+
diff --git a/codestyle/scalastyle_config.xml b/codestyle/scalastyle_config.xml
new file mode 100644
index 0000000..9e2ed4d
--- /dev/null
+++ b/codestyle/scalastyle_config.xml
@@ -0,0 +1,139 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<scalastyle commentFilter="enabled">
+ <name>Apache Druid Scalastyle configuration</name>
+ <check level="warning" class="org.scalastyle.file.FileTabChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.file.FileLengthChecker" enabled="true">
+  <parameters>
+   <parameter name="maxFileLength"><![CDATA[800]]></parameter>
+  </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true">
+  <parameters>
+   <parameter name="header"><![CDATA[/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"/>
+ <check level="warning" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"/>
+ <check level="warning" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"/>
+ <check level="warning" class="org.scalastyle.scalariform.SpaceAfterCommentStartChecker" enabled="true"/>
+ <check level="warning" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
+  <parameters>
+   <parameter name="maxLineLength"><![CDATA[120]]></parameter>
+   <parameter name="tabSize"><![CDATA[4]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true">
+  <parameters>
+   <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true">
+  <parameters>
+   <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
+  <parameters>
+   <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"/>
+ <check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
+  <parameters>
+   <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
+  <parameters>
+   <parameter name="maxParameters"><![CDATA[8]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true">
+  <parameters>
+   <parameter name="maxTypes"><![CDATA[30]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true">
+  <parameters>
+   <parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
+   <parameter name="doubleLineAllowed"><![CDATA[false]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true">
+  <parameters>
+   <parameter name="maxLength"><![CDATA[50]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true">
+  <parameters>
+   <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true">
+  <parameters>
+   <parameter name="maxMethods"><![CDATA[30]]></parameter>
+  </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.RedundantIfChecker" enabled="true"></check>
+ <check customId="println" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
+  <parameters>
+   <parameter name="regex"><![CDATA[println]]></parameter>
+  </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.DeprecatedJavaChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.EmptyClassChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.ClassTypeParameterChecker" enabled="true">
+  <parameters>
+   <parameter name="regex"><![CDATA[^[A-Z_]$]]></parameter>
+  </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.UnderscoreImportChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.LowercasePatternMatchChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.ImportGroupingChecker" enabled="true"></check>
+</scalastyle>
\ No newline at end of file
diff --git a/codestyle/spotbugs-exclude.xml b/codestyle/spotbugs-exclude.xml
index cb6cb9c..f70c128 100644
--- a/codestyle/spotbugs-exclude.xml
+++ b/codestyle/spotbugs-exclude.xml
@@ -44,6 +44,10 @@
             <Class name="org.apache.druid.server.AsyncQueryForwardingServlet" />
         </And>
     </Match>
+    <!-- Spot Bugs doesn't work for Scala -->
+    <Match>
+        <Package name="~org\.apache\.druid\.spark.*"/>
+    </Match>
 
     <Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"/>
     <Bug pattern="BC_UNCONFIRMED_CAST"/>
diff --git a/hooks/pre-push.sh b/hooks/pre-push.sh
index a0928db..31b40ba 100755
--- a/hooks/pre-push.sh
+++ b/hooks/pre-push.sh
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-mvn checkstyle:checkstyle --fail-at-end
+mvn checkstyle:checkstyle --fail-at-end  -pl '!spark' && mvn scalastyle:check --fail-at-end -pl spark
diff --git a/pom.xml b/pom.xml
index eb066a4..9786eac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,6 +105,7 @@
         <protobuf.version>3.11.0</protobuf.version>
         <resilience4j.version>1.3.1</resilience4j.version>
         <slf4j.version>1.7.12</slf4j.version>
+        <surefire.version>2.22.2</surefire.version>
         <!-- If compiling with different hadoop version also modify default hadoop coordinates in TaskConfig.java -->
         <hadoop.compile.version>2.8.5</hadoop.compile.version>
         <mockito.version>3.8.0</mockito.version>
@@ -142,6 +143,8 @@
         <!-- Core cloud functionality -->
         <module>cloud/aws-common</module>
         <module>cloud/gcp-common</module>
+        <!-- Spark Connectors -->
+        <module>spark</module>
         <!-- Core extensions -->
         <module>extensions-core/kubernetes-extensions</module>
         <module>extensions-core/avro-extensions</module>
@@ -1517,7 +1520,7 @@
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-surefire-plugin</artifactId>
-                    <version>2.22.2</version>
+                    <version>${surefire.version}</version>
                     <configuration>
                         <!-- locale settings must be set on the command line before startup -->
                         <!-- set default options -->
@@ -1777,6 +1780,8 @@
                     <plugin>
                         <groupId>org.apache.maven.plugins</groupId>
                         <artifactId>maven-surefire-plugin</artifactId>
+                        <!-- Explicitly setting version here so that intelliJ uses the right schema for inspections -->
+                        <version>${surefire.version}</version>
                         <executions>
                             <execution>
                                 <phase>test</phase>
diff --git a/spark/pom.xml b/spark/pom.xml
new file mode 100644
index 0000000..6dcae79
--- /dev/null
+++ b/spark/pom.xml
@@ -0,0 +1,444 @@
+<?xml version="1.0" encoding="UTF-8"?>

+<!--

+  ~ Licensed to the Apache Software Foundation (ASF) under one

+  ~ or more contributor license agreements.  See the NOTICE file

+  ~ distributed with this work for additional information

+  ~ regarding copyright ownership.  The ASF licenses this file

+  ~ to you under the Apache License, Version 2.0 (the

+  ~ "License"); you may not use this file except in compliance

+  ~ with the License.  You may obtain a copy of the License at

+  ~

+  ~   http://www.apache.org/licenses/LICENSE-2.0

+  ~

+  ~ Unless required by applicable law or agreed to in writing,

+  ~ software distributed under the License is distributed on an

+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+  ~ KIND, either express or implied.  See the License for the

+  ~ specific language governing permissions and limitations

+  ~ under the License.

+  -->

+

+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

+    <modelVersion>4.0.0</modelVersion>

+

+    <artifactId>druid-spark</artifactId>

+    <name>druid-spark</name>

+    <description>Spark connectors for reading data from and writing data to Druid clusters</description>

+

+    <parent>

+        <groupId>org.apache.druid</groupId>

+        <artifactId>druid</artifactId>

+        <version>0.22.0-SNAPSHOT</version>

+        <relativePath>../pom.xml</relativePath>

+    </parent>

+

+    <properties>

+        <scala.version>${scala.major.version}.12</scala.version>

+        <scala.major.version>2.12</scala.major.version>

+        <spark.version>2.4.8</spark.version>

+        <!-- These two properties allow -Dcheckstyle.skip to suppress scalastyle checks as well -->

+        <checkstyle.skip>false</checkstyle.skip>

+        <scalastyle.skip>${checkstyle.skip}</scalastyle.skip>

+    </properties>

+

+    <dependencyManagement>

+        <dependencies>

+            <dependency>

+                <groupId>io.dropwizard.metrics</groupId>

+                <artifactId>metrics-core</artifactId>

+                <version>4.0.2</version>

+            </dependency>

+        </dependencies>

+    </dependencyManagement>

+

+    <dependencies>

+        <dependency>

+            <groupId>org.apache.druid</groupId>

+            <artifactId>druid-core</artifactId>

+            <version>${project.parent.version}</version>

+            <exclusions>

+                <exclusion>

+                    <groupId>com.jayway.jsonpath</groupId>

+                    <artifactId>json-path</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>com.opencsv</groupId>

+                    <artifactId>opencsv</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>io.airlift</groupId>

+                    <artifactId>airline</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>io.netty</groupId>

+                    <artifactId>*</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>io.timeandspace</groupId>

+                    <artifactId>cron-scheduler</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>net.thisptr</groupId>

+                    <artifactId>jackson-jq</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>org.apache.druid</groupId>

+                    <artifactId>druid-console</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>org.gridkit.lab</groupId>

+                    <artifactId>jvm-attach-api</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>org.hyperic</groupId>

+                    <artifactId>sigar</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>org.jboss.logging</groupId>

+                    <artifactId>jboss-logging</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>org.mozilla</groupId>

+                    <artifactId>rhino</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>org.skife.config</groupId>

+                    <artifactId>config-magic</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>org.slf4j</groupId>

+                    <artifactId>*</artifactId>

+                </exclusion>

+            </exclusions>

+        </dependency>

+        <dependency>

+            <groupId>org.apache.druid</groupId>

+            <artifactId>druid-processing</artifactId>

+            <version>${project.parent.version}</version>

+            <exclusions>

+                <exclusion>

+                    <groupId>com.ibm.icu</groupId>

+                    <artifactId>icu4j</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>io.netty</groupId>

+                    <artifactId>*</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>org.apache.maven</groupId>

+                    <artifactId>maven-artifact</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>org.mozilla</groupId>

+                    <artifactId>rhino</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>org.skife.config</groupId>

+                    <artifactId>config-magic</artifactId>

+                </exclusion>

+            </exclusions>

+        </dependency>

+

+

+        <dependency>

+            <groupId>commons-io</groupId>

+            <artifactId>commons-io</artifactId>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>com.google.code.findbugs</groupId>

+            <artifactId>jsr305</artifactId>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>com.google.guava</groupId>

+            <artifactId>guava</artifactId>

+        </dependency>

+        <dependency>

+            <groupId>com.fasterxml.jackson.core</groupId>

+            <artifactId>jackson-annotations</artifactId>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>com.fasterxml.jackson.core</groupId>

+            <artifactId>jackson-databind</artifactId>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>com.fasterxml.jackson.core</groupId>

+            <artifactId>jackson-core</artifactId>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>com.google.http-client</groupId>

+            <artifactId>google-http-client</artifactId>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>commons-lang</groupId>

+            <artifactId>commons-lang</artifactId>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>javax.validation</groupId>

+            <artifactId>validation-api</artifactId>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>joda-time</groupId>

+            <artifactId>joda-time</artifactId>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>io.netty</groupId>

+            <artifactId>netty</artifactId>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>org.apache.commons</groupId>

+            <artifactId>commons-dbcp2</artifactId>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>org.apache.hadoop</groupId>

+            <artifactId>hadoop-common</artifactId>

+            <exclusions>

+                <exclusion>

+                    <groupId>log4j</groupId>

+                    <artifactId>log4j</artifactId>

+                </exclusion>

+            </exclusions>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>org.slf4j</groupId>

+            <artifactId>slf4j-api</artifactId>

+            <scope>provided</scope>

+        </dependency>

+

+        <!-- Spark -->

+        <dependency>

+            <groupId>org.apache.spark</groupId>

+            <artifactId>spark-core_${scala.major.version}</artifactId>

+            <version>${spark.version}</version>

+            <scope>provided</scope>

+            <exclusions>

+                <exclusion>

+                    <groupId>com.fasterxml.jackson</groupId>

+                    <artifactId>*</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>com.fasterxml.jackson.core</groupId>

+                    <artifactId>*</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>org.slf4j</groupId>

+                    <artifactId>slf4j-api</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>log4j</groupId>

+                    <artifactId>log4j</artifactId>

+                </exclusion>

+            </exclusions>

+        </dependency>

+        <dependency>

+            <groupId>org.apache.spark</groupId>

+            <artifactId>spark-sql_${scala.major.version}</artifactId>

+            <version>${spark.version}</version>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>org.apache.spark</groupId>

+            <artifactId>spark-unsafe_${scala.major.version}</artifactId>

+            <version>${spark.version}</version>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>org.apache.spark</groupId>

+            <artifactId>spark-catalyst_${scala.major.version}</artifactId>

+            <version>${spark.version}</version>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>org.scala-lang</groupId>

+            <artifactId>scala-library</artifactId>

+            <version>${scala.version}</version>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>org.scala-lang</groupId>

+            <artifactId>scala-reflect</artifactId>

+            <version>${scala.version}</version>

+            <scope>provided</scope>

+        </dependency>

+        <dependency>

+            <groupId>org.scala-lang</groupId>

+            <artifactId>scalap</artifactId>

+            <version>${scala.version}</version>

+            <scope>provided</scope>

+        </dependency>

+

+        <!-- Tests -->

+        <dependency>

+            <groupId>org.apache.druid</groupId>

+            <artifactId>druid-core</artifactId>

+            <version>${project.parent.version}</version>

+            <exclusions>

+                <exclusion>

+                    <groupId>org.apache.logging.log4j</groupId>

+                    <artifactId>*</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>org.hyperic</groupId>

+                    <artifactId>sigar</artifactId>

+                </exclusion>

+                <exclusion>

+                    <groupId>org.slf4j</groupId>

+                    <artifactId>*</artifactId>

+                </exclusion>

+            </exclusions>

+            <type>test-jar</type>

+            <scope>test</scope>

+        </dependency>

+        <dependency>

+            <groupId>org.scalatest</groupId>

+            <artifactId>scalatest_${scala.major.version}</artifactId>

+            <version>3.1.1</version>

+            <scope>test</scope>

+        </dependency>

+        <dependency>

+            <groupId>org.scalactic</groupId>

+            <artifactId>scalactic_${scala.major.version}</artifactId>

+            <version>3.1.1</version>

+            <scope>test</scope>

+        </dependency>

+        <dependency>

+            <groupId>junit</groupId>

+            <artifactId>junit</artifactId>

+            <scope>test</scope>

+        </dependency>

+        <dependency>

+            <groupId>org.easymock</groupId>

+            <artifactId>easymock</artifactId>

+            <scope>test</scope>

+        </dependency>

+        <dependency>

+            <groupId>org.hamcrest</groupId>

+            <artifactId>hamcrest-core</artifactId>

+            <scope>test</scope>

+        </dependency>

+    </dependencies>

+

+    <build>

+        <plugins>

+            <!-- scala build -->

+            <plugin>

+                <groupId>net.alchim31.maven</groupId>

+                <artifactId>scala-maven-plugin</artifactId>

+                <version>3.2.2</version>

+                <executions>

+                    <execution>

+                        <id>eclipse-add-source</id>

+                        <goals>

+                            <goal>add-source</goal>

+                        </goals>

+                    </execution>

+                    <execution>

+                        <id>scala-compile-first</id>

+                        <phase>process-resources</phase>

+                        <goals>

+                            <goal>compile</goal>

+                        </goals>

+                    </execution>

+                    <execution>

+                        <id>scala-test-compile-first</id>

+                        <phase>process-test-resources</phase>

+                        <goals>

+                            <goal>testCompile</goal>

+                        </goals>

+                    </execution>

+                    <execution>

+                        <id>attach-scaladocs</id>

+                        <phase>verify</phase>

+                        <goals>

+                            <goal>doc-jar</goal>

+                        </goals>

+                    </execution>

+                </executions>

+                <configuration>

+                    <scalaVersion>${scala.version}</scalaVersion>

+                    <args>

+                        <arg>-unchecked</arg>

+                        <arg>-deprecation</arg>

+                        <arg>-feature</arg>

+                    </args>

+                    <jvmArgs>

+                        <jvmArg>-Xms1024m</jvmArg>

+                        <jvmArg>-Xmx1024m</jvmArg>

+                    </jvmArgs>

+                    <javacArgs>

+                        <javacArg>-source</javacArg>

+                        <javacArg>${java.version}</javacArg>

+                        <javacArg>-target</javacArg>

+                        <javacArg>${java.version}</javacArg>

+                        <javacArg>-Xlint:all,-serial,-path</javacArg>

+                    </javacArgs>

+                </configuration>

+            </plugin>

+            <!-- disable surefire -->

+            <plugin>

+                <groupId>org.apache.maven.plugins</groupId>

+                <artifactId>maven-surefire-plugin</artifactId>

+                <configuration>

+                    <skipTests>true</skipTests>

+                </configuration>

+            </plugin>

+            <!-- enable scalatest -->

+            <plugin>

+                <groupId>org.scalatest</groupId>

+                <artifactId>scalatest-maven-plugin</artifactId>

+                <version>1.0</version>

+                <configuration>

+                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>

+                    <junitxml>.</junitxml>

+                    <parallel>false</parallel>

+                    <argLine>-Dderby.stream.error.field=org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM\

+                        -Dlog4j.configuration=${project.basedir}/test/resources/log4j2.xml

+                    </argLine>

+                </configuration>

+                <executions>

+                    <execution>

+                        <id>test</id>

+                        <goals>

+                            <goal>test</goal>

+                        </goals>

+                    </execution>

+                </executions>

+            </plugin>

+            <plugin>

+                <groupId>org.scalastyle</groupId>

+                <artifactId>scalastyle-maven-plugin</artifactId>

+                <version>1.0.0</version>

+                <configuration>

+                    <verbose>false</verbose>

+                    <failOnViolation>true</failOnViolation>

+                    <includeTestSourceDirectory>true</includeTestSourceDirectory>

+                    <failOnWarning>false</failOnWarning>

+                    <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>

+                    <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>

+                    <configLocation>${project.basedir}/../codestyle/scalastyle_config.xml</configLocation>

+                </configuration>

+                <executions>

+                    <execution>

+                        <id>validate</id>

+                        <phase>validate</phase>

+                        <goals>

+                            <goal>check</goal>

+                        </goals>

+                    </execution>

+                </executions>

+            </plugin>

+        </plugins>

+    </build>

+</project>

diff --git a/spark/src/main/scala/org/apache/druid/spark/clients/DruidClient.scala b/spark/src/main/scala/org/apache/druid/spark/clients/DruidClient.scala
new file mode 100644
index 0000000..577f87e
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/clients/DruidClient.scala
@@ -0,0 +1,208 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.clients

+

+import com.fasterxml.jackson.core.`type`.TypeReference

+import com.google.common.net.HostAndPort

+import org.apache.druid.java.util.common.{IAE, ISE, Intervals, JodaUtils, StringUtils}

+import org.apache.druid.java.util.http.client.response.{StringFullResponseHandler,

+  StringFullResponseHolder}

+import org.apache.druid.java.util.http.client.{HttpClient, Request}

+import org.apache.druid.query.Druids

+import org.apache.druid.query.metadata.metadata.{ColumnAnalysis, SegmentAnalysis,

+  SegmentMetadataQuery}

+import org.apache.druid.spark.MAPPER

+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}

+import org.apache.druid.spark.mixins.Logging

+import org.jboss.netty.handler.codec.http.{HttpMethod, HttpResponseStatus}

+import org.joda.time.{Duration, Interval}

+

+import java.net.URL

+import java.util.{List => JList}

+import javax.ws.rs.core.MediaType

+import scala.annotation.tailrec

+import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsJavaMapConverter,

+  mapAsScalaMapConverter, seqAsJavaListConverter}

+

+/**

+  * Going with SegmentMetadataQueries despite the significant overhead because there's no other way

+  * to get accurate information about Druid columns.

+  *

+  * This is pulled pretty directly from the druid-hadoop-input-format project on GitHub. There is

+  * likely substantial room for improvement.

+  */

+class DruidClient(

+                   httpClient: HttpClient,

+                   hostAndPort: HostAndPort,

+                   numRetries: Int,

+                   retryWaitSeconds: Int,

+                   timeoutWaitMilliseconds: Int

+                 ) extends Logging {

+  private val druidBaseQueryURL: HostAndPort => String =

+    (hostAndPort: HostAndPort) => s"http://$hostAndPort/druid/v2/"

+

+  private val DefaultSegmentMetadataInterval = List[Interval](Intervals.utc(

+    JodaUtils.MIN_INSTANT,

+    JodaUtils.MAX_INSTANT

+  ))

+

+  /**

+    * The SQL system catalog tables are incorrect for multivalue columns and don't have accurate

+    * type info, so we need to fall back to segmentMetadataQueries. Given a DATASOURCE and a range

+    * of INTERVALS to query over, return a map from column name to a tuple of

+    * (columnType, hasMultipleValues). Note that this is a very expensive operation over large

+    * numbers of segments. If possible, this method should only be called with the most granular

+    * starting and ending intervals instead of over a larger interval.

+    *

+    * @param dataSource The Druid dataSource to fetch the schema for.

+    * @param intervals  The intervals to return the schema for, or None to query all segments.

+    * @return A map from column name to data type and whether or not the column is multi-value

+    *         for the schema of DATASOURCE.

+    */

+  def getSchema(dataSource: String, intervals: Option[List[Interval]]): Map[String, (String, Boolean)] = {

+    val queryInterval = intervals.getOrElse(DefaultSegmentMetadataInterval)

+    val body = Druids.newSegmentMetadataQueryBuilder()

+      .dataSource(dataSource)

+      .intervals(queryInterval.asJava)

+      .analysisTypes(SegmentMetadataQuery.AnalysisType.SIZE)

+      .merge(true)

+      .context(Map[String, AnyRef](

+        "timeout" -> Int.box(timeoutWaitMilliseconds)

+      ).asJava)

+      .build()

+    val response = sendRequestWithRetry(

+      druidBaseQueryURL(hostAndPort), numRetries, Option(MAPPER.writeValueAsBytes(body))

+    )

+    val segments =

+      MAPPER.readValue[JList[SegmentAnalysis]](

+        response.getContent, new TypeReference[JList[SegmentAnalysis]]() {})

+    if (segments.size() == 0) {

+      throw new IAE(

+        s"No segments found for intervals [${intervals.mkString(",")}] on $dataSource"

+      )

+    }

+    // Since we're setting merge to true, there should only be one item in the list

+    if (segments.size() > 1) {

+      throw new ISE("Merged segment metadata response had more than one row!")

+    }

+    log.debug(segments.asScala.map(_.toString).mkString("SegmentAnalysis: [", ", ", "]"))

+    /*

+     * If a dimension has multiple types within the spanned interval, the resulting column

+     * analysis will have the type "STRING" and be an error message. We abuse that here to infer

+     * a string type for the dimension and widen the type for the resulting DataFrame.

+     */

+    val columns = segments.asScala.head.getColumns.asScala.toMap

+    columns.foreach{ case(key, column) =>

+      if (column.isError) {

+        log.warn(s"Multiple column types found for dimension $key in interval" +

+          s" ${queryInterval.mkString("[", ", ", "]")}! Falling back to STRING type")

+      }

+    }

+    columns.map{ case (name: String, col: ColumnAnalysis) =>

+      name -> (col.getType, col.isHasMultipleValues)

+    }

+  }

+

+  /*

+   * Marking this method as tail recursive because it is for now. If a finally block,

+   * special error handling, or more involved set up and tear down code is added, this

+   * method may no longer be tail recursive and so compilation will fail. Because the

+   * number of retries is user-configurable and will likely be relatively small,

+   * latency in communication with a Druid cluster for Segment Metadata will be dominated

+   * by the query time, and Scala will optimize tail recursive calls regardless of annotation,

+   * future developers shouldn't be concerned if they need to remove this annotation.

+   */

+  @tailrec

+  private def sendRequestWithRetry(

+                                    url: String,

+                                    retryCount: Int,

+                                    content: Option[Array[Byte]] = None

+                                  ): StringFullResponseHolder = {

+    try {

+      sendRequest(url, content)

+    } catch {

+      case e: Exception =>

+        if (retryCount > 0) {

+          logInfo(s"Got exception: ${e.getMessage}, retrying ...")

+          Thread.sleep(retryWaitSeconds * 1000)

+          sendRequestWithRetry(url, retryCount - 1, content)

+        } else {

+          throw e

+        }

+    }

+  }

+

+  private def sendRequest(url: String, content: Option[Array[Byte]]): StringFullResponseHolder = {

+    try {

+      val request = buildRequest(url, content)

+      var response = httpClient.go(

+        request,

+        new StringFullResponseHandler(StringUtils.UTF8_CHARSET),

+        Duration.millis(timeoutWaitMilliseconds)

+      ).get

+      if (response.getStatus == HttpResponseStatus.TEMPORARY_REDIRECT) {

+        val newUrl = response.getResponse.headers().get("Location")

+        logInfo(s"Got a redirect, new location: $newUrl")

+        response = httpClient.go(

+          buildRequest(newUrl, content), new StringFullResponseHandler(StringUtils.UTF8_CHARSET)

+        ).get

+      }

+      if (!(response.getStatus == HttpResponseStatus.OK)) {

+        throw new ISE(

+          s"Error getting response for %s, status[%s] content[%s]",

+          url,

+          response.getStatus,

+          response.getContent

+        )

+      }

+      response

+    } catch {

+      case e: Exception =>

+        throw e

+    }

+  }

+

+  def buildRequest(url: String, content: Option[Array[Byte]]): Request = {

+    content.map(

+      new Request(HttpMethod.POST, new URL(url))

+        .setHeader("Content-Type", MediaType.APPLICATION_JSON)

+        .setContent(_)

+    ).getOrElse(

+      new Request(HttpMethod.GET, new URL(url))

+    )

+  }

+}

+

+object DruidClient {

+  // TODO: Add support for Kerberized etc. clients

+  def apply(conf: Configuration): DruidClient = {

+    val brokerConf = conf.dive(DruidConfigurationKeys.brokerPrefix)

+    new DruidClient(

+      HttpClientHolder.create.get,

+      HostAndPort.fromParts(

+        brokerConf.get(DruidConfigurationKeys.brokerHostDefaultKey),

+        brokerConf.getInt(DruidConfigurationKeys.brokerPortDefaultKey)),

+      brokerConf.getInt(DruidConfigurationKeys.numRetriesDefaultKey),

+      brokerConf.getInt(DruidConfigurationKeys.retryWaitSecondsDefaultKey),

+      brokerConf.getInt(DruidConfigurationKeys.timeoutMillisecondsDefaultKey)

+    )

+  }

+}

+

diff --git a/spark/src/main/scala/org/apache/druid/spark/clients/HttpClientHolder.scala b/spark/src/main/scala/org/apache/druid/spark/clients/HttpClientHolder.scala
new file mode 100644
index 0000000..b13d2c8
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/clients/HttpClientHolder.scala
@@ -0,0 +1,51 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.clients

+

+import com.google.common.base.Throwables

+import org.apache.druid.java.util.common.lifecycle.Lifecycle

+import org.apache.druid.java.util.http.client.{HttpClient, HttpClientConfig, HttpClientInit}

+

+import java.io.{Closeable, IOException}

+

+object HttpClientHolder {

+  def create: HttpClientHolder = {

+    val lifecycle = new Lifecycle

+    val httpClient = HttpClientInit.createClient(HttpClientConfig.builder.build, lifecycle)

+    try {

+      lifecycle.start()

+    } catch {

+      case e: Exception =>

+        throw Throwables.propagate(e)

+    }

+    new HttpClientHolder(lifecycle, httpClient)

+  }

+}

+

+class HttpClientHolder(val lifecycle: Lifecycle, val client: HttpClient) extends Closeable {

+  def get: HttpClient = {

+    client

+  }

+

+  @throws[IOException]

+  override def close(): Unit = {

+    lifecycle.stop()

+  }

+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/configuration/Configuration.scala b/spark/src/main/scala/org/apache/druid/spark/configuration/Configuration.scala
new file mode 100644
index 0000000..71f31de
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/configuration/Configuration.scala
@@ -0,0 +1,209 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.configuration

+

+import org.apache.druid.java.util.common.StringUtils

+import org.apache.spark.sql.sources.v2.DataSourceOptions

+

+import scala.collection.JavaConverters.mapAsScalaMapConverter

+

+/**

+  * A simple wrapper around a properties map that can also "dive" into a sub-Configuration if keys are stored using

+  * dotted separators. Because this class is designed to make working with Spark DataSourceOptions easier and DSOs are

+  * case-insensitive, this class is also case-insensitive.

+  *

+  * @param properties A Map of String property names to String property values to back this Configuration.

+  */

+class Configuration(properties: Map[String, String]) extends Serializable {

+  def getAs[T](key: String): T = {

+    properties(StringUtils.toLowerCase(key)).asInstanceOf[T]

+  }

+

+  def get(key: String): Option[String] = {

+    properties.get(StringUtils.toLowerCase(key))

+  }

+

+  def get(keyWithDefault: (String, String)): String = {

+    this.get(StringUtils.toLowerCase(keyWithDefault._1)).getOrElse(keyWithDefault._2)

+  }

+

+  def getString(key: String): String = {

+    properties.getOrElse(StringUtils.toLowerCase(key), "")

+  }

+

+  def getInt(key: String, default: Int): Int = {

+    properties.get(StringUtils.toLowerCase(key)).fold(default)(_.toInt)

+  }

+

+  def getInt(keyWithDefault: (String, Int)): Int = {

+    this.getInt(StringUtils.toLowerCase(keyWithDefault._1), keyWithDefault._2)

+  }

+

+  def getLong(key: String, default: Long): Long = {

+    properties.get(StringUtils.toLowerCase(key)).fold(default)(_.toLong)

+  }

+

+  def getLong(keyWithDefault: (String, Long)): Long = {

+    this.getLong(StringUtils.toLowerCase(keyWithDefault._1), keyWithDefault._2)

+  }

+

+  def getBoolean(key: String, default: Boolean): Boolean = {

+    properties.get(StringUtils.toLowerCase(key)).fold(default)(_.toBoolean)

+  }

+

+  def getBoolean(keyWithDefault: (String, Boolean)): Boolean = {

+    this.getBoolean(StringUtils.toLowerCase(keyWithDefault._1), keyWithDefault._2)

+  }

+

+  def apply(key: String): String = {

+    this.get(key) match {

+      case Some(v) => v

+      case None => throw new NoSuchElementException(s"Key $key not found!")

+    }

+  }

+

+  def isPresent(key: String): Boolean = {

+    properties.isDefinedAt(StringUtils.toLowerCase(key))

+  }

+

+  def isPresent(paths: String*): Boolean = {

+    properties.isDefinedAt(StringUtils.toLowerCase(paths.mkString(Configuration.SEPARATOR)))

+  }

+

+  /**

+    * Given a prefix PREFIX, return a Configuration object containing every key in this

+    * configuration that starts with PREFIX. Keys in the resulting Configuration will have PREFIX

+    * removed from their start.

+    *

+    * For example, if a Configuration contains the key `druid.broker.host` and `dive("druid")` is

+    * called on it, the resulting Configuration will contain the key `broker.host` with the same

+    * value as `druid.broker.host` had in the original Configuration.

+    *

+    * @param prefix The namespace to "dive" into.

+    * @return A Configuration containing the keys in this Configuration that start with PREFIX,

+    *         stripped of their leading PREFIX.

+    */

+  def dive(prefix: String): Configuration = {

+    new Configuration(properties

+      .filterKeys(_.startsWith(s"$prefix${Configuration.SEPARATOR}"))

+      .map{case (k, v) => k.substring(prefix.length + 1) -> v})

+  }

+

+  /**

+    * Given a number of prefixes, return a Configuration object containing every key in this

+    * configuration that starts with `prefixes.mkString(Configuration.SEPARATOR)`. Keys in the

+    * resulting Configuration will have the concatenation of PREFIXES removed from their start.

+    *

+    * Note that this is the equivalent of chaining `dive` calls, not chaining `merge` calls.

+    *

+    * @param prefixes The namespaces, in order, to "dive" into.

+    * @return A Configuration containing the keys that start with every prefix in PREFIXES joined

+    *         by periods, stripped of the leading prefixes matching the prefixes in PREFIXES.

+    */

+  def dive(prefixes: String*): Configuration = {

+    prefixes.foldLeft(this){case (conf, prefix) => conf.dive(prefix)}

+  }

+

+  /**

+    * Combine this configuration with another Configuration. If keys collide between these

+    * configurations, the corresponding values in OTHER will be selected.

+    *

+    * @param other A Configuration to merge with this Configuration.

+    * @return A Configuration containing the union of keys between this Configuration and OTHER.

+    *         If keys collide between the two Configurations, the values in OTHER will be kept.

+    */

+  def merge(other: Configuration): Configuration = {

+    new Configuration(this.properties ++ other.toMap)

+  }

+

+  /**

+    * Combine this configuration with another Configuration, moving the other Configuration to the provided name space.

+    * If keys collide between this configuration and the newly-namespaced OTHER, the corresponding values in OTHER will

+    * be selected.

+    *

+    * @param namespace The name space to merge OTHER under.

+    * @param other The Configuration to merge with this Configuration.

+    * @return A new Configuration object containing all the keys in this Configuration, plus the keys in OTHER

+    *         namespaced under NAMESPACE.

+    */

+  def merge(namespace: String, other: Configuration): Configuration = {

+    this.merge(Configuration(namespace, other.toMap))

+  }

+

+  /**

+    * Add the properties specified in PROPERTIES to this Configuration's properties, moving the new properties to the

+    * provided name space. If this Configuration already contains keys under the provided name space and those keys

+    * collide with the properties specified in PROPERTIES, the corresponding values in PROPERTIES will be selected.

+    *

+    * @param namespace The name space to merge the properties specified in PROPERTIES under.

+    * @param properties The map of properties to values to combine with the properties from this Configuration.

+    * @return A new Configuration object containing all the keys in this Configuration, plus the properties in

+    *         PROPERTIES namespaced under NAMESPACE.

+    */

+  def merge(namespace: String, properties: Map[String, String]): Configuration = {

+    this.merge(Configuration(namespace, properties))

+  }

+

+  override def equals(obj: Any): Boolean = {

+    obj.isInstanceOf[Configuration] && this.toMap == obj.asInstanceOf[Configuration].toMap

+  }

+

+  override def hashCode(): Int = {

+    this.properties.hashCode()

+  }

+

+  def toMap: Map[String, String] = {

+    this.properties

+  }

+

+  override def toString: String = {

+    this.toMap.mkString("Configuration{", "; ", "}")

+  }

+}

+

+object Configuration {

+  def apply(properties: Map[String, String]): Configuration = {

+    new Configuration(properties.map{case (k, v) => StringUtils.toLowerCase(k) -> v})

+  }

+

+  def apply(namespace: String, properties: Map[String, String]): Configuration = {

+    new Configuration(properties.map{case(k, v) => StringUtils.toLowerCase(s"$namespace$SEPARATOR$k") -> v})

+  }

+

+  def apply(dso: DataSourceOptions): Configuration = {

+    new Configuration(dso.asMap().asScala.toMap)

+  }

+

+  def fromKeyValue(key: String, value: String): Configuration = {

+    new Configuration(Map[String, String](StringUtils.toLowerCase(key) -> value))

+  }

+

+  /**

+    * Get the key corresponding to each element of PATHS interpreted as a namespace or property.

+    *

+    * @param paths The parent namespaces and property as individual strings to convert into a single configuration key.

+    * @return The path to a property through its parent namespaces as a single configuration key.

+    */

+  def toKey(paths: String*): String = {

+    StringUtils.toLowerCase(paths.mkString(Configuration.SEPARATOR))

+  }

+

+  private val SEPARATOR = "."

+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/configuration/DruidConfigurationKeys.scala b/spark/src/main/scala/org/apache/druid/spark/configuration/DruidConfigurationKeys.scala
new file mode 100644
index 0000000..cfa1d74
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/configuration/DruidConfigurationKeys.scala
@@ -0,0 +1,35 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.configuration

+

+object DruidConfigurationKeys {

+  // Druid Client Configs

+  val brokerPrefix: String = "broker"

+  val brokerHostKey: String = "host"

+  val brokerPortKey: String = "port"

+  val numRetriesKey: String = "numRetries"

+  val retryWaitSecondsKey: String = "retryWaitSeconds"

+  val timeoutMillisecondsKey: String = "timeoutMilliseconds"

+  private[spark] val brokerHostDefaultKey: (String, String) = (brokerHostKey, "localhost")

+  private[spark] val brokerPortDefaultKey: (String, Int) = (brokerPortKey, 8082)

+  private[spark] val numRetriesDefaultKey: (String, Int) = (numRetriesKey, 5)

+  private[spark] val retryWaitSecondsDefaultKey: (String, Int) = (retryWaitSecondsKey, 5)

+  private[spark] val timeoutMillisecondsDefaultKey: (String, Int) = (timeoutMillisecondsKey, 300000)

+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/mixins/Logging.scala b/spark/src/main/scala/org/apache/druid/spark/mixins/Logging.scala
new file mode 100644
index 0000000..055c22b
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/mixins/Logging.scala
@@ -0,0 +1,116 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.mixins

+

+import org.apache.druid.java.util.common.logger.Logger

+

+/**

+  * Simplified version of org.apache.spark.internal.Logging. Uses

+  * org.apache.druid.java.util.common.logger.Logger instead of slf4j's Logger directly.

+  */

+trait Logging {

+

+  @transient private var logger: Logger = _

+

+  private lazy val logName = this.getClass.getName.stripSuffix("$")

+

+  /**

+    * Return the configured underlying logger

+    *

+    * @return the configured underlying logger

+    */

+  protected def log: Logger = {

+    if (logger == null) {

+      logger= new Logger(logName)

+    }

+    logger

+  }

+

+  /**

+    * Log a message at the TRACE level.

+    *

+    * @param msg the message string to be logged

+    */

+  protected def logTrace(msg: => String): Unit = {

+    if (log.isTraceEnabled) {

+      log.trace(msg)

+    }

+  }

+

+  /**

+    * Log a message at the DEBUG level.

+    *

+    * @param msg the message string to be logged

+    */

+  protected def logDebug(msg: => String): Unit = {

+    if (log.isDebugEnabled) {

+      log.debug(msg)

+    }

+  }

+

+  /**

+    * Log a message at the INFO level.

+    *

+    * @param msg the message string to be logged

+    */

+  protected def logInfo(msg: => String): Unit = {

+    if (log.isInfoEnabled) {

+      log.info(msg)

+    }

+  }

+

+  /**

+    * Log a message at the WARN level.

+    *

+    * @param msg the message string to be logged

+    */

+  protected def logWarn(msg: => String): Unit = {

+    log.warn(msg)

+  }

+

+  /**

+    * Log a message with an exception at the WARN level.

+    *

+    * @param msg       the message string to be logged

+    * @param exception the exception to log in addition to the message

+    */

+  protected def logWarn(msg: => String, exception: Throwable): Unit = {

+    log.warn(exception, msg)

+  }

+

+  /**

+    * Log a message at the ERROR level.

+    *

+    * @param msg the message string to be logged

+    */

+  protected def logError(msg: => String): Unit = {

+    log.error(msg)

+  }

+

+  /**

+    * Log a message with an exception at the ERROR level.

+    *

+    * @param msg       the message string to be logged

+    * @param exception the exception to log in addition to the message

+    */

+  protected def logError(msg: => String, exception: Throwable): Unit = {

+      log.error(exception, msg)

+  }

+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/mixins/TryWithResources.scala b/spark/src/main/scala/org/apache/druid/spark/mixins/TryWithResources.scala
new file mode 100644
index 0000000..6b0ff24
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/mixins/TryWithResources.scala
@@ -0,0 +1,329 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.mixins

+

+import scala.util.control.{ControlThrowable, NonFatal}

+

+/**

+  * Utility trait to ape the try-with-resources construct in Java. This is quick and dirty and has its flaws. If a more

+  * robust approach is needed, the better-files approach (https://github.com/pathikrit/better-files#lightweight-arm)

+  * should be considered.

+  *

+  * Scala doesn't support varargs of generic type, so using overloading to support managing multiple resources with a

+  * sequence. Ideally Scala would support macro programming to generate tryWithResources functions for

+  * tuples of [_ <: AutoCloseable] so users could un-tuple multiple resources as named variables without needing to

+  * manually generate separate methods for each length up to 22, but since it doesn't I've created methods for up to

+  * five resources at once. These methods can be nested or the method taking a Sequence of AutoCloseables can be used

+  * for arbitrary resources, with the caveat that named unpacking is not possible over Sequences. If there was a way to

+  * enforce type bounds on the elements of subclasses of Produce we could hack up a type that unioned AutoCloseable and

+  * Products whose elements are all subtypes of AutoCloseable, but since there isn't this is a quick fix. Note that

+  * shared code for the tuple methods is not refactored to a single private method because the common supertype of

+  * tuples is Product, with no type information.

+  */

+trait TryWithResources {

+  /**

+    * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so:

+    * val resource = new ResourceImplementingAutoCloseable()

+    * tryWithResources(resource){r =>

+    *   r.doSomething()

+    * }

+    *

+    * or, if desired,

+    *

+    * tryWithResources(new ResourceImplementingAutoCloseable()){ resouce =>

+    *   resource.doSomething()

+    * }

+    *

+    * @param resource The AutoCloseable resource to use in FUNC.

+    * @param func     The function block to execute (think of this as the try block).

+    * @tparam T Any subtype of AutoCloseable.

+    * @tparam V The result type of FUNC.

+    * @return The result of executing FUNC with RESOURCE.

+    */

+  def tryWithResources[T <: AutoCloseable, V](resource: T)(func: T => V): V = {

+    try {

+      func(resource)

+    } catch {

+      // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions

+      case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) =>

+        try {

+          resource.close()

+        } catch {

+          case NonFatal(e2) =>

+            // e2 isn't fatal, let the original exception e take precedence

+            e.addSuppressed(e2)

+          case e2: Throwable =>

+            if (NonFatal(e)) {

+              // e2 is fatal but e isn't, suppress e

+              e2.addSuppressed(e)

+              throw e2

+            }

+            // Both exceptions are fatal, suppress the also-fatal e2 that occurred while closing

+            e.addSuppressed(e2)

+        }

+        throw e

+    } finally {

+      resource.close()

+    }

+  }

+

+  /**

+    * A helper function to duplicate Java's try-with-resources construction. Unfortunately Scala doesn't support varargs

+    * for generic types and I don't feel like writing 22 separate functions, so callers will have to keep track of the

+    * exact ordering of elements in the resource sequence or nest the provided tuple methods.

+    *

+    * To use, mix in this trait and then call like so:

+    * val fileResource = new ResourceImplementingAutoCloseable()

+    * val writerResource = new OtherAutoCloseable()

+    * tryWithResources(Seq(fileResource, writerResource)){resources =>

+    *   val file = resources(0)

+    *   val writer = resources(1)

+    *   writer.write(file, data)

+    * }

+    *

+    * @param resources A list of AutoCloseable resources to use in FUNC.

+    * @param func      The function block to execute (think of this as the try block).

+    * @tparam T Any subtype of AutoCloseable.

+    * @tparam V The result type of FUNC.

+    * @return The result of executing FUNC with RESOURCES.

+    */

+  def tryWithResources[T <: AutoCloseable, V](resources: Seq[T])(func: Seq[T] => V): V = {

+    try {

+      func(resources)

+    } catch {

+      // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions

+      case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) =>

+        throw closeAllResourcesMergingExceptions(resources, e)

+    } finally {

+      closeAllResourcesFinally(resources)

+    }

+  }

+

+  /**

+    * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so:

+    * val resources = (new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable())

+    * tryWithResources(resources){

+    *   case (first, second) =>

+    *     first.doSomething()

+    *     second.doSomething()

+    * }

+    *

+    * or, if desired,

+    *

+    * tryWithResources(new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable){

+    *   case (first, second) =>

+    *     first.doSomething()

+    *     second.doSomething

+    * }

+    *

+    * @param resources A tuple of AutoCloseable resources to use in FUNC.

+    * @param func      The function block to execute (think of this as the try block).

+    * @tparam T Any subtype of AutoCloseable.

+    * @tparam V The result type of FUNC.

+    * @return The result of executing FUNC with RESOURCES.

+    */

+  def tryWithResources[T <: AutoCloseable, V](resources: (T, T))(func: ((T, T)) => V): V = {

+    val closeableResources = resources.productIterator.toSeq.map(_.asInstanceOf[AutoCloseable])

+    try {

+      func(resources)

+    } catch {

+      // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions

+      case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) =>

+        throw closeAllResourcesMergingExceptions(closeableResources, e)

+    } finally {

+      closeAllResourcesFinally(closeableResources)

+    }

+  }

+

+  /**

+    * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so:

+    * val resources = (new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable(), ...)

+    * tryWithResources(resources){

+    *   case (first, second, ...) =>

+    *     first.doSomething()

+    *     second.doSomething()

+    *     ...

+    * }

+    *

+    * or, if desired,

+    *

+    * tryWithResources(new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable, ...){

+    *   case (first, second, ...) =>

+    *     first.doSomething()

+    *     second.doSomething()

+    *     ...

+    * }

+    *

+    * @param resources A tuple of AutoCloseable resources to use in FUNC.

+    * @param func      The function block to execute (think of this as the try block).

+    * @tparam T Any subtype of AutoCloseable.

+    * @tparam V The result type of FUNC.

+    * @return The result of executing FUNC with RESOURCES.

+    */

+  def tryWithResources[T <: AutoCloseable, V](resources: (T, T, T))(func: ((T, T, T)) => V): V = {

+    val closeableResources = resources.productIterator.toSeq.map(_.asInstanceOf[AutoCloseable])

+    try {

+      func(resources)

+    } catch {

+      // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions

+      case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) =>

+        throw closeAllResourcesMergingExceptions(closeableResources, e)

+    } finally {

+      closeAllResourcesFinally(closeableResources)

+    }

+  }

+

+  /**

+    * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so:

+    * val resources = (new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable(), ...)

+    * tryWithResources(resources){

+    *   case (first, second, ...) =>

+    *     first.doSomething()

+    *     second.doSomething()

+    *     ...

+    * }

+    *

+    * or, if desired,

+    *

+    * tryWithResources(new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable, ...){

+    *   case (first, second, ...) =>

+    *     first.doSomething()

+    *     second.doSomething()

+    *     ...

+    * }

+    *

+    * @param resources A tuple of AutoCloseable resources to use in FUNC.

+    * @param func      The function block to execute (think of this as the try block).

+    * @tparam T Any subtype of AutoCloseable.

+    * @tparam V The result type of FUNC.

+    * @return The result of executing FUNC with RESOURCES.

+    */

+  def tryWithResources[T <: AutoCloseable, V](resources: (T, T, T, T))(func: ((T, T, T, T)) => V): V = {

+    val closeableResources = resources.productIterator.toSeq.map(_.asInstanceOf[AutoCloseable])

+    try {

+      func(resources)

+    } catch {

+      // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions

+      case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) =>

+        throw closeAllResourcesMergingExceptions(closeableResources, e)

+    } finally {

+      closeAllResourcesFinally(closeableResources)

+    }

+  }

+

+  /**

+    * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so:

+    * val resources = (new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable(), ...)

+    * tryWithResources(resources){

+    *   case (first, second, ...) =>

+    *     first.doSomething()

+    *     second.doSomething()

+    *     ...

+    * }

+    *

+    * or, if desired,

+    *

+    * tryWithResources(new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable, ...){

+    *   case (first, second, ...) =>

+    *     first.doSomething()

+    *     second.doSomething()

+    *     ...

+    * }

+    *

+    * @param resources A tuple of AutoCloseable resources to use in FUNC.

+    * @param func      The function block to execute (think of this as the try block).

+    * @tparam T Any subtype of AutoCloseable.

+    * @tparam V The result type of FUNC.

+    * @return The result of executing FUNC with RESOURCES.

+    */

+  def tryWithResources[T <: AutoCloseable, V](resources: (T, T, T, T, T))(func: ((T, T, T, T, T)) => V): V = {

+    val closeableResources = resources.productIterator.toSeq.map(_.asInstanceOf[AutoCloseable])

+    try {

+      func(resources)

+    } catch {

+      // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions

+      case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) =>

+        throw closeAllResourcesMergingExceptions(closeableResources, e)

+    } finally {

+      closeAllResourcesFinally(closeableResources)

+    }

+  }

+

+  /**

+    * Given a list of closeables RESOURCES and an throwable EXCEPTION, close all supplied resources, merging any

+    * additional errors that occur. The main point here is to ensure every resource in resources is closed; I'm not sure

+    * how useful the "merge" logic actually is when pulled out to a list of closeables instead of just a single one.

+    *

+    * @param resources The list of resources to close.

+    * @param exception The exception to merge additional throwables into.

+    * @return The final throwable resulting from "merging" EXCEPTION with any additional throwables raised while closing

+    *         the resources in RESOURCES.

+    */

+  private def closeAllResourcesMergingExceptions(resources: Seq[AutoCloseable], exception: Throwable): Throwable = {

+    resources.foldRight(exception)((resource, ex) =>

+      try {

+        resource.close()

+        ex

+      } catch {

+        case NonFatal(e2) =>

+          // e2 isn't fatal, let the original exception e take precedence

+          ex.addSuppressed(e2)

+          ex

+        case e2: Throwable =>

+          if (NonFatal(ex)) {

+            // e2 is fatal but e isn't, suppress e

+            e2.addSuppressed(ex)

+            e2

+          } else {

+            // Both exceptions are fatal, suppress the also-fatal e2 that occurred while closing

+            ex.addSuppressed(e2)

+            ex

+          }

+      }

+    )

+  }

+

+  /**

+    * Given RESOURCES, attempts to close all of them even in the face of errors. Arbitrarily, the last exception

+    * encountered is thrown, with earlier exceptions suppressed.

+    *

+    * @param resources The list of resources to close.

+    */

+  private def closeAllResourcesFinally(resources: Seq[AutoCloseable]): Unit = {

+    // Using foldRight to iterate over resources to ensure we don't short circuit and leave resources unclosed if an

+    // earlier resource throws an exception on .close().

+    val exceptionOption = resources

+      .foldRight(None.asInstanceOf[Option[Throwable]])((resource, exOpt) =>

+        try {

+          resource.close()

+          exOpt

+        } catch {

+          case e: Throwable =>

+            Some(exOpt.fold(e) { ex =>

+              ex.addSuppressed(e)

+              ex

+            })

+        }

+      )

+    if (exceptionOption.isDefined) {

+      throw exceptionOption.get

+    }

+  }

+}

diff --git a/spark/src/main/scala/org/apache/druid/spark/package.scala b/spark/src/main/scala/org/apache/druid/spark/package.scala
new file mode 100644
index 0000000..22b440a
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/package.scala
@@ -0,0 +1,27 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid

+

+import com.fasterxml.jackson.databind.ObjectMapper

+import org.apache.druid.jackson.DefaultObjectMapper

+

+package object spark {

+  private[spark] val MAPPER: ObjectMapper = new DefaultObjectMapper()

+}

diff --git a/spark/src/test/scala/org/apache/druid/spark/configuration/ConfigurationSuite.scala b/spark/src/test/scala/org/apache/druid/spark/configuration/ConfigurationSuite.scala
new file mode 100644
index 0000000..0bdb9cd
--- /dev/null
+++ b/spark/src/test/scala/org/apache/druid/spark/configuration/ConfigurationSuite.scala
@@ -0,0 +1,76 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+

+package org.apache.druid.spark.configuration

+

+import org.scalatest.funsuite.AnyFunSuite

+import org.scalatest.matchers.should.Matchers

+

+class ConfigurationSuite extends AnyFunSuite with Matchers {

+  private val testConf: Configuration = Configuration(Map[String, String](

+    Configuration.toKey("test", "sub-conf", "property") -> "testProp",

+    Configuration.toKey("test", "property") -> "quizProp",

+    "property" -> "examProperty"

+  ))

+

+  test("isPresent should correctly handle single keys as well as paths to properties") {

+    testConf.isPresent("property") should be(true)

+    testConf.isPresent("test", "sub-conf", "property") should be(true)

+    testConf.isPresent("test") should be(false)

+    testConf.isPresent("exam", "sub-conf", "property") should be(false)

+    testConf.isPresent("test.property") should be(true)

+    testConf.isPresent("test.property.property") should be(false)

+  }

+

+  test("dive should correctly return sub-configurations") {

+    val subConf = testConf.dive("test")

+    subConf.getString("property") should equal("quizProp")

+    subConf.dive("sub-conf") should equal(Configuration.fromKeyValue("property", "testProp"))

+    subConf.dive("sub-conf") should equal(testConf.dive("test", "sub-conf"))

+  }

+

+  test("dive should return empty maps when called on uncontained namespaces") {

+    testConf.dive("exam").toMap.isEmpty should be(true)

+  }

+

+  test("Configurations should be case-insensitive") {

+    testConf.getString("pRoPeRtY") should equal(testConf.getString("property"))

+  }

+

+  test("merge should correctly combine Configurations") {

+    val otherConf = Configuration(Map[String, String](

+      Configuration.toKey("other", "conf", "key") -> "value",

+      "property" -> "new property"

+    ))

+    val mergedConf = testConf.merge(otherConf)

+    mergedConf.getString(Configuration.toKey("other", "conf", "key")) should equal("value")

+    mergedConf.getString("property") should equal("new property")

+  }

+

+  test("merge should correctly namespace merged Configurations") {

+    val otherConf = Configuration(Map[String, String](

+      "key" -> "1",

+      "property" -> "new property"

+    ))

+    val mergedConf = testConf.merge("test", otherConf)

+    mergedConf.getString(Configuration.toKey("test", "property")) should equal("new property")

+    mergedConf.getInt(Configuration.toKey("test", "key"), -1) should equal(1)

+    mergedConf("property") should equal("examProperty")

+  }

+}