Set up, confs, mixins, and clients. (#11474)
* Set up, confs, mixins, and clients.
Initial module set up (e.g. pom, checkstyle, exclusions, licenses, etc.), support classes (mix-ins traits for logging and the try-with-resources pattern & configuration helpers), and an HTTP client for talking to a Druid cluster.
diff --git a/.travis.yml b/.travis.yml
index 4211f18..4f8e68a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -74,7 +74,7 @@
script: ${MVN} animal-sniffer:check --fail-at-end
- name: "checkstyle"
- script: ${MVN} checkstyle:checkstyle --fail-at-end
+ script: ${MVN} checkstyle:checkstyle --fail-at-end -pl '!spark' && ${MVN} scalastyle:check --fail-at-end -pl 'spark'
- name: "enforcer checks"
script: ${MVN} enforcer:enforce --fail-at-end
diff --git a/LICENSE b/LICENSE
index aa65ab9..7318c8d 100644
--- a/LICENSE
+++ b/LICENSE
@@ -279,6 +279,13 @@
This product contains lpad and rpad methods adapted from Apache Flink.
* core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
+ This product contains Scala logging and serializable Hadoop configuration utilities adapted from Apache Spark.
+ * spark/src/main/scala/org/apache/druid/spark/mixins/Logging
+
+ This product contains a Druid client wrapper adapted from Imply Data's druid-hadoop-input-format.
+ * spark/src/main/scala/org/apache/druid/spark/clients/DruidClient
+ * spark/src/main/scala/org/apache/druid/spark/clients/HttpClientHolder
+
MIT License
================================
diff --git a/NOTICE b/NOTICE
index 7761a54..64e3a2b 100644
--- a/NOTICE
+++ b/NOTICE
@@ -102,3 +102,35 @@
+
+================= Apache Spark 2.4.7 =================
+
+Apache Spark
+Copyright 2014 and onwards The Apache Software Foundation.
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+
+Export Control Notice
+---------------------
+
+This distribution includes cryptographic software. The country in which you currently reside may have
+restrictions on the import, possession, use, and/or re-export to another country, of encryption software.
+BEFORE using any encryption software, please check your country's laws, regulations and policies concerning
+the import, possession, or use, and re-export of encryption software, to see if this is permitted. See
+<http://www.wassenaar.org/> for more information.
+
+The U.S. Government Department of Commerce, Bureau of Industry and Security (BIS), has classified this
+software as Export Commodity Control Number (ECCN) 5D002.C.1, which includes information security software
+using or performing cryptographic functions with asymmetric algorithms. The form and manner of this Apache
+Software Foundation distribution makes it eligible for export under the License Exception ENC Technology
+Software Unrestricted (TSU) exception (see the BIS Export Administration Regulations, Section 740.13) for
+both object code and source code.
+
+The following provides more details on the included cryptographic software:
+
+This software uses Apache Commons Crypto (https://commons.apache.org/proper/commons-crypto/) to
+support authentication, and encryption and decryption of data sent across the network between
+services.
+
diff --git a/codestyle/scalastyle_config.xml b/codestyle/scalastyle_config.xml
new file mode 100644
index 0000000..9e2ed4d
--- /dev/null
+++ b/codestyle/scalastyle_config.xml
@@ -0,0 +1,139 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<scalastyle commentFilter="enabled">
+ <name>Apache Druid Scalastyle configuration</name>
+ <check level="warning" class="org.scalastyle.file.FileTabChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.file.FileLengthChecker" enabled="true">
+ <parameters>
+ <parameter name="maxFileLength"><![CDATA[800]]></parameter>
+ </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true">
+ <parameters>
+ <parameter name="header"><![CDATA[/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"/>
+ <check level="warning" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"/>
+ <check level="warning" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"/>
+ <check level="warning" class="org.scalastyle.scalariform.SpaceAfterCommentStartChecker" enabled="true"/>
+ <check level="warning" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
+ <parameters>
+ <parameter name="maxLineLength"><![CDATA[120]]></parameter>
+ <parameter name="tabSize"><![CDATA[4]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"/>
+ <check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
+ <parameters>
+ <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
+ <parameters>
+ <parameter name="maxParameters"><![CDATA[8]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true">
+ <parameters>
+ <parameter name="maxTypes"><![CDATA[30]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true">
+ <parameters>
+ <parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
+ <parameter name="doubleLineAllowed"><![CDATA[false]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true">
+ <parameters>
+ <parameter name="maxLength"><![CDATA[50]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true">
+ <parameters>
+ <parameter name="maxMethods"><![CDATA[30]]></parameter>
+ </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.RedundantIfChecker" enabled="true"></check>
+ <check customId="println" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[println]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.DeprecatedJavaChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.EmptyClassChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.ClassTypeParameterChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[^[A-Z_]$]]></parameter>
+ </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.UnderscoreImportChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.LowercasePatternMatchChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.ImportGroupingChecker" enabled="true"></check>
+</scalastyle>
\ No newline at end of file
diff --git a/codestyle/spotbugs-exclude.xml b/codestyle/spotbugs-exclude.xml
index cb6cb9c..f70c128 100644
--- a/codestyle/spotbugs-exclude.xml
+++ b/codestyle/spotbugs-exclude.xml
@@ -44,6 +44,10 @@
<Class name="org.apache.druid.server.AsyncQueryForwardingServlet" />
</And>
</Match>
+ <!-- Spot Bugs doesn't work for Scala -->
+ <Match>
+ <Package name="~org\.apache\.druid\.spark.*"/>
+ </Match>
<Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"/>
<Bug pattern="BC_UNCONFIRMED_CAST"/>
diff --git a/hooks/pre-push.sh b/hooks/pre-push.sh
index a0928db..31b40ba 100755
--- a/hooks/pre-push.sh
+++ b/hooks/pre-push.sh
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-mvn checkstyle:checkstyle --fail-at-end
+mvn checkstyle:checkstyle --fail-at-end -pl '!spark' && mvn scalastyle:check --fail-at-end -pl spark
diff --git a/pom.xml b/pom.xml
index eb066a4..9786eac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,6 +105,7 @@
<protobuf.version>3.11.0</protobuf.version>
<resilience4j.version>1.3.1</resilience4j.version>
<slf4j.version>1.7.12</slf4j.version>
+ <surefire.version>2.22.2</surefire.version>
<!-- If compiling with different hadoop version also modify default hadoop coordinates in TaskConfig.java -->
<hadoop.compile.version>2.8.5</hadoop.compile.version>
<mockito.version>3.8.0</mockito.version>
@@ -142,6 +143,8 @@
<!-- Core cloud functionality -->
<module>cloud/aws-common</module>
<module>cloud/gcp-common</module>
+ <!-- Spark Connectors -->
+ <module>spark</module>
<!-- Core extensions -->
<module>extensions-core/kubernetes-extensions</module>
<module>extensions-core/avro-extensions</module>
@@ -1517,7 +1520,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <version>2.22.2</version>
+ <version>${surefire.version}</version>
<configuration>
<!-- locale settings must be set on the command line before startup -->
<!-- set default options -->
@@ -1777,6 +1780,8 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
+ <!-- Explicitly setting version here so that intelliJ uses the right schema for inspections -->
+ <version>${surefire.version}</version>
<executions>
<execution>
<phase>test</phase>
diff --git a/spark/pom.xml b/spark/pom.xml
new file mode 100644
index 0000000..6dcae79
--- /dev/null
+++ b/spark/pom.xml
@@ -0,0 +1,444 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>druid-spark</artifactId>
+ <name>druid-spark</name>
+ <description>Spark connectors for reading data from and writing data to Druid clusters</description>
+
+ <parent>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid</artifactId>
+ <version>0.22.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <properties>
+ <scala.version>${scala.major.version}.12</scala.version>
+ <scala.major.version>2.12</scala.major.version>
+ <spark.version>2.4.8</spark.version>
+ <!-- These two properties allow -Dcheckstyle.skip to suppress scalastyle checks as well -->
+ <checkstyle.skip>false</checkstyle.skip>
+ <scalastyle.skip>${checkstyle.skip}</scalastyle.skip>
+ </properties>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>4.0.2</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-core</artifactId>
+ <version>${project.parent.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.opencsv</groupId>
+ <artifactId>opencsv</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.airlift</groupId>
+ <artifactId>airline</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.timeandspace</groupId>
+ <artifactId>cron-scheduler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.thisptr</groupId>
+ <artifactId>jackson-jq</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-console</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.gridkit.lab</groupId>
+ <artifactId>jvm-attach-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.hyperic</groupId>
+ <artifactId>sigar</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jboss.logging</groupId>
+ <artifactId>jboss-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mozilla</groupId>
+ <artifactId>rhino</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.skife.config</groupId>
+ <artifactId>config-magic</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-processing</artifactId>
+ <version>${project.parent.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.ibm.icu</groupId>
+ <artifactId>icu4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-artifact</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mozilla</groupId>
+ <artifactId>rhino</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.skife.config</groupId>
+ <artifactId>config-magic</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.http-client</groupId>
+ <artifactId>google-http-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax.validation</groupId>
+ <artifactId>validation-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-dbcp2</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Spark -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.major.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.major.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-unsafe_${scala.major.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.major.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scalap</artifactId>
+ <version>${scala.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Tests -->
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-core</artifactId>
+ <version>${project.parent.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.hyperic</groupId>
+ <artifactId>sigar</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.major.version}</artifactId>
+ <version>3.1.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalactic</groupId>
+ <artifactId>scalactic_${scala.major.version}</artifactId>
+ <version>3.1.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- scala build -->
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.2.2</version>
+ <executions>
+ <execution>
+ <id>eclipse-add-source</id>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-test-compile-first</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>attach-scaladocs</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>doc-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <scalaVersion>${scala.version}</scalaVersion>
+ <args>
+ <arg>-unchecked</arg>
+ <arg>-deprecation</arg>
+ <arg>-feature</arg>
+ </args>
+ <jvmArgs>
+ <jvmArg>-Xms1024m</jvmArg>
+ <jvmArg>-Xmx1024m</jvmArg>
+ </jvmArgs>
+ <javacArgs>
+ <javacArg>-source</javacArg>
+ <javacArg>${java.version}</javacArg>
+ <javacArg>-target</javacArg>
+ <javacArg>${java.version}</javacArg>
+ <javacArg>-Xlint:all,-serial,-path</javacArg>
+ </javacArgs>
+ </configuration>
+ </plugin>
+ <!-- disable surefire -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skipTests>true</skipTests>
+ </configuration>
+ </plugin>
+ <!-- enable scalatest -->
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <version>1.0</version>
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <junitxml>.</junitxml>
+ <parallel>false</parallel>
+ <argLine>-Dderby.stream.error.field=org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM\
+ -Dlog4j.configuration=${project.basedir}/test/resources/log4j2.xml
+ </argLine>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <verbose>false</verbose>
+ <failOnViolation>true</failOnViolation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <failOnWarning>false</failOnWarning>
+ <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
+ <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
+ <configLocation>${project.basedir}/../codestyle/scalastyle_config.xml</configLocation>
+ </configuration>
+ <executions>
+ <execution>
+ <id>validate</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/spark/src/main/scala/org/apache/druid/spark/clients/DruidClient.scala b/spark/src/main/scala/org/apache/druid/spark/clients/DruidClient.scala
new file mode 100644
index 0000000..577f87e
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/clients/DruidClient.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.clients
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.google.common.net.HostAndPort
+import org.apache.druid.java.util.common.{IAE, ISE, Intervals, JodaUtils, StringUtils}
+import org.apache.druid.java.util.http.client.response.{StringFullResponseHandler,
+ StringFullResponseHolder}
+import org.apache.druid.java.util.http.client.{HttpClient, Request}
+import org.apache.druid.query.Druids
+import org.apache.druid.query.metadata.metadata.{ColumnAnalysis, SegmentAnalysis,
+ SegmentMetadataQuery}
+import org.apache.druid.spark.MAPPER
+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
+import org.apache.druid.spark.mixins.Logging
+import org.jboss.netty.handler.codec.http.{HttpMethod, HttpResponseStatus}
+import org.joda.time.{Duration, Interval}
+
+import java.net.URL
+import java.util.{List => JList}
+import javax.ws.rs.core.MediaType
+import scala.annotation.tailrec
+import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsJavaMapConverter,
+ mapAsScalaMapConverter, seqAsJavaListConverter}
+
+/**
+ * Going with SegmentMetadataQueries despite the significant overhead because there's no other way
+ * to get accurate information about Druid columns.
+ *
+ * This is pulled pretty directly from the druid-hadoop-input-format project on GitHub. There is
+ * likely substantial room for improvement.
+ */
+class DruidClient(
+ httpClient: HttpClient,
+ hostAndPort: HostAndPort,
+ numRetries: Int,
+ retryWaitSeconds: Int,
+ timeoutWaitMilliseconds: Int
+ ) extends Logging {
+ private val druidBaseQueryURL: HostAndPort => String =
+ (hostAndPort: HostAndPort) => s"http://$hostAndPort/druid/v2/"
+
+ private val DefaultSegmentMetadataInterval = List[Interval](Intervals.utc(
+ JodaUtils.MIN_INSTANT,
+ JodaUtils.MAX_INSTANT
+ ))
+
+ /**
+ * The SQL system catalog tables are incorrect for multivalue columns and don't have accurate
+ * type info, so we need to fall back to segmentMetadataQueries. Given a DATASOURCE and a range
+ * of INTERVALS to query over, return a map from column name to a tuple of
+ * (columnType, hasMultipleValues). Note that this is a very expensive operation over large
+ * numbers of segments. If possible, this method should only be called with the most granular
+ * starting and ending intervals instead of over a larger interval.
+ *
+ * @param dataSource The Druid dataSource to fetch the schema for.
+ * @param intervals The intervals to return the schema for, or None to query all segments.
+ * @return A map from column name to data type and whether or not the column is multi-value
+ * for the schema of DATASOURCE.
+ */
+ def getSchema(dataSource: String, intervals: Option[List[Interval]]): Map[String, (String, Boolean)] = {
+ val queryInterval = intervals.getOrElse(DefaultSegmentMetadataInterval)
+ val body = Druids.newSegmentMetadataQueryBuilder()
+ .dataSource(dataSource)
+ .intervals(queryInterval.asJava)
+ .analysisTypes(SegmentMetadataQuery.AnalysisType.SIZE)
+ .merge(true)
+ .context(Map[String, AnyRef](
+ "timeout" -> Int.box(timeoutWaitMilliseconds)
+ ).asJava)
+ .build()
+ val response = sendRequestWithRetry(
+ druidBaseQueryURL(hostAndPort), numRetries, Option(MAPPER.writeValueAsBytes(body))
+ )
+ val segments =
+ MAPPER.readValue[JList[SegmentAnalysis]](
+ response.getContent, new TypeReference[JList[SegmentAnalysis]]() {})
+ if (segments.size() == 0) {
+ throw new IAE(
+ s"No segments found for intervals [${intervals.mkString(",")}] on $dataSource"
+ )
+ }
+ // Since we're setting merge to true, there should only be one item in the list
+ if (segments.size() > 1) {
+ throw new ISE("Merged segment metadata response had more than one row!")
+ }
+ log.debug(segments.asScala.map(_.toString).mkString("SegmentAnalysis: [", ", ", "]"))
+ /*
+ * If a dimension has multiple types within the spanned interval, the resulting column
+ * analysis will have the type "STRING" and be an error message. We abuse that here to infer
+ * a string type for the dimension and widen the type for the resulting DataFrame.
+ */
+ val columns = segments.asScala.head.getColumns.asScala.toMap
+ columns.foreach{ case(key, column) =>
+ if (column.isError) {
+ log.warn(s"Multiple column types found for dimension $key in interval" +
+ s" ${queryInterval.mkString("[", ", ", "]")}! Falling back to STRING type")
+ }
+ }
+ columns.map{ case (name: String, col: ColumnAnalysis) =>
+ name -> (col.getType, col.isHasMultipleValues)
+ }
+ }
+
+ /*
+ * Marking this method as tail recursive because it is for now. If a finally block,
+ * special error handling, or more involved set up and tear down code is added, this
+ * method may no longer be tail recursive and so compilation will fail. Because the
+ * number of retries is user-configurable and will likely be relatively small,
+ * latency in communication with a Druid cluster for Segment Metadata will be dominated
+ * by the query time, and Scala will optimize tail recursive calls regardless of annotation,
+ * future developers shouldn't be concerned if they need to remove this annotation.
+ */
+ @tailrec
+ private def sendRequestWithRetry(
+ url: String,
+ retryCount: Int,
+ content: Option[Array[Byte]] = None
+ ): StringFullResponseHolder = {
+ try {
+ sendRequest(url, content)
+ } catch {
+ case e: Exception =>
+ if (retryCount > 0) {
+ logInfo(s"Got exception: ${e.getMessage}, retrying ...")
+ Thread.sleep(retryWaitSeconds * 1000)
+ sendRequestWithRetry(url, retryCount - 1, content)
+ } else {
+ throw e
+ }
+ }
+ }
+
+ private def sendRequest(url: String, content: Option[Array[Byte]]): StringFullResponseHolder = {
+ try {
+ val request = buildRequest(url, content)
+ var response = httpClient.go(
+ request,
+ new StringFullResponseHandler(StringUtils.UTF8_CHARSET),
+ Duration.millis(timeoutWaitMilliseconds)
+ ).get
+ if (response.getStatus == HttpResponseStatus.TEMPORARY_REDIRECT) {
+ val newUrl = response.getResponse.headers().get("Location")
+ logInfo(s"Got a redirect, new location: $newUrl")
+ response = httpClient.go(
+ buildRequest(newUrl, content), new StringFullResponseHandler(StringUtils.UTF8_CHARSET)
+ ).get
+ }
+ if (!(response.getStatus == HttpResponseStatus.OK)) {
+ throw new ISE(
+ s"Error getting response for %s, status[%s] content[%s]",
+ url,
+ response.getStatus,
+ response.getContent
+ )
+ }
+ response
+ } catch {
+ case e: Exception =>
+ throw e
+ }
+ }
+
+ def buildRequest(url: String, content: Option[Array[Byte]]): Request = {
+ content.map(
+ new Request(HttpMethod.POST, new URL(url))
+ .setHeader("Content-Type", MediaType.APPLICATION_JSON)
+ .setContent(_)
+ ).getOrElse(
+ new Request(HttpMethod.GET, new URL(url))
+ )
+ }
+}
+
+object DruidClient {
+ // TODO: Add support for Kerberized etc. clients
+ def apply(conf: Configuration): DruidClient = {
+ val brokerConf = conf.dive(DruidConfigurationKeys.brokerPrefix)
+ new DruidClient(
+ HttpClientHolder.create.get,
+ HostAndPort.fromParts(
+ brokerConf.get(DruidConfigurationKeys.brokerHostDefaultKey),
+ brokerConf.getInt(DruidConfigurationKeys.brokerPortDefaultKey)),
+ brokerConf.getInt(DruidConfigurationKeys.numRetriesDefaultKey),
+ brokerConf.getInt(DruidConfigurationKeys.retryWaitSecondsDefaultKey),
+ brokerConf.getInt(DruidConfigurationKeys.timeoutMillisecondsDefaultKey)
+ )
+ }
+}
+
diff --git a/spark/src/main/scala/org/apache/druid/spark/clients/HttpClientHolder.scala b/spark/src/main/scala/org/apache/druid/spark/clients/HttpClientHolder.scala
new file mode 100644
index 0000000..b13d2c8
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/clients/HttpClientHolder.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.clients
+
+import com.google.common.base.Throwables
+import org.apache.druid.java.util.common.lifecycle.Lifecycle
+import org.apache.druid.java.util.http.client.{HttpClient, HttpClientConfig, HttpClientInit}
+
+import java.io.{Closeable, IOException}
+
+object HttpClientHolder {
+ def create: HttpClientHolder = {
+ val lifecycle = new Lifecycle
+ val httpClient = HttpClientInit.createClient(HttpClientConfig.builder.build, lifecycle)
+ try {
+ lifecycle.start()
+ } catch {
+ case e: Exception =>
+ throw Throwables.propagate(e)
+ }
+ new HttpClientHolder(lifecycle, httpClient)
+ }
+}
+
+class HttpClientHolder(val lifecycle: Lifecycle, val client: HttpClient) extends Closeable {
+ def get: HttpClient = {
+ client
+ }
+
+ @throws[IOException]
+ override def close(): Unit = {
+ lifecycle.stop()
+ }
+}
diff --git a/spark/src/main/scala/org/apache/druid/spark/configuration/Configuration.scala b/spark/src/main/scala/org/apache/druid/spark/configuration/Configuration.scala
new file mode 100644
index 0000000..71f31de
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/configuration/Configuration.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.configuration
+
+import org.apache.druid.java.util.common.StringUtils
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
+/**
+ * A simple wrapper around a properties map that can also "dive" into a sub-Configuration if keys are stored using
+ * dotted separators. Because this class is designed to make working with Spark DataSourceOptions easier and DSOs are
+ * case-insensitive, this class is also case-insensitive.
+ *
+ * @param properties A Map of String property names to String property values to back this Configuration.
+ */
+class Configuration(properties: Map[String, String]) extends Serializable {
+ def getAs[T](key: String): T = {
+ properties(StringUtils.toLowerCase(key)).asInstanceOf[T]
+ }
+
+ def get(key: String): Option[String] = {
+ properties.get(StringUtils.toLowerCase(key))
+ }
+
+ def get(keyWithDefault: (String, String)): String = {
+ this.get(StringUtils.toLowerCase(keyWithDefault._1)).getOrElse(keyWithDefault._2)
+ }
+
+ def getString(key: String): String = {
+ properties.getOrElse(StringUtils.toLowerCase(key), "")
+ }
+
+ def getInt(key: String, default: Int): Int = {
+ properties.get(StringUtils.toLowerCase(key)).fold(default)(_.toInt)
+ }
+
+ def getInt(keyWithDefault: (String, Int)): Int = {
+ this.getInt(StringUtils.toLowerCase(keyWithDefault._1), keyWithDefault._2)
+ }
+
+ def getLong(key: String, default: Long): Long = {
+ properties.get(StringUtils.toLowerCase(key)).fold(default)(_.toLong)
+ }
+
+ def getLong(keyWithDefault: (String, Long)): Long = {
+ this.getLong(StringUtils.toLowerCase(keyWithDefault._1), keyWithDefault._2)
+ }
+
+ def getBoolean(key: String, default: Boolean): Boolean = {
+ properties.get(StringUtils.toLowerCase(key)).fold(default)(_.toBoolean)
+ }
+
+ def getBoolean(keyWithDefault: (String, Boolean)): Boolean = {
+ this.getBoolean(StringUtils.toLowerCase(keyWithDefault._1), keyWithDefault._2)
+ }
+
+ def apply(key: String): String = {
+ this.get(key) match {
+ case Some(v) => v
+ case None => throw new NoSuchElementException(s"Key $key not found!")
+ }
+ }
+
+ def isPresent(key: String): Boolean = {
+ properties.isDefinedAt(StringUtils.toLowerCase(key))
+ }
+
+ def isPresent(paths: String*): Boolean = {
+ properties.isDefinedAt(StringUtils.toLowerCase(paths.mkString(Configuration.SEPARATOR)))
+ }
+
+ /**
+ * Given a prefix PREFIX, return a Configuration object containing every key in this
+ * configuration that starts with PREFIX. Keys in the resulting Configuration will have PREFIX
+ * removed from their start.
+ *
+ * For example, if a Configuration contains the key `druid.broker.host` and `dive("druid")` is
+ * called on it, the resulting Configuration will contain the key `broker.host` with the same
+ * value as `druid.broker.host` had in the original Configuration.
+ *
+ * @param prefix The namespace to "dive" into.
+ * @return A Configuration containing the keys in this Configuration that start with PREFIX,
+ * stripped of their leading PREFIX.
+ */
+ def dive(prefix: String): Configuration = {
+ new Configuration(properties
+ .filterKeys(_.startsWith(s"$prefix${Configuration.SEPARATOR}"))
+ .map{case (k, v) => k.substring(prefix.length + 1) -> v})
+ }
+
+ /**
+ * Given a number of prefixes, return a Configuration object containing every key in this
+ * configuration that starts with `prefixes.mkString(Configuration.SEPARATOR)`. Keys in the
+ * resulting Configuration will have the concatenation of PREFIXES removed from their start.
+ *
+ * Note that this is the equivalent of chaining `dive` calls, not chaining `merge` calls.
+ *
+ * @param prefixes The namespaces, in order, to "dive" into.
+ * @return A Configuration containing the keys that start with every prefix in PREFIXES joined
+ * by periods, stripped of the leading prefixes matching the prefixes in PREFIXES.
+ */
+ def dive(prefixes: String*): Configuration = {
+ prefixes.foldLeft(this){case (conf, prefix) => conf.dive(prefix)}
+ }
+
+ /**
+ * Combine this configuration with another Configuration. If keys collide between these
+ * configurations, the corresponding values in OTHER will be selected.
+ *
+ * @param other A Configuration to merge with this Configuration.
+ * @return A Configuration containing the union of keys between this Configuration and OTHER.
+ * If keys collide between the two Configurations, the values in OTHER will be kept.
+ */
+ def merge(other: Configuration): Configuration = {
+ new Configuration(this.properties ++ other.toMap)
+ }
+
+ /**
+ * Combine this configuration with another Configuration, moving the other Configuration to the provided name space.
+ * If keys collide between this configuration and the newly-namespaced OTHER, the corresponding values in OTHER will
+ * be selected.
+ *
+ * @param namespace The name space to merge OTHER under.
+ * @param other The Configuration to merge with this Configuration.
+ * @return A new Configuration object containing all the keys in this Configuration, plus the keys in OTHER
+ * namespaced under NAMESPACE.
+ */
+ def merge(namespace: String, other: Configuration): Configuration = {
+ this.merge(Configuration(namespace, other.toMap))
+ }
+
+ /**
+ * Add the properties specified in PROPERTIES to this Configuration's properties, moving the new properties to the
+ * provided name space. If this Configuration already contains keys under the provided name space and those keys
+ * collide with the properties specified in PROPERTIES, the corresponding values in PROPERTIES will be selected.
+ *
+ * @param namespace The name space to merge the properties specified in PROPERTIES under.
+ * @param properties The map of properties to values to combine with the properties from this Configuration.
+ * @return A new Configuration object containing all the keys in this Configuration, plus the properties in
+ * PROPERTIES namespaced under NAMESPACE.
+ */
+ def merge(namespace: String, properties: Map[String, String]): Configuration = {
+ this.merge(Configuration(namespace, properties))
+ }
+
+ override def equals(obj: Any): Boolean = {
+ obj.isInstanceOf[Configuration] && this.toMap == obj.asInstanceOf[Configuration].toMap
+ }
+
+ override def hashCode(): Int = {
+ this.properties.hashCode()
+ }
+
+ def toMap: Map[String, String] = {
+ this.properties
+ }
+
+ override def toString: String = {
+ this.toMap.mkString("Configuration{", "; ", "}")
+ }
+}
+
+object Configuration {
+ def apply(properties: Map[String, String]): Configuration = {
+ new Configuration(properties.map{case (k, v) => StringUtils.toLowerCase(k) -> v})
+ }
+
+ def apply(namespace: String, properties: Map[String, String]): Configuration = {
+ new Configuration(properties.map{case(k, v) => StringUtils.toLowerCase(s"$namespace$SEPARATOR$k") -> v})
+ }
+
+ def apply(dso: DataSourceOptions): Configuration = {
+ new Configuration(dso.asMap().asScala.toMap)
+ }
+
+ def fromKeyValue(key: String, value: String): Configuration = {
+ new Configuration(Map[String, String](StringUtils.toLowerCase(key) -> value))
+ }
+
+ /**
+ * Get the key corresponding to each element of PATHS interpreted as a namespace or property.
+ *
+ * @param paths The parent namespaces and property as individual strings to convert into a single configuration key.
+ * @return The path to a property through its parent namespaces as a single configuration key.
+ */
+ def toKey(paths: String*): String = {
+ StringUtils.toLowerCase(paths.mkString(Configuration.SEPARATOR))
+ }
+
+ private val SEPARATOR = "."
+}
diff --git a/spark/src/main/scala/org/apache/druid/spark/configuration/DruidConfigurationKeys.scala b/spark/src/main/scala/org/apache/druid/spark/configuration/DruidConfigurationKeys.scala
new file mode 100644
index 0000000..cfa1d74
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/configuration/DruidConfigurationKeys.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.configuration
+
+object DruidConfigurationKeys {
+ // Druid Client Configs
+ val brokerPrefix: String = "broker"
+ val brokerHostKey: String = "host"
+ val brokerPortKey: String = "port"
+ val numRetriesKey: String = "numRetries"
+ val retryWaitSecondsKey: String = "retryWaitSeconds"
+ val timeoutMillisecondsKey: String = "timeoutMilliseconds"
+ private[spark] val brokerHostDefaultKey: (String, String) = (brokerHostKey, "localhost")
+ private[spark] val brokerPortDefaultKey: (String, Int) = (brokerPortKey, 8082)
+ private[spark] val numRetriesDefaultKey: (String, Int) = (numRetriesKey, 5)
+ private[spark] val retryWaitSecondsDefaultKey: (String, Int) = (retryWaitSecondsKey, 5)
+ private[spark] val timeoutMillisecondsDefaultKey: (String, Int) = (timeoutMillisecondsKey, 300000)
+}
diff --git a/spark/src/main/scala/org/apache/druid/spark/mixins/Logging.scala b/spark/src/main/scala/org/apache/druid/spark/mixins/Logging.scala
new file mode 100644
index 0000000..055c22b
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/mixins/Logging.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.mixins
+
+import org.apache.druid.java.util.common.logger.Logger
+
+/**
+ * Simplified version of org.apache.spark.internal.Logging. Uses
+ * org.apache.druid.java.util.common.logger.Logger instead of slf4j's Logger directly.
+ */
+trait Logging {
+
+ @transient private var logger: Logger = _
+
+ private lazy val logName = this.getClass.getName.stripSuffix("$")
+
+ /**
+ * Return the configured underlying logger
+ *
+ * @return the configured underlying logger
+ */
+ protected def log: Logger = {
+ if (logger == null) {
+ logger= new Logger(logName)
+ }
+ logger
+ }
+
+ /**
+ * Log a message at the TRACE level.
+ *
+ * @param msg the message string to be logged
+ */
+ protected def logTrace(msg: => String): Unit = {
+ if (log.isTraceEnabled) {
+ log.trace(msg)
+ }
+ }
+
+ /**
+ * Log a message at the DEBUG level.
+ *
+ * @param msg the message string to be logged
+ */
+ protected def logDebug(msg: => String): Unit = {
+ if (log.isDebugEnabled) {
+ log.debug(msg)
+ }
+ }
+
+ /**
+ * Log a message at the INFO level.
+ *
+ * @param msg the message string to be logged
+ */
+ protected def logInfo(msg: => String): Unit = {
+ if (log.isInfoEnabled) {
+ log.info(msg)
+ }
+ }
+
+ /**
+ * Log a message at the WARN level.
+ *
+ * @param msg the message string to be logged
+ */
+ protected def logWarn(msg: => String): Unit = {
+ log.warn(msg)
+ }
+
+ /**
+ * Log a message with an exception at the WARN level.
+ *
+ * @param msg the message string to be logged
+ * @param exception the exception to log in addition to the message
+ */
+ protected def logWarn(msg: => String, exception: Throwable): Unit = {
+ log.warn(exception, msg)
+ }
+
+ /**
+ * Log a message at the ERROR level.
+ *
+ * @param msg the message string to be logged
+ */
+ protected def logError(msg: => String): Unit = {
+ log.error(msg)
+ }
+
+ /**
+ * Log a message with an exception at the ERROR level.
+ *
+ * @param msg the message string to be logged
+ * @param exception the exception to log in addition to the message
+ */
+ protected def logError(msg: => String, exception: Throwable): Unit = {
+ log.error(exception, msg)
+ }
+}
diff --git a/spark/src/main/scala/org/apache/druid/spark/mixins/TryWithResources.scala b/spark/src/main/scala/org/apache/druid/spark/mixins/TryWithResources.scala
new file mode 100644
index 0000000..6b0ff24
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/mixins/TryWithResources.scala
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.mixins
+
+import scala.util.control.{ControlThrowable, NonFatal}
+
+/**
+ * Utility trait to ape the try-with-resources construct in Java. This is quick and dirty and has its flaws. If a more
+ * robust approach is needed, the better-files approach (https://github.com/pathikrit/better-files#lightweight-arm)
+ * should be considered.
+ *
+ * Scala doesn't support varargs of generic type, so using overloading to support managing multiple resources with a
+ * sequence. Ideally Scala would support macro programming to generate tryWithResources functions for
+ * tuples of [_ <: AutoCloseable] so users could un-tuple multiple resources as named variables without needing to
+ * manually generate separate methods for each length up to 22, but since it doesn't I've created methods for up to
+ * five resources at once. These methods can be nested or the method taking a Sequence of AutoCloseables can be used
+ * for arbitrary resources, with the caveat that named unpacking is not possible over Sequences. If there was a way to
+ * enforce type bounds on the elements of subclasses of Produce we could hack up a type that unioned AutoCloseable and
+ * Products whose elements are all subtypes of AutoCloseable, but since there isn't this is a quick fix. Note that
+ * shared code for the tuple methods is not refactored to a single private method because the common supertype of
+ * tuples is Product, with no type information.
+ */
+trait TryWithResources {
+ /**
+ * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so:
+ * val resource = new ResourceImplementingAutoCloseable()
+ * tryWithResources(resource){r =>
+ * r.doSomething()
+ * }
+ *
+ * or, if desired,
+ *
+ * tryWithResources(new ResourceImplementingAutoCloseable()){ resouce =>
+ * resource.doSomething()
+ * }
+ *
+ * @param resource The AutoCloseable resource to use in FUNC.
+ * @param func The function block to execute (think of this as the try block).
+ * @tparam T Any subtype of AutoCloseable.
+ * @tparam V The result type of FUNC.
+ * @return The result of executing FUNC with RESOURCE.
+ */
+ def tryWithResources[T <: AutoCloseable, V](resource: T)(func: T => V): V = {
+ try {
+ func(resource)
+ } catch {
+ // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions
+ case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) =>
+ try {
+ resource.close()
+ } catch {
+ case NonFatal(e2) =>
+ // e2 isn't fatal, let the original exception e take precedence
+ e.addSuppressed(e2)
+ case e2: Throwable =>
+ if (NonFatal(e)) {
+ // e2 is fatal but e isn't, suppress e
+ e2.addSuppressed(e)
+ throw e2
+ }
+ // Both exceptions are fatal, suppress the also-fatal e2 that occurred while closing
+ e.addSuppressed(e2)
+ }
+ throw e
+ } finally {
+ resource.close()
+ }
+ }
+
+ /**
+ * A helper function to duplicate Java's try-with-resources construction. Unfortunately Scala doesn't support varargs
+ * for generic types and I don't feel like writing 22 separate functions, so callers will have to keep track of the
+ * exact ordering of elements in the resource sequence or nest the provided tuple methods.
+ *
+ * To use, mix in this trait and then call like so:
+ * val fileResource = new ResourceImplementingAutoCloseable()
+ * val writerResource = new OtherAutoCloseable()
+ * tryWithResources(Seq(fileResource, writerResource)){resources =>
+ * val file = resources(0)
+ * val writer = resources(1)
+ * writer.write(file, data)
+ * }
+ *
+ * @param resources A list of AutoCloseable resources to use in FUNC.
+ * @param func The function block to execute (think of this as the try block).
+ * @tparam T Any subtype of AutoCloseable.
+ * @tparam V The result type of FUNC.
+ * @return The result of executing FUNC with RESOURCES.
+ */
+ def tryWithResources[T <: AutoCloseable, V](resources: Seq[T])(func: Seq[T] => V): V = {
+ try {
+ func(resources)
+ } catch {
+ // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions
+ case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) =>
+ throw closeAllResourcesMergingExceptions(resources, e)
+ } finally {
+ closeAllResourcesFinally(resources)
+ }
+ }
+
+ /**
+ * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so:
+ * val resources = (new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable())
+ * tryWithResources(resources){
+ * case (first, second) =>
+ * first.doSomething()
+ * second.doSomething()
+ * }
+ *
+ * or, if desired,
+ *
+ * tryWithResources(new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable){
+ * case (first, second) =>
+ * first.doSomething()
+ * second.doSomething
+ * }
+ *
+ * @param resources A tuple of AutoCloseable resources to use in FUNC.
+ * @param func The function block to execute (think of this as the try block).
+ * @tparam T Any subtype of AutoCloseable.
+ * @tparam V The result type of FUNC.
+ * @return The result of executing FUNC with RESOURCES.
+ */
+ def tryWithResources[T <: AutoCloseable, V](resources: (T, T))(func: ((T, T)) => V): V = {
+ val closeableResources = resources.productIterator.toSeq.map(_.asInstanceOf[AutoCloseable])
+ try {
+ func(resources)
+ } catch {
+ // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions
+ case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) =>
+ throw closeAllResourcesMergingExceptions(closeableResources, e)
+ } finally {
+ closeAllResourcesFinally(closeableResources)
+ }
+ }
+
+ /**
+ * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so:
+ * val resources = (new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable(), ...)
+ * tryWithResources(resources){
+ * case (first, second, ...) =>
+ * first.doSomething()
+ * second.doSomething()
+ * ...
+ * }
+ *
+ * or, if desired,
+ *
+ * tryWithResources(new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable, ...){
+ * case (first, second, ...) =>
+ * first.doSomething()
+ * second.doSomething()
+ * ...
+ * }
+ *
+ * @param resources A tuple of AutoCloseable resources to use in FUNC.
+ * @param func The function block to execute (think of this as the try block).
+ * @tparam T Any subtype of AutoCloseable.
+ * @tparam V The result type of FUNC.
+ * @return The result of executing FUNC with RESOURCES.
+ */
+ def tryWithResources[T <: AutoCloseable, V](resources: (T, T, T))(func: ((T, T, T)) => V): V = {
+ val closeableResources = resources.productIterator.toSeq.map(_.asInstanceOf[AutoCloseable])
+ try {
+ func(resources)
+ } catch {
+ // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions
+ case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) =>
+ throw closeAllResourcesMergingExceptions(closeableResources, e)
+ } finally {
+ closeAllResourcesFinally(closeableResources)
+ }
+ }
+
+ /**
+ * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so:
+ * val resources = (new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable(), ...)
+ * tryWithResources(resources){
+ * case (first, second, ...) =>
+ * first.doSomething()
+ * second.doSomething()
+ * ...
+ * }
+ *
+ * or, if desired,
+ *
+ * tryWithResources(new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable, ...){
+ * case (first, second, ...) =>
+ * first.doSomething()
+ * second.doSomething()
+ * ...
+ * }
+ *
+ * @param resources A tuple of AutoCloseable resources to use in FUNC.
+ * @param func The function block to execute (think of this as the try block).
+ * @tparam T Any subtype of AutoCloseable.
+ * @tparam V The result type of FUNC.
+ * @return The result of executing FUNC with RESOURCES.
+ */
+ def tryWithResources[T <: AutoCloseable, V](resources: (T, T, T, T))(func: ((T, T, T, T)) => V): V = {
+ val closeableResources = resources.productIterator.toSeq.map(_.asInstanceOf[AutoCloseable])
+ try {
+ func(resources)
+ } catch {
+ // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions
+ case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) =>
+ throw closeAllResourcesMergingExceptions(closeableResources, e)
+ } finally {
+ closeAllResourcesFinally(closeableResources)
+ }
+ }
+
+ /**
+ * A helper function to duplicate Java's try-with-resources construction. Mix in this trait and then call like so:
+ * val resources = (new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable(), ...)
+ * tryWithResources(resources){
+ * case (first, second, ...) =>
+ * first.doSomething()
+ * second.doSomething()
+ * ...
+ * }
+ *
+ * or, if desired,
+ *
+ * tryWithResources(new ResourceImplementingAutoCloseable(), new ResourceImplementingAutoCloseable, ...){
+ * case (first, second, ...) =>
+ * first.doSomething()
+ * second.doSomething()
+ * ...
+ * }
+ *
+ * @param resources A tuple of AutoCloseable resources to use in FUNC.
+ * @param func The function block to execute (think of this as the try block).
+ * @tparam T Any subtype of AutoCloseable.
+ * @tparam V The result type of FUNC.
+ * @return The result of executing FUNC with RESOURCES.
+ */
+ def tryWithResources[T <: AutoCloseable, V](resources: (T, T, T, T, T))(func: ((T, T, T, T, T)) => V): V = {
+ val closeableResources = resources.productIterator.toSeq.map(_.asInstanceOf[AutoCloseable])
+ try {
+ func(resources)
+ } catch {
+ // Clean up for InterruptedExceptions and ControlThrowables, not just NonFatal exceptions
+ case e@(NonFatal(_) | _: InterruptedException | _: ControlThrowable) =>
+ throw closeAllResourcesMergingExceptions(closeableResources, e)
+ } finally {
+ closeAllResourcesFinally(closeableResources)
+ }
+ }
+
+ /**
+ * Given a list of closeables RESOURCES and an throwable EXCEPTION, close all supplied resources, merging any
+ * additional errors that occur. The main point here is to ensure every resource in resources is closed; I'm not sure
+ * how useful the "merge" logic actually is when pulled out to a list of closeables instead of just a single one.
+ *
+ * @param resources The list of resources to close.
+ * @param exception The exception to merge additional throwables into.
+ * @return The final throwable resulting from "merging" EXCEPTION with any additional throwables raised while closing
+ * the resources in RESOURCES.
+ */
+ private def closeAllResourcesMergingExceptions(resources: Seq[AutoCloseable], exception: Throwable): Throwable = {
+ resources.foldRight(exception)((resource, ex) =>
+ try {
+ resource.close()
+ ex
+ } catch {
+ case NonFatal(e2) =>
+ // e2 isn't fatal, let the original exception e take precedence
+ ex.addSuppressed(e2)
+ ex
+ case e2: Throwable =>
+ if (NonFatal(ex)) {
+ // e2 is fatal but e isn't, suppress e
+ e2.addSuppressed(ex)
+ e2
+ } else {
+ // Both exceptions are fatal, suppress the also-fatal e2 that occurred while closing
+ ex.addSuppressed(e2)
+ ex
+ }
+ }
+ )
+ }
+
+ /**
+ * Given RESOURCES, attempts to close all of them even in the face of errors. Arbitrarily, the last exception
+ * encountered is thrown, with earlier exceptions suppressed.
+ *
+ * @param resources The list of resources to close.
+ */
+ private def closeAllResourcesFinally(resources: Seq[AutoCloseable]): Unit = {
+ // Using foldRight to iterate over resources to ensure we don't short circuit and leave resources unclosed if an
+ // earlier resource throws an exception on .close().
+ val exceptionOption = resources
+ .foldRight(None.asInstanceOf[Option[Throwable]])((resource, exOpt) =>
+ try {
+ resource.close()
+ exOpt
+ } catch {
+ case e: Throwable =>
+ Some(exOpt.fold(e) { ex =>
+ ex.addSuppressed(e)
+ ex
+ })
+ }
+ )
+ if (exceptionOption.isDefined) {
+ throw exceptionOption.get
+ }
+ }
+}
diff --git a/spark/src/main/scala/org/apache/druid/spark/package.scala b/spark/src/main/scala/org/apache/druid/spark/package.scala
new file mode 100644
index 0000000..22b440a
--- /dev/null
+++ b/spark/src/main/scala/org/apache/druid/spark/package.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.druid.jackson.DefaultObjectMapper
+
+package object spark {
+ private[spark] val MAPPER: ObjectMapper = new DefaultObjectMapper()
+}
diff --git a/spark/src/test/scala/org/apache/druid/spark/configuration/ConfigurationSuite.scala b/spark/src/test/scala/org/apache/druid/spark/configuration/ConfigurationSuite.scala
new file mode 100644
index 0000000..0bdb9cd
--- /dev/null
+++ b/spark/src/test/scala/org/apache/druid/spark/configuration/ConfigurationSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.configuration
+
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.matchers.should.Matchers
+
+class ConfigurationSuite extends AnyFunSuite with Matchers {
+ private val testConf: Configuration = Configuration(Map[String, String](
+ Configuration.toKey("test", "sub-conf", "property") -> "testProp",
+ Configuration.toKey("test", "property") -> "quizProp",
+ "property" -> "examProperty"
+ ))
+
+ test("isPresent should correctly handle single keys as well as paths to properties") {
+ testConf.isPresent("property") should be(true)
+ testConf.isPresent("test", "sub-conf", "property") should be(true)
+ testConf.isPresent("test") should be(false)
+ testConf.isPresent("exam", "sub-conf", "property") should be(false)
+ testConf.isPresent("test.property") should be(true)
+ testConf.isPresent("test.property.property") should be(false)
+ }
+
+ test("dive should correctly return sub-configurations") {
+ val subConf = testConf.dive("test")
+ subConf.getString("property") should equal("quizProp")
+ subConf.dive("sub-conf") should equal(Configuration.fromKeyValue("property", "testProp"))
+ subConf.dive("sub-conf") should equal(testConf.dive("test", "sub-conf"))
+ }
+
+ test("dive should return empty maps when called on uncontained namespaces") {
+ testConf.dive("exam").toMap.isEmpty should be(true)
+ }
+
+ test("Configurations should be case-insensitive") {
+ testConf.getString("pRoPeRtY") should equal(testConf.getString("property"))
+ }
+
+ test("merge should correctly combine Configurations") {
+ val otherConf = Configuration(Map[String, String](
+ Configuration.toKey("other", "conf", "key") -> "value",
+ "property" -> "new property"
+ ))
+ val mergedConf = testConf.merge(otherConf)
+ mergedConf.getString(Configuration.toKey("other", "conf", "key")) should equal("value")
+ mergedConf.getString("property") should equal("new property")
+ }
+
+ test("merge should correctly namespace merged Configurations") {
+ val otherConf = Configuration(Map[String, String](
+ "key" -> "1",
+ "property" -> "new property"
+ ))
+ val mergedConf = testConf.merge("test", otherConf)
+ mergedConf.getString(Configuration.toKey("test", "property")) should equal("new property")
+ mergedConf.getInt(Configuration.toKey("test", "key"), -1) should equal(1)
+ mergedConf("property") should equal("examProperty")
+ }
+}