PHOENIX-5197: Use extraOptions to set the configuration for Spark Workers
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2f47957
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,30 @@
+#general java
+*.class
+*.war
+*.jar
+
+# python
+*.pyc
+.checkstyle
+
+# eclipse stuffs
+.settings/*
+*/.settings/
+.classpath
+.project
+*/.externalToolBuilders
+*/maven-eclipse.xml
+
+# intellij stuff
+.idea/
+*.iml
+*.ipr
+*.iws
+
+#maven stuffs
+target/
+release/
+RESULTS/
+CSV_EXPORT/
+.DS_Store
+
diff --git a/phoenix-spark/src/it/resources/globalSetup.sql b/phoenix-spark/src/it/resources/globalSetup.sql
index efdb8cb..2f6e9ed 100644
--- a/phoenix-spark/src/it/resources/globalSetup.sql
+++ b/phoenix-spark/src/it/resources/globalSetup.sql
@@ -60,5 +60,5 @@
 
 CREATE TABLE MULTITENANT_TEST_TABLE (TENANT_ID VARCHAR NOT NULL, ORGANIZATION_ID VARCHAR, GLOBAL_COL1 VARCHAR  CONSTRAINT pk PRIMARY KEY (TENANT_ID, ORGANIZATION_ID)) MULTI_TENANT=true
 CREATE TABLE IF NOT EXISTS GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED_TIMESTAMP,varchar_id VARCHAR (30),char_id CHAR (30),binary_id BINARY (100),varbinary_id VARBINARY (100))
- CREATE TABLE IF NOT EXISTS OUTPUT_GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED_TIMESTAMP,varchar_id VARCHAR (30),char_id CHAR (30),binary_id BINARY (100),varbinary_id VARBINARY (100))
- upsert into GIGANTIC_TABLE values(0,2,3,4,-5,6,7,8,9.3,10.4,11.5,12.6,13.7,true,null,null,CURRENT_TIME(),CURRENT_TIME(),CURRENT_DATE(),CURRENT_TIME(),'This is random textA','a','a','a')
+CREATE TABLE IF NOT EXISTS OUTPUT_GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED_TIMESTAMP,varchar_id VARCHAR (30),char_id CHAR (30),binary_id BINARY (100),varbinary_id VARBINARY (100))
+UPSERT INTO GIGANTIC_TABLE VALUES(0,2,3,4,-5,6,7,8,9.3,10.4,11.5,12.6,13.7,true,null,null,CURRENT_TIME(),CURRENT_TIME(),CURRENT_DATE(),CURRENT_TIME(),'This is random textA','a','a','a')
diff --git a/phoenix-spark/src/it/resources/log4j.xml b/phoenix-spark/src/it/resources/log4j.xml
index 10c2dc0..578a19b 100644
--- a/phoenix-spark/src/it/resources/log4j.xml
+++ b/phoenix-spark/src/it/resources/log4j.xml
@@ -32,39 +32,39 @@
   </appender>
 
   <logger name="org.eclipse">
-    <level value="ERROR"/>
+    <level value="DEBUG"/>
   </logger>
 
   <logger name="org.apache">
-    <level value="ERROR"/>
+    <level value="DEBUG"/>
   </logger>
 
   <logger name = "org.apache.phoenix.mapreduce">
-    <level value="FATAL"/>
+    <level value="DEBUG"/>
   </logger>
 
   <logger name="org.mortbay">
-    <level value="ERROR"/>
+    <level value="DEBUG"/>
   </logger>
 
   <logger name="org.spark-project.jetty">
-    <level value="ERROR"/>
+    <level value="DEBUG"/>
   </logger>
 
   <logger name="akka">
-    <level value="ERROR"/>
+    <level value="DEBUG"/>
   </logger>
 
   <logger name="BlockStateChange">
-    <level value="ERROR"/>
+    <level value="DEBUG"/>
   </logger>
 
   <logger name="io.netty">
-    <level value="ERROR"/>
+    <level value="DEBUG"/>
   </logger>
 
   <root>
-    <priority value="INFO"/>
+    <priority value="DEBUG"/>
     <appender-ref ref="console"/>
   </root>
 </log4j:configuration>
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
index ad79d1c..02b5edf 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
@@ -18,7 +18,9 @@
 package org.apache.phoenix.spark.datasource.v2;
 
 import java.util.Optional;
+import java.util.Properties;
 
+import org.apache.log4j.Logger;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.spark.datasource.v2.reader.PhoenixDataSourceReader;
 import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDataSourceWriteOptions;
@@ -39,8 +41,10 @@
  */
 public class PhoenixDataSource  implements DataSourceV2,  ReadSupport, WriteSupport, DataSourceRegister {
 
+    private static final Logger logger = Logger.getLogger(PhoenixDataSource.class);
     public static final String SKIP_NORMALIZING_IDENTIFIER = "skipNormalizingIdentifier";
     public static final String ZOOKEEPER_URL = "zkUrl";
+    public static final String PHOENIX_CONFIGS = "phoenixconfigs";
 
     @Override
     public DataSourceReader createReader(DataSourceOptions options) {
@@ -64,6 +68,40 @@
         return Optional.of(new PhoenixDatasourceWriter(writeOptions));
     }
 
+    /**
+     * Extract HBase and Phoenix properties that need to be set in both the driver and workers.
+     * We expect these properties to be passed against the key
+     * {@link PhoenixDataSource#PHOENIX_CONFIGS}. The corresponding value should be a
+     * comma-separated string containing property names and property values. For example:
+     * prop1=val1,prop2=val2,prop3=val3
+     * @param options DataSource options passed in
+     * @return Properties map
+     */
+    public static Properties extractPhoenixHBaseConfFromOptions(final DataSourceOptions options) {
+        Properties confToSet = new Properties();
+        if (options != null) {
+            Optional phoenixConfigs = options.get(PHOENIX_CONFIGS);
+            if (phoenixConfigs.isPresent()) {
+                String phoenixConf = String.valueOf(phoenixConfigs.get());
+                String[] confs = phoenixConf.split(",");
+                for (String conf : confs) {
+                    String[] confKeyVal = conf.split("=");
+                    try {
+                        confToSet.setProperty(confKeyVal[0], confKeyVal[1]);
+                    } catch (ArrayIndexOutOfBoundsException e) {
+                        throw new RuntimeException("Incorrect format for phoenix/HBase configs. "
+                                + "Expected format: <prop1>=<val1>,<prop2>=<val2>,<prop3>=<val3>..",
+                                e);
+                    }
+                }
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("Got the following Phoenix/HBase config:\n" + confToSet);
+            }
+        }
+        return confToSet;
+    }
+
     private PhoenixDataSourceWriteOptions createPhoenixDataSourceWriteOptions(DataSourceOptions options,
             StructType schema) {
         String scn = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE).orElse(null);
@@ -72,7 +110,9 @@
         boolean skipNormalizingIdentifier = options.getBoolean(SKIP_NORMALIZING_IDENTIFIER, false);
         return new PhoenixDataSourceWriteOptions.Builder().setTableName(options.tableName().get())
                 .setZkUrl(zkUrl).setScn(scn).setTenantId(tenantId).setSchema(schema)
-                .setSkipNormalizingIdentifier(skipNormalizingIdentifier).build();
+                .setSkipNormalizingIdentifier(skipNormalizingIdentifier)
+                .setOverriddenProps(extractPhoenixHBaseConfFromOptions(options))
+                .build();
     }
 
     @Override
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
index 8c2fdb1..70062c8 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
@@ -17,7 +17,10 @@
  */
 package org.apache.phoenix.spark.datasource.v2.reader;
 
+import com.google.common.base.Preconditions;
+
 import java.io.Serializable;
+import java.util.Properties;
 
 public class PhoenixDataSourceReadOptions implements Serializable {
 
@@ -25,12 +28,16 @@
     private final String zkUrl;
     private final String scn;
     private final String selectStatement;
+    private final Properties overriddenProps;
 
-    public PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId, String selectStatement) {
+    public PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId,
+            String selectStatement, Properties overriddenProps) {
+        Preconditions.checkNotNull(overriddenProps);
         this.zkUrl = zkUrl;
         this.scn = scn;
         this.tenantId = tenantId;
         this.selectStatement = selectStatement;
+        this.overriddenProps = overriddenProps;
     }
 
     public String getSelectStatement() {
@@ -48,4 +55,8 @@
     public String getTenantId() {
         return tenantId;
     }
+
+    public Properties getOverriddenProps() {
+        return overriddenProps;
+    }
 }
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
index 446d96f..8476509 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
@@ -58,6 +58,10 @@
 import java.util.Optional;
 import java.util.Properties;
 
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+
 public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDownFilters,
         SupportsPushDownRequiredColumns {
 
@@ -65,6 +69,7 @@
     private final String tableName;
     private final String zkUrl;
     private final boolean dateAsTimestamp;
+    private final Properties overriddenProps;
 
     private StructType schema;
     private Filter[] pushedFilters = new Filter[]{};
@@ -82,6 +87,7 @@
         this.tableName = options.tableName().get();
         this.zkUrl = options.get("zkUrl").get();
         this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
+        this.overriddenProps = extractPhoenixHBaseConfFromOptions(options);
         setSchema();
     }
 
@@ -89,7 +95,8 @@
      * Sets the schema using all the table columns before any column pruning has been done
      */
     private void setSchema() {
-        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl)) {
+        try (Connection conn = DriverManager.getConnection(
+                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
             List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
             Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
             schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
@@ -119,14 +126,14 @@
         // Generate splits based off statistics, or just region splits?
         boolean splitByStats = options.getBoolean(
                 PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
-        Properties overridingProps = new Properties();
         if(currentScnValue.isPresent()) {
-            overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue.get());
+            overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue.get());
         }
         if (tenantId.isPresent()){
-            overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId.get());
+            overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId.get());
         }
-        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl, overridingProps)) {
+        try (Connection conn = DriverManager.getConnection(
+                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
             List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, Lists.newArrayList(schema.names()));
             final Statement statement = conn.createStatement();
             final String selectStatement = QueryUtil.constructSelectStatement(tableName, columnInfos, whereClause);
@@ -171,8 +178,9 @@
                         location.getRegionInfo().getRegionName()
                 );
 
-                PhoenixDataSourceReadOptions phoenixDataSourceOptions = new PhoenixDataSourceReadOptions(zkUrl,
-                        currentScnValue.orElse(null), tenantId.orElse(null), selectStatement);
+                PhoenixDataSourceReadOptions phoenixDataSourceOptions =
+                        new PhoenixDataSourceReadOptions(zkUrl, currentScnValue.orElse(null),
+                                tenantId.orElse(null), selectStatement, overriddenProps);
                 if (splitByStats) {
                     for (Scan aScan : scans) {
                         partitions.add(new PhoenixInputPartition(phoenixDataSourceOptions, schema,
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
index 30e84db..6f6413b 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
@@ -57,6 +57,9 @@
 
 import scala.collection.Iterator;
 
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+
 public class PhoenixInputPartitionReader implements InputPartitionReader<InternalRow>  {
 
     private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
@@ -77,14 +80,15 @@
         String scn = options.getScn();
         String tenantId = options.getTenantId();
         String zkUrl = options.getZkUrl();
-        Properties overridingProps = new Properties();
+        Properties overridingProps = options.getOverriddenProps();
         if (scn != null) {
             overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
         }
         if (tenantId != null) {
             overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
         }
-        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl, overridingProps)) {
+        try (Connection conn = DriverManager.getConnection(
+                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overridingProps)) {
             final Statement statement = conn.createStatement();
             final String selectStatement = options.getSelectStatement();
             Preconditions.checkNotNull(selectStatement);
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
index 781d1c8..434f13c 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
@@ -17,9 +17,11 @@
  */
 package org.apache.phoenix.spark.datasource.v2.writer;
 
+import jline.internal.Preconditions;
 import org.apache.spark.sql.types.StructType;
 
 import java.io.Serializable;
+import java.util.Properties;
 
 public class PhoenixDataSourceWriteOptions implements Serializable {
 
@@ -29,15 +31,19 @@
     private final String scn;
     private final StructType schema;
     private final boolean skipNormalizingIdentifier;
+    private final Properties overriddenProps;
 
-    private PhoenixDataSourceWriteOptions(String tableName, String zkUrl, String scn, String tenantId,
-                                          StructType schema, boolean skipNormalizingIdentifier) {
+    private PhoenixDataSourceWriteOptions(String tableName, String zkUrl, String scn,
+            String tenantId, StructType schema, boolean skipNormalizingIdentifier,
+            Properties overriddenProps) {
+        Preconditions.checkNotNull(overriddenProps);
         this.tableName = tableName;
         this.zkUrl = zkUrl;
         this.scn = scn;
         this.tenantId = tenantId;
         this.schema = schema;
         this.skipNormalizingIdentifier = skipNormalizingIdentifier;
+        this.overriddenProps = overriddenProps;
     }
 
     public String getScn() {
@@ -64,6 +70,10 @@
         return skipNormalizingIdentifier;
     }
 
+    public Properties getOverriddenProps() {
+        return overriddenProps;
+    }
+
     public static class Builder {
         private String tableName;
         private String zkUrl;
@@ -71,6 +81,7 @@
         private String tenantId;
         private StructType schema;
         private boolean skipNormalizingIdentifier;
+        private Properties overriddenProps = new Properties();
 
         public Builder setTableName(String tableName) {
             this.tableName = tableName;
@@ -102,8 +113,14 @@
             return this;
         }
 
+        public Builder setOverriddenProps(Properties overriddenProps) {
+            this.overriddenProps = overriddenProps;
+            return this;
+        }
+
         public PhoenixDataSourceWriteOptions build() {
-            return new PhoenixDataSourceWriteOptions(tableName, zkUrl, scn, tenantId, schema, skipNormalizingIdentifier);
+            return new PhoenixDataSourceWriteOptions(tableName, zkUrl, scn, tenantId, schema,
+                    skipNormalizingIdentifier, overriddenProps);
         }
     }
 }
\ No newline at end of file
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
index 32dc07a..6793673 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
@@ -40,6 +40,8 @@
 import org.apache.spark.sql.types.StructType;
 
 import com.google.common.collect.Lists;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
 public class PhoenixDataWriter implements DataWriter<InternalRow> {
 
@@ -51,7 +53,7 @@
         String scn = options.getScn();
         String tenantId = options.getTenantId();
         String zkUrl = options.getZkUrl();
-        Properties overridingProps = new Properties();
+        Properties overridingProps = options.getOverriddenProps();
         if (scn != null) {
             overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
         }
@@ -60,7 +62,8 @@
         }
         this.schema = options.getSchema();
         try {
-            this.conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl, overridingProps);
+            this.conn = DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl,
+                    overridingProps);
             List<String> colNames = Lists.newArrayList(options.getSchema().names());
             if (!options.skipNormalizingIdentifier()){
                 colNames = colNames.stream().map(colName -> SchemaUtil.normalizeIdentifier(colName)).collect(Collectors.toList());
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
index 04f243d..9d713b8 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
@@ -18,7 +18,6 @@
 package org.apache.phoenix.spark.datasource.v2.writer;
 
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
diff --git a/phoenix-spark/src/main/resources/log4j.xml b/phoenix-spark/src/main/resources/log4j.xml
new file mode 100644
index 0000000..10c2dc0
--- /dev/null
+++ b/phoenix-spark/src/main/resources/log4j.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  <appender name="console" class="org.apache.log4j.ConsoleAppender">
+    <param name="Target" value="System.out"/>
+
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%-4r [%t] %-5p %c %x - %m%n"/>
+    </layout>
+  </appender>
+
+  <logger name="org.eclipse">
+    <level value="ERROR"/>
+  </logger>
+
+  <logger name="org.apache">
+    <level value="ERROR"/>
+  </logger>
+
+  <logger name = "org.apache.phoenix.mapreduce">
+    <level value="FATAL"/>
+  </logger>
+
+  <logger name="org.mortbay">
+    <level value="ERROR"/>
+  </logger>
+
+  <logger name="org.spark-project.jetty">
+    <level value="ERROR"/>
+  </logger>
+
+  <logger name="akka">
+    <level value="ERROR"/>
+  </logger>
+
+  <logger name="BlockStateChange">
+    <level value="ERROR"/>
+  </logger>
+
+  <logger name="io.netty">
+    <level value="ERROR"/>
+  </logger>
+
+  <root>
+    <priority value="INFO"/>
+    <appender-ref ref="console"/>
+  </root>
+</log4j:configuration>
diff --git a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSourceTest.java b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSourceTest.java
new file mode 100644
index 0000000..018f517
--- /dev/null
+++ b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSourceTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2;
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.PHOENIX_CONFIGS;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class PhoenixDataSourceTest {
+
+    private static final String P1 = "p1";
+    private static final String P2 = "p2";
+    private static final String P3 = "p3";
+    private static final String V1 = "v1";
+    private static final String V2 = "v2";
+    private static final String V3 = "v3";
+    private static final String EQ = "=";
+    private static final String COMMA = ",";
+    private static final String SINGLE_PHOENIX_PROP = P1 + EQ + V1;
+    private static final String VALID_PHOENIX_PROPS_LIST =
+            SINGLE_PHOENIX_PROP + COMMA + P2 + EQ + V2 + COMMA + P3 + EQ + V3;
+    private static final String INVALID_PHOENIX_PROPS_LIST =
+            SINGLE_PHOENIX_PROP + COMMA + P2 + V2 + COMMA + P3 + EQ + V3;
+
+    @Test
+    public void testExtractSinglePhoenixProp() {
+        Map<String, String> props = new HashMap<>();
+        props.put(PHOENIX_CONFIGS, SINGLE_PHOENIX_PROP);
+        Properties p = extractPhoenixHBaseConfFromOptions(new DataSourceOptions(props));
+        assertEquals(V1, p.getProperty(P1));
+    }
+
+    @Test
+    public void testPhoenixConfigsExtractedProperly() {
+        Map<String, String> props = new HashMap<>();
+        // Add another random option
+        props.put("k", "v");
+        props.put(PHOENIX_CONFIGS, VALID_PHOENIX_PROPS_LIST);
+        Properties p = extractPhoenixHBaseConfFromOptions(new DataSourceOptions(props));
+        assertEquals(V1, p.getProperty(P1));
+        assertEquals(V2, p.getProperty(P2));
+        assertEquals(V3, p.getProperty(P3));
+    }
+
+    @Test
+    public void testInvalidConfThrowsException() {
+        Map<String, String> props = new HashMap<>();
+        props.put(PHOENIX_CONFIGS, INVALID_PHOENIX_PROPS_LIST);
+        try {
+            extractPhoenixHBaseConfFromOptions(new DataSourceOptions(props));
+            fail("Should have thrown an exception!");
+        } catch (RuntimeException rte) {
+            assertTrue(rte.getCause() instanceof ArrayIndexOutOfBoundsException);
+        }
+    }
+
+    @Test
+    public void testNullOptionsReturnsEmptyMap() {
+        assertTrue(extractPhoenixHBaseConfFromOptions(null).isEmpty());
+    }
+
+}
diff --git a/phoenix-spark/src/test/resources/log4j.xml b/phoenix-spark/src/test/resources/log4j.xml
new file mode 100644
index 0000000..578a19b
--- /dev/null
+++ b/phoenix-spark/src/test/resources/log4j.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  <appender name="console" class="org.apache.log4j.ConsoleAppender">
+    <param name="Target" value="System.out"/>
+
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%-4r [%t] %-5p %c %x - %m%n"/>
+    </layout>
+  </appender>
+
+  <logger name="org.eclipse">
+    <level value="DEBUG"/>
+  </logger>
+
+  <logger name="org.apache">
+    <level value="DEBUG"/>
+  </logger>
+
+  <logger name = "org.apache.phoenix.mapreduce">
+    <level value="DEBUG"/>
+  </logger>
+
+  <logger name="org.mortbay">
+    <level value="DEBUG"/>
+  </logger>
+
+  <logger name="org.spark-project.jetty">
+    <level value="DEBUG"/>
+  </logger>
+
+  <logger name="akka">
+    <level value="DEBUG"/>
+  </logger>
+
+  <logger name="BlockStateChange">
+    <level value="DEBUG"/>
+  </logger>
+
+  <logger name="io.netty">
+    <level value="DEBUG"/>
+  </logger>
+
+  <root>
+    <priority value="DEBUG"/>
+    <appender-ref ref="console"/>
+  </root>
+</log4j:configuration>