PHOENIX-5232: PhoenixDataWriter in Phoenix-Spark connector does not commit when mutation batch size is reached
diff --git a/phoenix-spark/pom.xml b/phoenix-spark/pom.xml
index 0bbd983..56d2111 100644
--- a/phoenix-spark/pom.xml
+++ b/phoenix-spark/pom.xml
@@ -478,8 +478,8 @@
   </dependencies>
 
   <build>
-    <testSourceDirectory>src/it/scala</testSourceDirectory>
-    <testResources><testResource><directory>src/it/resources</directory></testResource></testResources>
+    <testSourceDirectory>src/test/java</testSourceDirectory>
+    <testResources><testResource><directory>src/test/resources</directory></testResource></testResources>
     <plugins>
         <plugin>
             <groupId>org.apache.maven.plugins</groupId>
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index b40b638..58910ce 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -16,12 +16,17 @@
 import java.sql.DriverManager
 import java.util.Date
 
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
 import org.apache.phoenix.schema.types.PVarchar
-import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
+import org.apache.phoenix.spark.datasource.v2.{PhoenixDataSource, PhoenixTestingDataSource}
+import org.apache.phoenix.spark.datasource.v2.reader.PhoenixTestingInputPartitionReader
+import org.apache.phoenix.spark.datasource.v2.writer.PhoenixTestingDataSourceWriter
 import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
-import org.apache.spark.sql.types._
+import org.apache.spark.SparkException
+import org.apache.spark.sql.types.{ArrayType, BinaryType, ByteType, DateType, IntegerType, LongType, StringType, StructField, StructType}
 import org.apache.spark.sql.{Row, SaveMode}
 
+import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 
 /**
@@ -249,6 +254,30 @@
     count shouldEqual 1L
   }
 
+  test("Can use extraOptions to set configs for workers during reads") {
+    // Pass in true, so we will get null when fetching the current row, leading to an NPE
+    var extraOptions = PhoenixTestingInputPartitionReader.RETURN_NULL_CURR_ROW + "=true"
+    var rdd = spark.sqlContext.read
+      .format(PhoenixTestingDataSource.TEST_SOURCE)
+      .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress,
+        PhoenixDataSource.PHOENIX_CONFIGS -> extraOptions)).load
+
+    // Expect to get a NullPointerException in the executors
+    var error = intercept[SparkException] {
+      rdd.take(2)(0)(1)
+    }
+    assert(error.getCause.isInstanceOf[NullPointerException])
+
+    // Pass in false, so we will get the expected rows
+    extraOptions = PhoenixTestingInputPartitionReader.RETURN_NULL_CURR_ROW + "=false"
+    rdd = spark.sqlContext.read
+      .format(PhoenixTestingDataSource.TEST_SOURCE)
+      .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress,
+        PhoenixDataSource.PHOENIX_CONFIGS -> extraOptions)).load
+    val stringValue = rdd.take(2)(0)(1)
+    stringValue shouldEqual "test_row_1"
+  }
+
   test("Can save to phoenix table") {
     val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
 
@@ -282,6 +311,47 @@
     }
   }
 
+  test("Can use extraOptions to set configs for workers during writes") {
+    val totalRecords = 100
+    val upsertBatchSize = 5
+
+    val records = new mutable.MutableList[Row]
+    for (x <- 1 to totalRecords) {
+      records += Row(x.toLong, x.toString, x)
+    }
+    val dataSet = records.toList
+
+    val schema = StructType(
+      Seq(StructField("ID", LongType, nullable = false),
+        StructField("COL1", StringType),
+        StructField("COL2", IntegerType)))
+
+    // Distribute the dataset into an RDD with just 1 partition so we use only 1 executor.
+    // This makes it easy to deterministically count the batched commits from that executor
+    // since it corresponds to exactly 1 input partition. In case of multiple executors with
+    // an uneven distribution of input partitions, if
+    // (number of records in that partition) % batchSize != 0, some updates would also be committed
+    // via PhoenixDataWriter#commit rather than the batch commits via PhoenixDataWriter#write
+    // and those would thus, not be counted by PhoenixTestingDataWriter.
+    val rowRDD = spark.sparkContext.parallelize(dataSet, 1)
+
+    // Apply the schema to the RDD.
+    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+    val extraOptions =  PhoenixConfigurationUtil.UPSERT_BATCH_SIZE + "=" + upsertBatchSize.toString
+
+    // Initially, this should be zero
+    PhoenixTestingDataSourceWriter.TOTAL_BATCHES_COMMITTED_COUNT shouldEqual 0
+    df.write
+      .format(PhoenixTestingDataSource.TEST_SOURCE)
+      .options(Map("table" -> "OUTPUT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress,
+        PhoenixDataSource.PHOENIX_CONFIGS -> extraOptions))
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    // Verify the number of times batched updates are committed via DataWriters
+    PhoenixTestingDataSourceWriter.TOTAL_BATCHES_COMMITTED_COUNT shouldEqual totalRecords/upsertBatchSize
+  }
+
   test("Can save dates to Phoenix using java.sql.Date") {
     val date = java.sql.Date.valueOf("2016-09-30")
 
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
index 02b5edf..21250af 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
@@ -21,11 +21,8 @@
 import java.util.Properties;
 
 import org.apache.log4j.Logger;
-import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.spark.datasource.v2.reader.PhoenixDataSourceReader;
-import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDataSourceWriteOptions;
-import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDatasourceWriter;
-import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDataSourceWriter;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.sources.DataSourceRegister;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -52,20 +49,9 @@
     }
 
     @Override
-    public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode,
-            DataSourceOptions options) {
-        if (!mode.equals(SaveMode.Overwrite)) {
-            throw new RuntimeException("SaveMode other than SaveMode.OverWrite is not supported");
-        }
-        if (!options.tableName().isPresent()) {
-            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
-        }
-        if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
-            throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined");
-        }
-
-        PhoenixDataSourceWriteOptions writeOptions = createPhoenixDataSourceWriteOptions(options, schema);
-        return Optional.of(new PhoenixDatasourceWriter(writeOptions));
+    public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema,
+            SaveMode mode, DataSourceOptions options) {
+        return Optional.of(new PhoenixDataSourceWriter(mode, schema, options));
     }
 
     /**
@@ -102,19 +88,6 @@
         return confToSet;
     }
 
-    private PhoenixDataSourceWriteOptions createPhoenixDataSourceWriteOptions(DataSourceOptions options,
-            StructType schema) {
-        String scn = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE).orElse(null);
-        String tenantId = options.get(PhoenixRuntime.TENANT_ID_ATTRIB).orElse(null);
-        String zkUrl = options.get(ZOOKEEPER_URL).get();
-        boolean skipNormalizingIdentifier = options.getBoolean(SKIP_NORMALIZING_IDENTIFIER, false);
-        return new PhoenixDataSourceWriteOptions.Builder().setTableName(options.tableName().get())
-                .setZkUrl(zkUrl).setScn(scn).setTenantId(tenantId).setSchema(schema)
-                .setSkipNormalizingIdentifier(skipNormalizingIdentifier)
-                .setOverriddenProps(extractPhoenixHBaseConfFromOptions(options))
-                .build();
-    }
-
     @Override
     public String shortName() {
         return "phoenix";
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
index 70062c8..67343d4 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
@@ -22,7 +22,7 @@
 import java.io.Serializable;
 import java.util.Properties;
 
-public class PhoenixDataSourceReadOptions implements Serializable {
+class PhoenixDataSourceReadOptions implements Serializable {
 
     private final String tenantId;
     private final String zkUrl;
@@ -30,7 +30,7 @@
     private final String selectStatement;
     private final Properties overriddenProps;
 
-    public PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId,
+    PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId,
             String selectStatement, Properties overriddenProps) {
         Preconditions.checkNotNull(overriddenProps);
         this.zkUrl = zkUrl;
@@ -40,23 +40,23 @@
         this.overriddenProps = overriddenProps;
     }
 
-    public String getSelectStatement() {
+    String getSelectStatement() {
         return selectStatement;
     }
 
-    public String getScn() {
+    String getScn() {
         return scn;
     }
 
-    public String getZkUrl() {
+    String getZkUrl() {
         return zkUrl;
     }
 
-    public String getTenantId() {
+    String getTenantId() {
         return tenantId;
     }
 
-    public Properties getOverriddenProps() {
+    Properties getOverriddenProps() {
         return overriddenProps;
     }
 }
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
index 8476509..18e304b 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
@@ -85,7 +85,7 @@
         }
         this.options = options;
         this.tableName = options.tableName().get();
-        this.zkUrl = options.get("zkUrl").get();
+        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get();
         this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
         this.overriddenProps = extractPhoenixHBaseConfFromOptions(options);
         setSchema();
@@ -106,6 +106,11 @@
         }
     }
 
+    PhoenixInputPartition getInputPartition(PhoenixDataSourceReadOptions readOptions,
+            PhoenixInputSplit inputSplit) {
+        return new PhoenixInputPartition(readOptions, schema, inputSplit);
+    }
+
     @Override
     public StructType readSchema() {
         return schema;
@@ -175,19 +180,18 @@
 
                 // Get the region size
                 long regionSize = sizeCalculator.getRegionSize(
-                        location.getRegionInfo().getRegionName()
-                );
+                        location.getRegionInfo().getRegionName());
 
                 PhoenixDataSourceReadOptions phoenixDataSourceOptions =
                         new PhoenixDataSourceReadOptions(zkUrl, currentScnValue.orElse(null),
                                 tenantId.orElse(null), selectStatement, overriddenProps);
                 if (splitByStats) {
                     for (Scan aScan : scans) {
-                        partitions.add(new PhoenixInputPartition(phoenixDataSourceOptions, schema,
+                        partitions.add(getInputPartition(phoenixDataSourceOptions,
                                 new PhoenixInputSplit(Collections.singletonList(aScan), regionSize, regionLocation)));
                     }
                 } else {
-                    partitions.add(new PhoenixInputPartition(phoenixDataSourceOptions, schema,
+                    partitions.add(getInputPartition(phoenixDataSourceOptions,
                             new PhoenixInputSplit(scans, regionSize, regionLocation)));
                 }
             }
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
index 624ff0f..e4adc07 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
@@ -26,16 +26,28 @@
 
 public class PhoenixInputPartition implements InputPartition<InternalRow> {
 
-    private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
-    private StructType schema;
-    private PhoenixDataSourceReadOptions options;
+    private final SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
+    private final StructType schema;
+    private final PhoenixDataSourceReadOptions options;
 
-    public PhoenixInputPartition(PhoenixDataSourceReadOptions options, StructType schema, PhoenixInputSplit phoenixInputSplit) {
+    PhoenixInputPartition(PhoenixDataSourceReadOptions options, StructType schema, PhoenixInputSplit phoenixInputSplit) {
         this.phoenixInputSplit = new SerializableWritable<>(phoenixInputSplit);
         this.schema = schema;
         this.options = options;
     }
 
+    PhoenixDataSourceReadOptions getOptions() {
+        return options;
+    }
+
+    StructType getSchema() {
+        return schema;
+    }
+
+    SerializableWritable<PhoenixInputSplit> getPhoenixInputSplit() {
+        return phoenixInputSplit;
+    }
+
     @Override
     public InputPartitionReader<InternalRow> createPartitionReader() {
         return new PhoenixInputPartitionReader(options, schema, phoenixInputSplit);
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
index 6f6413b..49f6c13 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
@@ -62,25 +62,30 @@
 
 public class PhoenixInputPartitionReader implements InputPartitionReader<InternalRow>  {
 
-    private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
-    private StructType schema;
+    private final SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
+    private final StructType schema;
+    private final PhoenixDataSourceReadOptions options;
     private Iterator<InternalRow> iterator;
     private PhoenixResultSet resultSet;
     private InternalRow currentRow;
-    private PhoenixDataSourceReadOptions options;
 
-    public PhoenixInputPartitionReader(PhoenixDataSourceReadOptions options, StructType schema, SerializableWritable<PhoenixInputSplit> phoenixInputSplit) {
+    PhoenixInputPartitionReader(PhoenixDataSourceReadOptions options, StructType schema,
+            SerializableWritable<PhoenixInputSplit> phoenixInputSplit) {
         this.options = options;
         this.phoenixInputSplit = phoenixInputSplit;
         this.schema = schema;
         initialize();
     }
 
+    Properties getOverriddenPropsFromOptions() {
+        return options.getOverriddenProps();
+    }
+
     private QueryPlan getQueryPlan() throws SQLException {
         String scn = options.getScn();
         String tenantId = options.getTenantId();
         String zkUrl = options.getZkUrl();
-        Properties overridingProps = options.getOverriddenProps();
+        Properties overridingProps = getOverriddenPropsFromOptions();
         if (scn != null) {
             overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
         }
@@ -95,8 +100,7 @@
 
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
             // Optimize the query plan so that we potentially use secondary indexes
-            final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
-            return queryPlan;
+            return pstmt.optimizeQuery(selectStatement);
         }
     }
 
@@ -114,7 +118,8 @@
             ConnectionQueryServices services = queryPlan.getContext().getConnection().getQueryServices();
             services.clearTableRegionCache(tableNameBytes);
 
-            long renewScannerLeaseThreshold = queryPlan.getContext().getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
+            long renewScannerLeaseThreshold = queryPlan.getContext().getConnection()
+                    .getQueryServices().getRenewLeaseThresholdMilliSeconds();
             for (Scan scan : scans) {
                 // For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599
                 scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
@@ -131,13 +136,16 @@
                 peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
                 iterators.add(peekingResultIterator);
             }
-            ResultIterator iterator = queryPlan.useRoundRobinIterator() ? RoundRobinResultIterator.newIterator(iterators, queryPlan) : ConcatResultIterator.newIterator(iterators);
+            ResultIterator iterator = queryPlan.useRoundRobinIterator() ?
+                    RoundRobinResultIterator.newIterator(iterators, queryPlan) :
+                    ConcatResultIterator.newIterator(iterators);
             if (queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
                 iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
             }
             // Clone the row projector as it's not thread safe and would be used simultaneously by
             // multiple threads otherwise.
-            this.resultSet = new PhoenixResultSet(iterator, queryPlan.getProjector().cloneIfNecessary(), queryPlan.getContext());
+            this.resultSet = new PhoenixResultSet(iterator, queryPlan.getProjector().cloneIfNecessary(),
+                    queryPlan.getContext());
             this.iterator = SparkJdbcUtil.resultSetToSparkInternalRows(resultSet, schema, new InputMetrics());
         }
         catch (SQLException e) {
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
index 434f13c..c130db8 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
@@ -23,7 +23,7 @@
 import java.io.Serializable;
 import java.util.Properties;
 
-public class PhoenixDataSourceWriteOptions implements Serializable {
+class PhoenixDataSourceWriteOptions implements Serializable {
 
     private final String tableName;
     private final String zkUrl;
@@ -36,6 +36,9 @@
     private PhoenixDataSourceWriteOptions(String tableName, String zkUrl, String scn,
             String tenantId, StructType schema, boolean skipNormalizingIdentifier,
             Properties overriddenProps) {
+        Preconditions.checkNotNull(tableName);
+        Preconditions.checkNotNull(zkUrl);
+        Preconditions.checkNotNull(schema);
         Preconditions.checkNotNull(overriddenProps);
         this.tableName = tableName;
         this.zkUrl = zkUrl;
@@ -46,35 +49,35 @@
         this.overriddenProps = overriddenProps;
     }
 
-    public String getScn() {
+    String getScn() {
         return scn;
     }
 
-    public String getZkUrl() {
+    String getZkUrl() {
         return zkUrl;
     }
 
-    public String getTenantId() {
+    String getTenantId() {
         return tenantId;
     }
 
-    public StructType getSchema() {
+    StructType getSchema() {
         return schema;
     }
 
-    public String getTableName() {
+    String getTableName() {
         return tableName;
     }
 
-    public boolean skipNormalizingIdentifier() {
+    boolean skipNormalizingIdentifier() {
         return skipNormalizingIdentifier;
     }
 
-    public Properties getOverriddenProps() {
+    Properties getOverriddenProps() {
         return overriddenProps;
     }
 
-    public static class Builder {
+    static class Builder {
         private String tableName;
         private String zkUrl;
         private String scn;
@@ -83,42 +86,42 @@
         private boolean skipNormalizingIdentifier;
         private Properties overriddenProps = new Properties();
 
-        public Builder setTableName(String tableName) {
+        Builder setTableName(String tableName) {
             this.tableName = tableName;
             return this;
         }
 
-        public Builder setZkUrl(String zkUrl) {
+        Builder setZkUrl(String zkUrl) {
             this.zkUrl = zkUrl;
             return this;
         }
 
-        public Builder setScn(String scn) {
+        Builder setScn(String scn) {
             this.scn = scn;
             return this;
         }
 
-        public Builder setTenantId(String tenantId) {
+        Builder setTenantId(String tenantId) {
             this.tenantId = tenantId;
             return this;
         }
 
-        public Builder setSchema(StructType schema) {
+        Builder setSchema(StructType schema) {
             this.schema = schema;
             return this;
         }
 
-        public Builder setSkipNormalizingIdentifier(boolean skipNormalizingIdentifier) {
+        Builder setSkipNormalizingIdentifier(boolean skipNormalizingIdentifier) {
             this.skipNormalizingIdentifier = skipNormalizingIdentifier;
             return this;
         }
 
-        public Builder setOverriddenProps(Properties overriddenProps) {
+        Builder setOverriddenProps(Properties overriddenProps) {
             this.overriddenProps = overriddenProps;
             return this;
         }
 
-        public PhoenixDataSourceWriteOptions build() {
+        PhoenixDataSourceWriteOptions build() {
             return new PhoenixDataSourceWriteOptions(tableName, zkUrl, scn, tenantId, schema,
                     skipNormalizingIdentifier, overriddenProps);
         }
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
index 6793673..04670d5 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
@@ -26,6 +26,7 @@
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import org.apache.log4j.Logger;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -40,16 +41,22 @@
 import org.apache.spark.sql.types.StructType;
 
 import com.google.common.collect.Lists;
+
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_UPSERT_BATCH_SIZE;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.UPSERT_BATCH_SIZE;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
 public class PhoenixDataWriter implements DataWriter<InternalRow> {
 
+    private static final Logger logger = Logger.getLogger(PhoenixDataWriter.class);
     private final StructType schema;
     private final Connection conn;
     private final PreparedStatement statement;
+    private final long batchSize;
+    private long numRecords = 0;
 
-    public PhoenixDataWriter(PhoenixDataSourceWriteOptions options) {
+    PhoenixDataWriter(PhoenixDataSourceWriteOptions options) {
         String scn = options.getScn();
         String tenantId = options.getTenantId();
         String zkUrl = options.getZkUrl();
@@ -66,15 +73,21 @@
                     overridingProps);
             List<String> colNames = Lists.newArrayList(options.getSchema().names());
             if (!options.skipNormalizingIdentifier()){
-                colNames = colNames.stream().map(colName -> SchemaUtil.normalizeIdentifier(colName)).collect(Collectors.toList());
+                colNames = colNames.stream().map(SchemaUtil::normalizeIdentifier).collect(Collectors.toList());
             }
             String upsertSql = QueryUtil.constructUpsertStatement(options.getTableName(), colNames, null);
             this.statement = this.conn.prepareStatement(upsertSql);
+            this.batchSize = Long.valueOf(overridingProps.getProperty(UPSERT_BATCH_SIZE,
+                    String.valueOf(DEFAULT_UPSERT_BATCH_SIZE)));
         } catch (SQLException e) {
             throw new RuntimeException(e);
         }
     }
 
+    void commitBatchUpdates() throws SQLException {
+        conn.commit();
+    }
+
     @Override
     public void write(InternalRow internalRow) throws IOException {
         try {
@@ -90,14 +103,21 @@
                 }
                 ++i;
             }
+            numRecords++;
             statement.execute();
+            if (numRecords % batchSize == 0) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("commit called on a batch of size : " + batchSize);
+                }
+                commitBatchUpdates();
+            }
         } catch (SQLException e) {
             throw new IOException("Exception while executing Phoenix prepared statement", e);
         }
     }
 
     @Override
-    public WriterCommitMessage commit() throws IOException {
+    public WriterCommitMessage commit() {
         try {
             conn.commit();
         } catch (SQLException e) {
@@ -115,6 +135,6 @@
     }
 
     @Override
-    public void abort() throws IOException {
+    public void abort() {
     }
 }
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
index f7654e3..6fce8fe 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
@@ -25,10 +25,14 @@
 
     private final PhoenixDataSourceWriteOptions options;
 
-    public PhoenixDataWriterFactory(PhoenixDataSourceWriteOptions options) {
+    PhoenixDataWriterFactory(PhoenixDataSourceWriteOptions options) {
         this.options = options;
     }
 
+    PhoenixDataSourceWriteOptions getOptions() {
+        return options;
+    }
+
     @Override
     public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
         return new PhoenixDataWriter(options);
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
index 9d713b8..31a3065 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
@@ -17,17 +17,35 @@
  */
 package org.apache.phoenix.spark.datasource.v2.writer;
 
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
 
-public class PhoenixDatasourceWriter implements DataSourceWriter {
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
+
+public class PhoenixDataSourceWriter implements DataSourceWriter {
 
     private final PhoenixDataSourceWriteOptions options;
 
-    public PhoenixDatasourceWriter(PhoenixDataSourceWriteOptions options) {
-        this.options = options;
+    public PhoenixDataSourceWriter(SaveMode mode, StructType schema, DataSourceOptions options) {
+        if (!mode.equals(SaveMode.Overwrite)) {
+            throw new RuntimeException("SaveMode other than SaveMode.OverWrite is not supported");
+        }
+        if (!options.tableName().isPresent()) {
+            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
+        }
+        if (!options.get(ZOOKEEPER_URL).isPresent()) {
+            throw new RuntimeException("No Phoenix option " + ZOOKEEPER_URL + " defined");
+        }
+        this.options = createPhoenixDataSourceWriteOptions(options, schema);
     }
 
     @Override
@@ -47,4 +65,25 @@
     @Override
     public void abort(WriterCommitMessage[] messages) {
     }
+
+    PhoenixDataSourceWriteOptions getOptions() {
+        return options;
+    }
+
+    private PhoenixDataSourceWriteOptions createPhoenixDataSourceWriteOptions(DataSourceOptions options,
+                                                                              StructType schema) {
+        String scn = options.get(CURRENT_SCN_VALUE).orElse(null);
+        String tenantId = options.get(PhoenixRuntime.TENANT_ID_ATTRIB).orElse(null);
+        String zkUrl = options.get(ZOOKEEPER_URL).get();
+        boolean skipNormalizingIdentifier = options.getBoolean(SKIP_NORMALIZING_IDENTIFIER, false);
+        return new PhoenixDataSourceWriteOptions.Builder()
+                .setTableName(options.tableName().get())
+                .setZkUrl(zkUrl)
+                .setScn(scn)
+                .setTenantId(tenantId)
+                .setSchema(schema)
+                .setSkipNormalizingIdentifier(skipNormalizingIdentifier)
+                .setOverriddenProps(extractPhoenixHBaseConfFromOptions(options))
+                .build();
+    }
 }
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
index 1d6973c..a2ec2dc 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
@@ -23,7 +23,8 @@
 
 import org.apache.phoenix.util.{DateUtil, SchemaUtil}
 import org.apache.phoenix.util.StringUtil.escapeStringConstant
-import org.apache.spark.sql.sources._
+import org.apache.spark.sql.sources.{And, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, In,
+IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains, StringEndsWith, StringStartsWith}
 
 class FilterExpressionCompiler() {
 
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
index 34033b7..89d808d 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
@@ -16,16 +16,15 @@
 import java.sql.DriverManager
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
+import org.apache.hadoop.hbase.HConstants
 import org.apache.hadoop.io.NullWritable
 import org.apache.phoenix.jdbc.PhoenixDriver
 import org.apache.phoenix.mapreduce.PhoenixInputFormat
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
 import org.apache.phoenix.query.HBaseFactoryProvider
-import org.apache.spark._
+import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
 import scala.collection.JavaConverters._
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
index 6d4c4cc..66c347e 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
@@ -15,7 +15,7 @@
 
 import java.sql.{PreparedStatement, ResultSet}
 import org.apache.hadoop.mapreduce.lib.db.DBWritable
-import org.apache.phoenix.schema.types._
+import org.apache.phoenix.schema.types.{PBinary, PDataType, PDate, PVarbinary, PhoenixArray}
 import org.apache.phoenix.util.ColumnInfo
 import org.joda.time.DateTime
 import scala.collection.{mutable, immutable}
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
index 2f6ea8c..aacd460 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
@@ -19,7 +19,7 @@
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.sources._
+import org.apache.spark.sql.sources.{BaseRelation, PrunedFilteredScan, Filter}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Row, SQLContext}
 
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
index f69e988..363acf8 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
@@ -18,9 +18,18 @@
 package org.apache.phoenix.spark
 
 import org.apache.phoenix.query.QueryConstants
-import org.apache.phoenix.schema.types._
+import org.apache.phoenix.schema.types.{PBinary, PBinaryArray, PBoolean, PBooleanArray, PChar,
+PCharArray, PDate, PDateArray, PDecimal, PDecimalArray, PDouble, PDoubleArray, PFloat, PFloatArray,
+PInteger, PIntegerArray, PLong, PLongArray, PSmallint, PSmallintArray, PTime, PTimeArray,
+PTimestamp, PTimestampArray, PTinyint, PTinyintArray, PUnsignedDate, PUnsignedDateArray,
+PUnsignedDouble, PUnsignedDoubleArray, PUnsignedFloat, PUnsignedFloatArray, PUnsignedInt,
+PUnsignedIntArray, PUnsignedLong, PUnsignedLongArray, PUnsignedSmallint, PUnsignedSmallintArray,
+PUnsignedTime, PUnsignedTimeArray, PUnsignedTimestamp, PUnsignedTimestampArray, PUnsignedTinyint,
+PUnsignedTinyintArray, PVarbinary, PVarbinaryArray, PVarchar, PVarcharArray}
 import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType,
+DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, StructField,
+StructType, TimestampType}
 
 object SparkSchemaUtil {
 
diff --git a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
index 01437f0..f89a451 100644
--- a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
+++ b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
@@ -19,7 +19,7 @@
 package org.apache.spark.sql.execution.datasources.jdbc
 
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{BinaryType, ByteType, DataType, StringType}
 
 private object PhoenixJdbcDialect  extends JdbcDialect {
 
diff --git a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
index eac483a..50cdbf5 100644
--- a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
+++ b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
@@ -27,10 +27,11 @@
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils._
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType,
+Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, Metadata, ShortType, StringType,
+StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.NextIterator
 
diff --git a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/PhoenixTestingDataSource.java b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/PhoenixTestingDataSource.java
new file mode 100644
index 0000000..6a2299e
--- /dev/null
+++ b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/PhoenixTestingDataSource.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2;
+
+import org.apache.phoenix.spark.datasource.v2.reader.PhoenixTestingDataSourceReader;
+import org.apache.phoenix.spark.datasource.v2.writer.PhoenixTestingDataSourceWriter;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Optional;
+
+public class PhoenixTestingDataSource extends PhoenixDataSource {
+
+    public static final String TEST_SOURCE =
+            "org.apache.phoenix.spark.datasource.v2.PhoenixTestingDataSource";
+
+    // Override to return a test DataSourceReader
+    @Override
+    public DataSourceReader createReader(DataSourceOptions options) {
+        return new PhoenixTestingDataSourceReader(options);
+    }
+
+    // Override to return a test DataSourceWriter
+    @Override
+    public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema,
+            SaveMode mode, DataSourceOptions options) {
+        return Optional.of(new PhoenixTestingDataSourceWriter(mode, schema, options));
+    }
+
+    @Override
+    public String shortName() {
+        return "phoenixTesting";
+    }
+
+}
diff --git a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingDataSourceReader.java b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingDataSourceReader.java
new file mode 100644
index 0000000..0116255
--- /dev/null
+++ b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingDataSourceReader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.reader;
+
+import org.apache.phoenix.mapreduce.PhoenixInputSplit;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+
+public class PhoenixTestingDataSourceReader extends PhoenixDataSourceReader {
+
+    public PhoenixTestingDataSourceReader(DataSourceOptions options) {
+        super(options);
+    }
+
+    // Override to return a test InputPartition
+    @Override
+    PhoenixInputPartition getInputPartition(PhoenixDataSourceReadOptions readOptions,
+            PhoenixInputSplit inputSplit) {
+        return new PhoenixTestingInputPartition(readOptions, readSchema(), inputSplit);
+    }
+}
diff --git a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartition.java b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartition.java
new file mode 100644
index 0000000..eca7fc7
--- /dev/null
+++ b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartition.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.reader;
+
+import org.apache.phoenix.mapreduce.PhoenixInputSplit;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
+
+public class PhoenixTestingInputPartition extends PhoenixInputPartition {
+
+    PhoenixTestingInputPartition(PhoenixDataSourceReadOptions options, StructType schema,
+            PhoenixInputSplit phoenixInputSplit) {
+        super(options, schema, phoenixInputSplit);
+    }
+
+    // Override to return a test InputPartitionReader for testing on the executor-side
+    @Override
+    public InputPartitionReader<InternalRow> createPartitionReader() {
+        return new PhoenixTestingInputPartitionReader(getOptions(), getSchema(),
+                getPhoenixInputSplit());
+    }
+}
diff --git a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartitionReader.java b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartitionReader.java
new file mode 100644
index 0000000..c85dc44
--- /dev/null
+++ b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartitionReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.reader;
+
+import org.apache.phoenix.mapreduce.PhoenixInputSplit;
+import org.apache.spark.SerializableWritable;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Properties;
+
+public class PhoenixTestingInputPartitionReader extends PhoenixInputPartitionReader {
+
+    // A test property which is used to modify the current row returned by the test input
+    // partition reader in order to check properties passed from the driver to executors
+    public static final String RETURN_NULL_CURR_ROW = "return.null.curr.row";
+
+    PhoenixTestingInputPartitionReader(PhoenixDataSourceReadOptions options, StructType schema,
+            SerializableWritable<PhoenixInputSplit> phoenixInputSplit) {
+        super(options, schema, phoenixInputSplit);
+    }
+
+    // Override to return null rather than the actual row based on a property passed to the executor
+    @Override
+    public InternalRow get() {
+        Properties props = getOverriddenPropsFromOptions();
+        return Boolean.valueOf(props.getProperty(RETURN_NULL_CURR_ROW)) ? null : super.get();
+    }
+
+}
diff --git a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataSourceWriter.java b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataSourceWriter.java
new file mode 100644
index 0000000..cbba62b
--- /dev/null
+++ b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataSourceWriter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
+
+public class PhoenixTestingDataSourceWriter extends PhoenixDataSourceWriter {
+
+    // Used to keep track of the total number of batches committed across all executors
+    public static int TOTAL_BATCHES_COMMITTED_COUNT = 0;
+
+    public PhoenixTestingDataSourceWriter(SaveMode mode, StructType schema,
+            DataSourceOptions options) {
+        super(mode, schema, options);
+    }
+
+    // Override to return a test DataWriterFactory
+    @Override
+    public DataWriterFactory<InternalRow> createWriterFactory() {
+        return new PhoenixTestingDataWriterFactory(getOptions());
+    }
+
+    // Override to sum up the total number of batches committed across all executors
+    @Override
+    public void commit(WriterCommitMessage[] messages) {
+        for (WriterCommitMessage message : messages) {
+            TOTAL_BATCHES_COMMITTED_COUNT += Integer.parseInt(message.toString());
+        }
+    }
+}
diff --git a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriter.java b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriter.java
new file mode 100644
index 0000000..75aa447
--- /dev/null
+++ b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+
+import java.sql.SQLException;
+
+public class PhoenixTestingDataWriter extends PhoenixDataWriter {
+
+    private long numBatchesCommitted = 0;
+
+    PhoenixTestingDataWriter(PhoenixDataSourceWriteOptions options) {
+        super(options);
+    }
+
+    // Override to also count the number of times we call this method to test upsert batch commits
+    @Override
+    void commitBatchUpdates() throws SQLException {
+        super.commitBatchUpdates();
+        numBatchesCommitted++;
+    }
+
+    // Override to return a test WriterCommitMessage
+    @Override
+    public WriterCommitMessage commit() {
+        super.commit();
+        return new PhoenixTestingWriterCommitMessage(numBatchesCommitted);
+    }
+
+}
diff --git a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriterFactory.java b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriterFactory.java
new file mode 100644
index 0000000..4288128
--- /dev/null
+++ b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriterFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+
+public class PhoenixTestingDataWriterFactory extends PhoenixDataWriterFactory {
+
+    PhoenixTestingDataWriterFactory(PhoenixDataSourceWriteOptions options) {
+        super(options);
+    }
+
+    // Override to return a test DataWriter
+    @Override
+    public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
+        return new PhoenixTestingDataWriter(getOptions());
+    }
+}
diff --git a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingWriterCommitMessage.java b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingWriterCommitMessage.java
new file mode 100644
index 0000000..1e43e07
--- /dev/null
+++ b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingWriterCommitMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+
+class PhoenixTestingWriterCommitMessage implements WriterCommitMessage {
+
+    private final long numBatchesCommitted;
+
+    PhoenixTestingWriterCommitMessage(long numBatchesCommitted) {
+        this.numBatchesCommitted = numBatchesCommitted;
+    }
+
+    // Override to keep track of the number of batches committed by the corresponding DataWriter
+    // in the WriterCommitMessage, so we can observe this value in the driver when we call
+    // {@link PhoenixTestingDataSourceWriter#commit(WriterCommitMessage[])}
+    @Override
+    public String toString() {
+        return String.valueOf(this.numBatchesCommitted);
+    }
+
+}