PHOENIX-5197: Use extraOptions to set the configuration for Spark Workers
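
Adds a new DataSource option, "phoenixconfigs", whose value is a
comma-separated list of <property>=<value> pairs. The parsed properties
are carried through PhoenixDataSourceReadOptions and
PhoenixDataSourceWriteOptions so that Phoenix/HBase configuration
overrides reach the JDBC connections opened on Spark executors, not
only the one opened on the driver.

A minimal usage sketch (assuming a SparkSession named "spark" and the
connector's registered short name "phoenix"; the table name and the
property values below are illustrative, not part of this patch):

    Dataset<Row> df = spark.read()
        .format("phoenix")
        .option("table", "MY_TABLE")        // hypothetical table
        .option("zkUrl", "localhost:2181")
        .option("phoenixconfigs",
                "hbase.client.retries.number=10,phoenix.query.timeoutMs=60000")
        .load();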
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2f47957
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,30 @@
+# general java
+*.class
+*.war
+*.jar
+
+# python
+*.pyc
+.checkstyle
+
+# eclipse stuff
+.settings/*
+*/.settings/
+.classpath
+.project
+*/.externalToolBuilders
+*/maven-eclipse.xml
+
+# intellij stuff
+.idea/
+*.iml
+*.ipr
+*.iws
+
+# maven stuff
+target/
+release/
+RESULTS/
+CSV_EXPORT/
+.DS_Store
+
diff --git a/phoenix-spark/src/it/resources/globalSetup.sql b/phoenix-spark/src/it/resources/globalSetup.sql
index efdb8cb..2f6e9ed 100644
--- a/phoenix-spark/src/it/resources/globalSetup.sql
+++ b/phoenix-spark/src/it/resources/globalSetup.sql
@@ -60,5 +60,5 @@
CREATE TABLE MULTITENANT_TEST_TABLE (TENANT_ID VARCHAR NOT NULL, ORGANIZATION_ID VARCHAR, GLOBAL_COL1 VARCHAR CONSTRAINT pk PRIMARY KEY (TENANT_ID, ORGANIZATION_ID)) MULTI_TENANT=true
CREATE TABLE IF NOT EXISTS GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED_TIMESTAMP,varchar_id VARCHAR (30),char_id CHAR (30),binary_id BINARY (100),varbinary_id VARBINARY (100))
- CREATE TABLE IF NOT EXISTS OUTPUT_GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED_TIMESTAMP,varchar_id VARCHAR (30),char_id CHAR (30),binary_id BINARY (100),varbinary_id VARBINARY (100))
- upsert into GIGANTIC_TABLE values(0,2,3,4,-5,6,7,8,9.3,10.4,11.5,12.6,13.7,true,null,null,CURRENT_TIME(),CURRENT_TIME(),CURRENT_DATE(),CURRENT_TIME(),'This is random textA','a','a','a')
+CREATE TABLE IF NOT EXISTS OUTPUT_GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED_TIMESTAMP,varchar_id VARCHAR (30),char_id CHAR (30),binary_id BINARY (100),varbinary_id VARBINARY (100))
+UPSERT INTO GIGANTIC_TABLE VALUES(0,2,3,4,-5,6,7,8,9.3,10.4,11.5,12.6,13.7,true,null,null,CURRENT_TIME(),CURRENT_TIME(),CURRENT_DATE(),CURRENT_TIME(),'This is random textA','a','a','a')
diff --git a/phoenix-spark/src/it/resources/log4j.xml b/phoenix-spark/src/it/resources/log4j.xml
index 10c2dc0..578a19b 100644
--- a/phoenix-spark/src/it/resources/log4j.xml
+++ b/phoenix-spark/src/it/resources/log4j.xml
@@ -32,39 +32,39 @@
</appender>
<logger name="org.eclipse">
- <level value="ERROR"/>
+ <level value="DEBUG"/>
</logger>
<logger name="org.apache">
- <level value="ERROR"/>
+ <level value="DEBUG"/>
</logger>
<logger name = "org.apache.phoenix.mapreduce">
- <level value="FATAL"/>
+ <level value="DEBUG"/>
</logger>
<logger name="org.mortbay">
- <level value="ERROR"/>
+ <level value="DEBUG"/>
</logger>
<logger name="org.spark-project.jetty">
- <level value="ERROR"/>
+ <level value="DEBUG"/>
</logger>
<logger name="akka">
- <level value="ERROR"/>
+ <level value="DEBUG"/>
</logger>
<logger name="BlockStateChange">
- <level value="ERROR"/>
+ <level value="DEBUG"/>
</logger>
<logger name="io.netty">
- <level value="ERROR"/>
+ <level value="DEBUG"/>
</logger>
<root>
- <priority value="INFO"/>
+ <priority value="DEBUG"/>
<appender-ref ref="console"/>
</root>
</log4j:configuration>
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
index ad79d1c..02b5edf 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
@@ -18,7 +18,9 @@
package org.apache.phoenix.spark.datasource.v2;
import java.util.Optional;
+import java.util.Properties;
+import org.apache.log4j.Logger;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.spark.datasource.v2.reader.PhoenixDataSourceReader;
import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDataSourceWriteOptions;
@@ -39,8 +41,10 @@
*/
public class PhoenixDataSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister {
+ private static final Logger logger = Logger.getLogger(PhoenixDataSource.class);
public static final String SKIP_NORMALIZING_IDENTIFIER = "skipNormalizingIdentifier";
public static final String ZOOKEEPER_URL = "zkUrl";
+ public static final String PHOENIX_CONFIGS = "phoenixconfigs";
@Override
public DataSourceReader createReader(DataSourceOptions options) {
@@ -64,6 +68,40 @@
return Optional.of(new PhoenixDatasourceWriter(writeOptions));
}
+ /**
+ * Extract HBase and Phoenix properties that need to be set on both the driver and the
+ * workers. These properties are expected to be passed in under the key
+ * {@link PhoenixDataSource#PHOENIX_CONFIGS}. The corresponding value must be a
+ * comma-separated string of property name/value pairs. For example:
+ * prop1=val1,prop2=val2,prop3=val3
+ * @param options DataSource options passed in
+ * @return the extracted Properties
+ */
+ public static Properties extractPhoenixHBaseConfFromOptions(final DataSourceOptions options) {
+ Properties confToSet = new Properties();
+ if (options != null) {
+ Optional<String> phoenixConfigs = options.get(PHOENIX_CONFIGS);
+ if (phoenixConfigs.isPresent()) {
+ String phoenixConf = phoenixConfigs.get();
+ String[] confs = phoenixConf.split(",");
+ for (String conf : confs) {
+ String[] confKeyVal = conf.split("=", 2); // split on the first '=' only, so property values may themselves contain '='
+ try {
+ confToSet.setProperty(confKeyVal[0], confKeyVal[1]);
+ } catch (ArrayIndexOutOfBoundsException e) {
+ throw new RuntimeException("Incorrect format for phoenix/HBase configs. "
+ + "Expected format: <prop1>=<val1>,<prop2>=<val2>,<prop3>=<val3>..",
+ e);
+ }
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Got the following Phoenix/HBase config:\n" + confToSet);
+ }
+ }
+ return confToSet;
+ }
+
private PhoenixDataSourceWriteOptions createPhoenixDataSourceWriteOptions(DataSourceOptions options,
StructType schema) {
String scn = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE).orElse(null);
@@ -72,7 +110,9 @@
boolean skipNormalizingIdentifier = options.getBoolean(SKIP_NORMALIZING_IDENTIFIER, false);
return new PhoenixDataSourceWriteOptions.Builder().setTableName(options.tableName().get())
.setZkUrl(zkUrl).setScn(scn).setTenantId(tenantId).setSchema(schema)
- .setSkipNormalizingIdentifier(skipNormalizingIdentifier).build();
+ .setSkipNormalizingIdentifier(skipNormalizingIdentifier)
+ .setOverriddenProps(extractPhoenixHBaseConfFromOptions(options))
+ .build();
}
@Override
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
index 8c2fdb1..70062c8 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
@@ -17,7 +17,10 @@
*/
package org.apache.phoenix.spark.datasource.v2.reader;
+import com.google.common.base.Preconditions;
+
import java.io.Serializable;
+import java.util.Properties;
public class PhoenixDataSourceReadOptions implements Serializable {
@@ -25,12 +28,16 @@
private final String zkUrl;
private final String scn;
private final String selectStatement;
+ private final Properties overriddenProps;
- public PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId, String selectStatement) {
+ public PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId,
+ String selectStatement, Properties overriddenProps) {
+ Preconditions.checkNotNull(overriddenProps);
this.zkUrl = zkUrl;
this.scn = scn;
this.tenantId = tenantId;
this.selectStatement = selectStatement;
+ this.overriddenProps = overriddenProps;
}
public String getSelectStatement() {
@@ -48,4 +55,8 @@
public String getTenantId() {
return tenantId;
}
+
+ public Properties getOverriddenProps() {
+ return overriddenProps;
+ }
}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
index 446d96f..8476509 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
@@ -58,6 +58,10 @@
import java.util.Optional;
import java.util.Properties;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+
public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDownFilters,
SupportsPushDownRequiredColumns {
@@ -65,6 +69,7 @@
private final String tableName;
private final String zkUrl;
private final boolean dateAsTimestamp;
+ private final Properties overriddenProps;
private StructType schema;
private Filter[] pushedFilters = new Filter[]{};
@@ -82,6 +87,7 @@
this.tableName = options.tableName().get();
this.zkUrl = options.get("zkUrl").get();
this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
+ this.overriddenProps = extractPhoenixHBaseConfFromOptions(options);
setSchema();
}
@@ -89,7 +95,8 @@
* Sets the schema using all the table columns before any column pruning has been done
*/
private void setSchema() {
- try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl)) {
+ try (Connection conn = DriverManager.getConnection(
+ JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
@@ -119,14 +126,14 @@
// Generate splits based off statistics, or just region splits?
boolean splitByStats = options.getBoolean(
PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
- Properties overridingProps = new Properties();
if(currentScnValue.isPresent()) {
- overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue.get());
+ overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue.get());
}
if (tenantId.isPresent()){
- overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId.get());
+ overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId.get());
}
- try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl, overridingProps)) {
+ try (Connection conn = DriverManager.getConnection(
+ JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, Lists.newArrayList(schema.names()));
final Statement statement = conn.createStatement();
final String selectStatement = QueryUtil.constructSelectStatement(tableName, columnInfos, whereClause);
@@ -171,8 +178,9 @@
location.getRegionInfo().getRegionName()
);
- PhoenixDataSourceReadOptions phoenixDataSourceOptions = new PhoenixDataSourceReadOptions(zkUrl,
- currentScnValue.orElse(null), tenantId.orElse(null), selectStatement);
+ PhoenixDataSourceReadOptions phoenixDataSourceOptions =
+ new PhoenixDataSourceReadOptions(zkUrl, currentScnValue.orElse(null),
+ tenantId.orElse(null), selectStatement, overriddenProps);
if (splitByStats) {
for (Scan aScan : scans) {
partitions.add(new PhoenixInputPartition(phoenixDataSourceOptions, schema,
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
index 30e84db..6f6413b 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
@@ -57,6 +57,9 @@
import scala.collection.Iterator;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+
public class PhoenixInputPartitionReader implements InputPartitionReader<InternalRow> {
private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
@@ -77,14 +80,15 @@
String scn = options.getScn();
String tenantId = options.getTenantId();
String zkUrl = options.getZkUrl();
- Properties overridingProps = new Properties();
+ Properties overridingProps = options.getOverriddenProps();
if (scn != null) {
overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
}
if (tenantId != null) {
overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
}
- try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl, overridingProps)) {
+ try (Connection conn = DriverManager.getConnection(
+ JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overridingProps)) {
final Statement statement = conn.createStatement();
final String selectStatement = options.getSelectStatement();
Preconditions.checkNotNull(selectStatement);
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
index 781d1c8..434f13c 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
@@ -17,9 +17,11 @@
*/
package org.apache.phoenix.spark.datasource.v2.writer;
+import com.google.common.base.Preconditions;
import org.apache.spark.sql.types.StructType;
import java.io.Serializable;
+import java.util.Properties;
public class PhoenixDataSourceWriteOptions implements Serializable {
@@ -29,15 +31,19 @@
private final String scn;
private final StructType schema;
private final boolean skipNormalizingIdentifier;
+ private final Properties overriddenProps;
- private PhoenixDataSourceWriteOptions(String tableName, String zkUrl, String scn, String tenantId,
- StructType schema, boolean skipNormalizingIdentifier) {
+ private PhoenixDataSourceWriteOptions(String tableName, String zkUrl, String scn,
+ String tenantId, StructType schema, boolean skipNormalizingIdentifier,
+ Properties overriddenProps) {
+ Preconditions.checkNotNull(overriddenProps);
this.tableName = tableName;
this.zkUrl = zkUrl;
this.scn = scn;
this.tenantId = tenantId;
this.schema = schema;
this.skipNormalizingIdentifier = skipNormalizingIdentifier;
+ this.overriddenProps = overriddenProps;
}
public String getScn() {
@@ -64,6 +70,10 @@
return skipNormalizingIdentifier;
}
+ public Properties getOverriddenProps() {
+ return overriddenProps;
+ }
+
public static class Builder {
private String tableName;
private String zkUrl;
@@ -71,6 +81,7 @@
private String tenantId;
private StructType schema;
private boolean skipNormalizingIdentifier;
+ private Properties overriddenProps = new Properties();
public Builder setTableName(String tableName) {
this.tableName = tableName;
@@ -102,8 +113,14 @@
return this;
}
+ public Builder setOverriddenProps(Properties overriddenProps) {
+ this.overriddenProps = overriddenProps;
+ return this;
+ }
+
public PhoenixDataSourceWriteOptions build() {
- return new PhoenixDataSourceWriteOptions(tableName, zkUrl, scn, tenantId, schema, skipNormalizingIdentifier);
+ return new PhoenixDataSourceWriteOptions(tableName, zkUrl, scn, tenantId, schema,
+ skipNormalizingIdentifier, overriddenProps);
}
}
}
\ No newline at end of file
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
index 32dc07a..6793673 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
@@ -40,6 +40,8 @@
import org.apache.spark.sql.types.StructType;
import com.google.common.collect.Lists;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
public class PhoenixDataWriter implements DataWriter<InternalRow> {
@@ -51,7 +53,7 @@
String scn = options.getScn();
String tenantId = options.getTenantId();
String zkUrl = options.getZkUrl();
- Properties overridingProps = new Properties();
+ Properties overridingProps = options.getOverriddenProps();
if (scn != null) {
overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
}
@@ -60,7 +62,8 @@
}
this.schema = options.getSchema();
try {
- this.conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl, overridingProps);
+ this.conn = DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl,
+ overridingProps);
List<String> colNames = Lists.newArrayList(options.getSchema().names());
if (!options.skipNormalizingIdentifier()){
colNames = colNames.stream().map(colName -> SchemaUtil.normalizeIdentifier(colName)).collect(Collectors.toList());
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
index 04f243d..9d713b8 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
@@ -18,7 +18,6 @@
package org.apache.phoenix.spark.datasource.v2.writer;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
diff --git a/phoenix-spark/src/main/resources/log4j.xml b/phoenix-spark/src/main/resources/log4j.xml
new file mode 100644
index 0000000..10c2dc0
--- /dev/null
+++ b/phoenix-spark/src/main/resources/log4j.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="console" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%-4r [%t] %-5p %c %x - %m%n"/>
+ </layout>
+ </appender>
+
+ <logger name="org.eclipse">
+ <level value="ERROR"/>
+ </logger>
+
+ <logger name="org.apache">
+ <level value="ERROR"/>
+ </logger>
+
+ <logger name = "org.apache.phoenix.mapreduce">
+ <level value="FATAL"/>
+ </logger>
+
+ <logger name="org.mortbay">
+ <level value="ERROR"/>
+ </logger>
+
+ <logger name="org.spark-project.jetty">
+ <level value="ERROR"/>
+ </logger>
+
+ <logger name="akka">
+ <level value="ERROR"/>
+ </logger>
+
+ <logger name="BlockStateChange">
+ <level value="ERROR"/>
+ </logger>
+
+ <logger name="io.netty">
+ <level value="ERROR"/>
+ </logger>
+
+ <root>
+ <priority value="INFO"/>
+ <appender-ref ref="console"/>
+ </root>
+</log4j:configuration>
diff --git a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSourceTest.java b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSourceTest.java
new file mode 100644
index 0000000..018f517
--- /dev/null
+++ b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSourceTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2;
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.PHOENIX_CONFIGS;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class PhoenixDataSourceTest {
+
+ private static final String P1 = "p1";
+ private static final String P2 = "p2";
+ private static final String P3 = "p3";
+ private static final String V1 = "v1";
+ private static final String V2 = "v2";
+ private static final String V3 = "v3";
+ private static final String EQ = "=";
+ private static final String COMMA = ",";
+ private static final String SINGLE_PHOENIX_PROP = P1 + EQ + V1;
+ private static final String VALID_PHOENIX_PROPS_LIST =
+ SINGLE_PHOENIX_PROP + COMMA + P2 + EQ + V2 + COMMA + P3 + EQ + V3;
+ private static final String INVALID_PHOENIX_PROPS_LIST =
+ SINGLE_PHOENIX_PROP + COMMA + P2 + V2 + COMMA + P3 + EQ + V3;
+
+ @Test
+ public void testExtractSinglePhoenixProp() {
+ Map<String, String> props = new HashMap<>();
+ props.put(PHOENIX_CONFIGS, SINGLE_PHOENIX_PROP);
+ Properties p = extractPhoenixHBaseConfFromOptions(new DataSourceOptions(props));
+ assertEquals(V1, p.getProperty(P1));
+ }
+
+ @Test
+ public void testPhoenixConfigsExtractedProperly() {
+ Map<String, String> props = new HashMap<>();
+ // Add another random option
+ props.put("k", "v");
+ props.put(PHOENIX_CONFIGS, VALID_PHOENIX_PROPS_LIST);
+ Properties p = extractPhoenixHBaseConfFromOptions(new DataSourceOptions(props));
+ assertEquals(V1, p.getProperty(P1));
+ assertEquals(V2, p.getProperty(P2));
+ assertEquals(V3, p.getProperty(P3));
+ }
+
+ @Test
+ public void testInvalidConfThrowsException() {
+ Map<String, String> props = new HashMap<>();
+ props.put(PHOENIX_CONFIGS, INVALID_PHOENIX_PROPS_LIST);
+ try {
+ extractPhoenixHBaseConfFromOptions(new DataSourceOptions(props));
+ fail("Should have thrown an exception!");
+ } catch (RuntimeException rte) {
+ assertTrue(rte.getCause() instanceof ArrayIndexOutOfBoundsException);
+ }
+ }
+
+ @Test
+ public void testNullOptionsReturnsEmptyMap() {
+ assertTrue(extractPhoenixHBaseConfFromOptions(null).isEmpty());
+ }
+
+}
diff --git a/phoenix-spark/src/test/resources/log4j.xml b/phoenix-spark/src/test/resources/log4j.xml
new file mode 100644
index 0000000..578a19b
--- /dev/null
+++ b/phoenix-spark/src/test/resources/log4j.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="console" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%-4r [%t] %-5p %c %x - %m%n"/>
+ </layout>
+ </appender>
+
+ <logger name="org.eclipse">
+ <level value="DEBUG"/>
+ </logger>
+
+ <logger name="org.apache">
+ <level value="DEBUG"/>
+ </logger>
+
+ <logger name = "org.apache.phoenix.mapreduce">
+ <level value="DEBUG"/>
+ </logger>
+
+ <logger name="org.mortbay">
+ <level value="DEBUG"/>
+ </logger>
+
+ <logger name="org.spark-project.jetty">
+ <level value="DEBUG"/>
+ </logger>
+
+ <logger name="akka">
+ <level value="DEBUG"/>
+ </logger>
+
+ <logger name="BlockStateChange">
+ <level value="DEBUG"/>
+ </logger>
+
+ <logger name="io.netty">
+ <level value="DEBUG"/>
+ </logger>
+
+ <root>
+ <priority value="DEBUG"/>
+ <appender-ref ref="console"/>
+ </root>
+</log4j:configuration>