PHOENIX-5059 Use the DataSource v2 API in the Spark connector
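
Adds a DataSource v2 implementation (PhoenixDataSource, registered under the short
name "phoenix") and moves most of the Spark integration tests from the
PhoenixRDD / saveToPhoenix path onto it. A minimal read/write sketch of the new
usage, assuming a running SparkSession `spark`; the table names and the quorum
address "localhost:2181" below are illustrative only:

    import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
    import org.apache.spark.sql.SaveMode

    // Read a Phoenix table through the v2 connector
    val df = spark.read
      .format("phoenix")
      .option("table", "TABLE1")
      .option(PhoenixDataSource.ZOOKEEPER_URL, "localhost:2181")
      .load()

    // Write it back; the v2 writer currently accepts only SaveMode.Overwrite
    df.write
      .format("phoenix")
      .option("table", "TABLE1_COPY")
      .option(PhoenixDataSource.ZOOKEEPER_URL, "localhost:2181")
      .mode(SaveMode.Overwrite)
      .save()
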
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/BaseSaltedTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/BaseSaltedTableIT.java
index 3051cd6..ef127ac 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/BaseSaltedTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/BaseSaltedTableIT.java
@@ -194,7 +194,7 @@
                 .setSelectColumns(
                         Lists.newArrayList("A_INTEGER", "A_STRING", "A_ID", "B_STRING", "B_INTEGER"))
                 .setFullTableName(tableName)
-                .setWhereClause("a_integer = 1 AND a_string >= 'ab' AND a_string < 'de' AND a_id = '123'");
+                .setWhereClause("A_INTEGER = 1 AND A_STRING >= 'ab' AND A_STRING < 'de' AND A_ID = '123'");
             rs = executeQuery(conn, queryBuilder);
             assertTrue(rs.next());
             assertEquals(1, rs.getInt(1));
@@ -205,7 +205,7 @@
             assertFalse(rs.next());
 
             // all single slots with one value.
-            queryBuilder.setWhereClause("a_integer = 1 AND a_string = 'ab' AND a_id = '123'");
+            queryBuilder.setWhereClause("A_INTEGER = 1 AND A_STRING = 'ab' AND A_ID = '123'");
             rs = executeQuery(conn, queryBuilder);
             assertTrue(rs.next());
             assertEquals(1, rs.getInt(1));
@@ -216,7 +216,7 @@
             assertFalse(rs.next());
 
             // all single slots with multiple values.
-            queryBuilder.setWhereClause("a_integer in (2, 4) AND a_string = 'abc' AND a_id = '123'");
+            queryBuilder.setWhereClause("A_INTEGER in (2, 4) AND A_STRING = 'abc' AND A_ID = '123'");
             rs = executeQuery(conn, queryBuilder);
 
             assertTrue(rs.next());
diff --git a/phoenix-spark/pom.xml b/phoenix-spark/pom.xml
index e2790bd..9cc3c3d 100644
--- a/phoenix-spark/pom.xml
+++ b/phoenix-spark/pom.xml
@@ -487,6 +487,14 @@
     <testSourceDirectory>src/it/scala</testSourceDirectory>
     <testResources><testResource><directory>src/it/resources</directory></testResource></testResources>
     <plugins>
+        <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+                <source>1.8</source>
+                <target>1.8</target>
+            </configuration>
+        </plugin>
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>build-helper-maven-plugin</artifactId>
diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java
index 83578ba..1257c43 100644
--- a/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java
+++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java
@@ -16,12 +16,13 @@
 import java.util.Properties;
 
 import org.apache.phoenix.end2end.BaseOrderByIT;
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryBuilder;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
-import org.junit.Ignore;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -31,28 +32,6 @@
 
 public class OrderByIT extends BaseOrderByIT {
 
-    @Ignore(" || operator not supported in order by Spark 1.6 ")
-    @Test
-    @Override
-    public void testDescMultiOrderByExpr() throws Exception {
-        super.testDescMultiOrderByExpr();
-    }
-
-    @Ignore("NULLS FIRST|LAST not supported in Spark 1.6")
-    @Test
-    @Override
-    public void testNullsLastWithDesc() throws Exception {
-        super.testNullsLastWithDesc();
-    }
-
-    @Ignore("NULLS FIRST|LAST not supported in Spark 1.6")
-    @Test
-    @Override
-    public void testOrderByReverseOptimizationWithNullsLast() throws Exception {
-        super.testOrderByReverseOptimizationWithNullsLast();
-    }
-
-
     @Override
     protected ResultSet executeQueryThrowsException(Connection conn, QueryBuilder queryBuilder,
                                                     String expectedPhoenixExceptionMsg, String expectedSparkExceptionMsg) {
@@ -128,25 +107,20 @@
 
             // create two PhoenixRDDs  using the table names and columns that are required for the JOIN query
             List<String> table1Columns = Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D");
-            SQLContext sqlContext = SparkUtil.getSqlContext();
-            DataFrame phoenixDataSet =
-                    new PhoenixRDD(SparkUtil.getSparkContext(), tableName1,
-                            JavaConverters.asScalaBufferConverter(table1Columns).asScala().toSeq(),
-                            Option.apply((String) null), Option.apply(getUrl()), config, false,
-                            null).toDataFrame(sqlContext);
-            phoenixDataSet.registerTempTable(tableName1);
-            List<String> table2Columns = Lists.newArrayList("A_STRING", "COL1");
-            phoenixDataSet =
-                    new PhoenixRDD(SparkUtil.getSparkContext(), tableName2,
-                            JavaConverters.asScalaBufferConverter(table2Columns).asScala().toSeq(),
-                            Option.apply((String) null), Option.apply(getUrl()), config, false,
-                            null).toDataFrame(sqlContext);
-            phoenixDataSet.registerTempTable(tableName2);
+            SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
+            Dataset phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+                    .option(DataSourceOptions.TABLE_KEY, tableName1)
+                    .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+            phoenixDataSet.createOrReplaceTempView(tableName1);
+            phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+                    .option(DataSourceOptions.TABLE_KEY, tableName2)
+                    .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+            phoenixDataSet.createOrReplaceTempView(tableName2);
 
             String query =
                     "SELECT T1.* FROM " + tableName1 + " T1 JOIN " + tableName2
                             + " T2 ON T1.A_STRING = T2.A_STRING ORDER BY T1.`CF1.B`";
-            DataFrame dataset =
+            Dataset<Row> dataset =
                     sqlContext.sql(query);
             List<Row> rows = dataset.collectAsList();
             ResultSet rs = new SparkResultSet(rows, dataset.columns());
@@ -175,8 +149,8 @@
             assertFalse(rs.next());
 
             query =
-                    "select t1.a_string, t2.col1 from " + tableName1 + " t1 join " + tableName2
-                            + " t2 on t1.a_string = t2.a_string order by t2.col1";
+                    "SELECT T1.A_STRING, T2.COL1 FROM " + tableName1 + " T1 JOIN " + tableName2
+                            + " T2 ON T1.A_STRING = T2.A_STRING ORDER BY T2.COL1";
             dataset =  sqlContext.sql(query);
             rows = dataset.collectAsList();
             rs = new SparkResultSet(rows, dataset.columns());
@@ -193,7 +167,6 @@
         }
     }
 
-    @Ignore("Not passing on CDH 4.15")
     @Test
     public void testOrderByWithUnionAll() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -249,26 +222,20 @@
             conn.commit();
 
 
-            List<String> table1Columns = Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D");
-            SQLContext sqlContext = SparkUtil.getSqlContext();
-            DataFrame phoenixDataSet =
-                    new PhoenixRDD(SparkUtil.getSparkContext(), tableName1,
-                            JavaConverters.asScalaBufferConverter(table1Columns).asScala().toSeq(),
-                            Option.apply((String) null), Option.apply(getUrl()), config, false,
-                            null).toDataFrame(sqlContext);
-            phoenixDataSet.registerTempTable(tableName1);
-            List<String> table2Columns = Lists.newArrayList("A_STRING", "COL1");
-            phoenixDataSet =
-                    new PhoenixRDD(SparkUtil.getSparkContext(), tableName2,
-                            JavaConverters.asScalaBufferConverter(table2Columns).asScala().toSeq(),
-                            Option.apply((String) null), Option.apply(getUrl()), config, false,
-                            null).toDataFrame(sqlContext);
-            phoenixDataSet.registerTempTable(tableName2);
+            SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
+            Dataset phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+                    .option(DataSourceOptions.TABLE_KEY, tableName1)
+                    .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+            phoenixDataSet.createOrReplaceTempView(tableName1);
+            phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+                    .option(DataSourceOptions.TABLE_KEY, tableName2)
+                    .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+            phoenixDataSet.createOrReplaceTempView(tableName2);
 
             String query =
                     "select a_string, `cf2.d` from " + tableName1 + " union all select * from "
                             + tableName2 + " order by `cf2.d`";
-            DataFrame dataset =
+            Dataset<Row> dataset =
                     sqlContext.sql(query);
             List<Row> rows = dataset.collectAsList();
             ResultSet rs = new SparkResultSet(rows, dataset.columns());
@@ -330,15 +297,12 @@
             stmt.execute();
             conn.commit();
 
-            SQLContext sqlContext = SparkUtil.getSqlContext();
-            DataFrame phoenixDataSet =
-                    new PhoenixRDD(SparkUtil.getSparkContext(), tableName,
-                            JavaConverters.asScalaBufferConverter(Lists.newArrayList("col1", "col2", "col4")).asScala().toSeq(),
-                            Option.apply((String) null), Option.apply(getUrl()), config, false,
-                            null).toDataFrame(sqlContext);
-
-            phoenixDataSet.registerTempTable(tableName);
-            DataFrame dataset =
+            SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
+            Dataset phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+                    .option(DataSourceOptions.TABLE_KEY, tableName)
+                    .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+            phoenixDataSet.createOrReplaceTempView(tableName);
+            Dataset<Row> dataset =
                     sqlContext.sql("SELECT col1+col2, col4, a_string FROM " + tableName
                             + " ORDER BY col1+col2, col4");
             List<Row> rows = dataset.collectAsList();
@@ -395,19 +359,12 @@
             conn.commit();
 
 
-            List<String> columns =
-                    Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D",
-                        "COL2");
-
-            SQLContext sqlContext = SparkUtil.getSqlContext();
-            DataFrame phoenixDataSet =
-                    new PhoenixRDD(SparkUtil.getSparkContext(), tableName,
-                            JavaConverters.asScalaBufferConverter(columns).asScala().toSeq(),
-                            Option.apply((String) null), Option.apply(url), config, false, null)
-                                    .toDataFrame(sqlContext);
-
-            phoenixDataSet.registerTempTable(tableName);
-            DataFrame dataset =
+            SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
+            Dataset phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+                    .option(DataSourceOptions.TABLE_KEY, tableName)
+                    .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+            phoenixDataSet.createOrReplaceTempView(tableName);
+            Dataset<Row> dataset =
                     sqlContext.sql("SELECT A_STRING, `CF1.A`, `CF1.B`, COL1, `CF2.C`, `CF2.D`, COL2 from "
                             + tableName + " ORDER BY `CF1.A`,`CF2.C`");
             List<Row> rows = dataset.collectAsList();
diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
index db2fe1a..668c3c8 100644
--- a/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
+++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
@@ -22,13 +22,14 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
 import org.apache.phoenix.util.QueryBuilder;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import scala.Option;
 import scala.collection.JavaConverters;
 
@@ -42,33 +43,15 @@
     public static final String APP_NAME = "Java Spark Tests";
     public static final String NUM_EXECUTORS = "local[2]";
     public static final String UI_SHOW_CONSOLE_PROGRESS = "spark.ui.showConsoleProgress";
-    public static final String CASE_SENSITIVE_COLUMNS = "spark.sql.caseSensitive";
 
-    private static SparkContext sparkContext = null;
-    private static SQLContext sqlContext = null;
-
-    public static SparkContext getSparkContext() {
-        if (sparkContext == null) {
-            SparkConf conf = new SparkConf(true);
-            conf.setAppName(APP_NAME);
-            conf.setMaster(NUM_EXECUTORS);
-            conf.set(UI_SHOW_CONSOLE_PROGRESS, "false");
-            conf.set(CASE_SENSITIVE_COLUMNS, "false");
-            sparkContext = new SparkContext(conf);
-        }
-        return sparkContext;
-    }
-
-    public static SQLContext getSqlContext() {
-        if (sqlContext == null) {
-            sqlContext = new SQLContext(getSparkContext());
-        }
-        return sqlContext;
+    public static SparkSession getSparkSession() {
+        return SparkSession.builder().appName(APP_NAME).master(NUM_EXECUTORS)
+                .config(UI_SHOW_CONSOLE_PROGRESS, false).getOrCreate();
     }
 
     public static ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder, String url, Configuration config)
             throws SQLException {
-        SQLContext sqlContext = SparkUtil.getSqlContext();
+        SQLContext sqlContext = getSparkSession().sqlContext();
 
         boolean forceRowKeyOrder =
                 conn.unwrap(PhoenixConnection.class).getQueryServices().getProps()
@@ -82,14 +65,12 @@
 
         // create PhoenixRDD using the table name and columns that are required by the query
         // since we don't set the predicate filtering is done after rows are returned from spark
-        DataFrame phoenixDataSet =
-                new PhoenixRDD(SparkUtil.getSparkContext(), queryBuilder.getFullTableName(),
-                        JavaConverters.asScalaBufferConverter(queryBuilder.getRequiredColumns()).asScala().toSeq(),
-                        Option.apply((String) null), Option.apply(url), config, false,
-                        null).toDataFrame(sqlContext);
+        Dataset phoenixDataSet = getSparkSession().read().format("phoenix")
+                .option(DataSourceOptions.TABLE_KEY, queryBuilder.getFullTableName())
+                .option(PhoenixDataSource.ZOOKEEPER_URL, url).load();
 
-        phoenixDataSet.registerTempTable(queryBuilder.getFullTableName());
-        DataFrame dataset = sqlContext.sql(queryBuilder.build());
+        phoenixDataSet.createOrReplaceTempView(queryBuilder.getFullTableName());
+        Dataset<Row> dataset = sqlContext.sql(queryBuilder.build());
         SparkPlan plan = dataset.queryExecution().executedPlan();
         List<Row> rows = dataset.collectAsList();
         queryBuilder.setOrderByClause(prevOrderBy);
diff --git a/phoenix-spark/src/it/resources/globalSetup.sql b/phoenix-spark/src/it/resources/globalSetup.sql
index 7ac0039..efdb8cb 100644
--- a/phoenix-spark/src/it/resources/globalSetup.sql
+++ b/phoenix-spark/src/it/resources/globalSetup.sql
@@ -26,9 +26,9 @@
 UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (4, 2, 'test_child_2')
 UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (5, 2, 'test_child_3')
 UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (6, 2, 'test_child_4')
-CREATE TABLE "table3" ("id" BIGINT NOT NULL PRIMARY KEY, "col1" VARCHAR)
-UPSERT INTO "table3" ("id", "col1") VALUES (1, 'foo')
-UPSERT INTO "table3" ("id", "col1") VALUES (2, 'bar')
+CREATE TABLE "table4" ("id" BIGINT NOT NULL PRIMARY KEY, "col1" VARCHAR)
+UPSERT INTO "table4" ("id", "col1") VALUES (1, 'foo')
+UPSERT INTO "table4" ("id", "col1") VALUES (2, 'bar')
 CREATE TABLE ARRAY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])
 UPSERT INTO ARRAY_TEST_TABLE (ID, VCARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'])
 CREATE TABLE ARRAYBUFFER_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[], INTARRAY INTEGER[])
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
index ca3470f..a9c2070 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
@@ -19,6 +19,7 @@
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT
 import org.apache.phoenix.query.BaseTest
 import org.apache.phoenix.util.PhoenixRuntime
+import org.apache.spark.sql.{SQLContext, SparkSession}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite, Matchers}
 
@@ -50,7 +51,7 @@
   final val TenantId = "theTenant"
 
   var conn: Connection = _
-  var sc: SparkContext = _
+  var spark: SparkSession = _
 
   lazy val hbaseConfiguration = {
     val conf = PhoenixSparkITHelper.getTestClusterConfig
@@ -99,12 +100,17 @@
       .setMaster("local[2]") // 2 threads, some parallelism
       .set("spark.ui.showConsoleProgress", "false") // Disable printing stage progress
 
-    sc = new SparkContext(conf)
+    spark = SparkSession
+      .builder()
+      .appName("PhoenixSparkIT")
+      .master("local[2]") // 2 threads, some parallelism
+      .config("spark.ui.showConsoleProgress", "false")
+      .getOrCreate()
   }
 
   override def afterAll() {
     conn.close()
-    sc.stop()
+    spark.stop()
     PhoenixSparkITHelper.cleanUpAfterTest()
     PhoenixSparkITHelper.doTeardown
   }
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index fb4bb64..6bef721 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -13,16 +13,16 @@
  */
 package org.apache.phoenix.spark
 
+import java.sql.DriverManager
 import java.util.Date
 
 import org.apache.phoenix.schema.types.PVarchar
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
 import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Row, SQLContext, SaveMode}
-import org.joda.time.DateTime
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{Row, SaveMode}
+
 import scala.collection.mutable.ListBuffer
-import org.apache.hadoop.conf.Configuration
 
 /**
   * Note: If running directly from an IDE, these are the recommended VM parameters:
@@ -30,13 +30,20 @@
   */
 class PhoenixSparkIT extends AbstractPhoenixSparkIT {
 
-  test("Can persist data with case senstive columns (like in avro schema) using 'DataFrame.saveToPhoenix'") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.createDataFrame(
+  test("Can persist data with case sensitive columns (like in avro schema)") {
+    val df = spark.createDataFrame(
       Seq(
         (1, 1, "test_child_1"),
-        (2, 1, "test_child_2"))).toDF("ID", "TABLE3_ID", "t2col1")
-    df.saveToPhoenix("TABLE3", zkUrl = Some(quorumAddress),skipNormalizingIdentifier=true)
+        (2, 1, "test_child_2"))).
+    // column names are case sensitive
+      toDF("ID", "TABLE3_ID", "t2col1")
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "TABLE3",
+        PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress, PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER -> "true"))
+      .mode(SaveMode.Overwrite)
+      .save()
+
 
     // Verify results
     val stmt = conn.createStatement()
@@ -50,37 +57,55 @@
     stmt.close()
 
     results.toList shouldEqual checkResults
-
   }
-  
+
+  // INSERT is not supported using the DataSource v2 API yet
+  ignore("Can use write data using spark SQL INSERT") {
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE3", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+    df1.createOrReplaceTempView("TABLE3")
+
+    // Insert data
+    spark.sql("INSERT INTO TABLE3 VALUES(10, 10, 10)")
+    spark.sql("INSERT INTO TABLE3 VALUES(20, 20, 20)")
+
+    // Verify results
+    val stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT * FROM TABLE3 WHERE ID>=10")
+    val expectedResults = List((10, 10, "10"), (20, 20, "20"))
+    val results = ListBuffer[(Long, Long, String)]()
+    while (rs.next()) {
+      results.append((rs.getLong(1), rs.getLong(2), rs.getString(3)))
+    }
+    stmt.close()
+
+    results.toList shouldEqual expectedResults
+  }
+
   test("Can convert Phoenix schema") {
     val phoenixSchema = List(
       new ColumnInfo("varcharColumn", PVarchar.INSTANCE.getSqlType)
     )
 
-    val rdd = new PhoenixRDD(sc, "MyTable", Array("Foo", "Bar"),
-      conf = hbaseConfiguration)
+    val catalystSchema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema)
 
-    val catalystSchema = rdd.phoenixSchemaToCatalystSchema(phoenixSchema)
-
-    val expected = List(StructField("varcharColumn", StringType, nullable = true))
+    val expected = new StructType(List(StructField("varcharColumn", StringType, nullable = true)).toArray)
 
     catalystSchema shouldEqual expected
   }
 
   test("Can create schema RDD and execute query") {
-    val sqlContext = new SQLContext(sc)
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
 
-    val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration)
+    df1.createOrReplaceTempView("sql_table_1")
 
-    df1.registerTempTable("sql_table_1")
+    val df2 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE2", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
 
-    val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"),
-      conf = hbaseConfiguration)
+    df2.createOrReplaceTempView("sql_table_2")
 
-    df2.registerTempTable("sql_table_2")
-
-    val sqlRdd = sqlContext.sql(
+    val sqlRdd = spark.sql(
       """
         |SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
         |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin
@@ -91,18 +116,49 @@
     count shouldEqual 6L
   }
 
+  ignore("Ordering by pk columns should not require sorting") {
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+    df1.createOrReplaceTempView("TABLE1")
+
+    val sqlRdd = spark.sql("SELECT * FROM TABLE1 ORDER BY ID, COL1")
+    val plan = sqlRdd.queryExecution.sparkPlan
+    // verify the spark plan doesn't have a sort
+    assert(!plan.toString.contains("Sort"))
+
+    val expectedResults = Array(Row.fromSeq(Seq(1, "test_row_1")), Row.fromSeq(Seq(2, "test_row_2")))
+    val actual = sqlRdd.collect()
+
+    actual shouldEqual expectedResults
+  }
+
+  test("Verify correct number of partitions are created") {
+    val conn = DriverManager.getConnection(PhoenixSparkITHelper.getUrl)
+    val ddl = "CREATE TABLE SPLIT_TABLE (id VARCHAR NOT NULL PRIMARY KEY, val VARCHAR) split on ('e','j','o')"
+    conn.createStatement.execute(ddl)
+    val keys = Array("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s",
+      "t", "u", "v", "w", "x", "y", "z")
+    for (key <- keys) {
+      conn.createStatement.execute("UPSERT INTO SPLIT_TABLE VALUES('" + key + "', '" + key + "')")
+    }
+    conn.commit()
+
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "SPLIT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+    df1.createOrReplaceTempView("SPLIT_TABLE")
+    val sqlRdd = spark.sql("SELECT * FROM SPLIT_TABLE")
+    val numPartitions = sqlRdd.rdd.partitions.size
+
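+    // The table is split on 3 keys ('e','j','o'), giving 4 regions and hence 4 partitions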
+    numPartitions shouldEqual 4
+  }
+
   test("Can create schema RDD and execute query on case sensitive table (no config)") {
-    val sqlContext = new SQLContext(sc)
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> SchemaUtil.getEscapedArgument("table4"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
 
+    df1.createOrReplaceTempView("table4")
 
-    val df1 = sqlContext.phoenixTableAsDataFrame(
-      SchemaUtil.getEscapedArgument("table3"),
-      Array("id", "col1"),
-      zkUrl = Some(quorumAddress))
-
-    df1.registerTempTable("table3")
-
-    val sqlRdd = sqlContext.sql("SELECT * FROM table3")
+    val sqlRdd = spark.sql("SELECT id FROM table4")
 
     val count = sqlRdd.count()
 
@@ -110,20 +166,17 @@
   }
 
   test("Can create schema RDD and execute constrained query") {
-    val sqlContext = new SQLContext(sc)
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
 
-    val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"),
-      conf = hbaseConfiguration)
+    df1.createOrReplaceTempView("sql_table_1")
 
-    df1.registerTempTable("sql_table_1")
+    val df2 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE2", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load.filter("ID = 1")
 
-    val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"),
-      predicate = Some("\"ID\" = 1"),
-      conf = hbaseConfiguration)
+    df2.createOrReplaceTempView("sql_table_2")
 
-    df2.registerTempTable("sql_table_2")
-
-    val sqlRdd = sqlContext.sql(
+    val sqlRdd = spark.sql(
       """
         |SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
         |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin
@@ -135,17 +188,12 @@
   }
 
   test("Can create schema RDD with predicate that will never match") {
-    val sqlContext = new SQLContext(sc)
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load.filter("ID = -1")
 
-    val df1 = sqlContext.phoenixTableAsDataFrame(
-      SchemaUtil.getEscapedArgument("table3"),
-      Array("id", "col1"),
-      predicate = Some("\"id\" = -1"),
-      conf = hbaseConfiguration)
+    df1.createOrReplaceTempView("table3")
 
-    df1.registerTempTable("table3")
-
-    val sqlRdd = sqlContext.sql("SELECT * FROM table3")
+    val sqlRdd = spark.sql("SELECT * FROM table3")
 
     val count = sqlRdd.count()
 
@@ -153,21 +201,17 @@
   }
 
   test("Can create schema RDD with complex predicate") {
-    val sqlContext = new SQLContext(sc)
+    val predicate = "ID > 0 AND TIMESERIES_KEY BETWEEN " +
+      "CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND " +
+      "CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)"
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options(Map("table" -> "DATE_PREDICATE_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .load
+      .filter(predicate)
 
-    val df1 = sqlContext.phoenixTableAsDataFrame(
-      "DATE_PREDICATE_TEST_TABLE",
-      Array("ID", "TIMESERIES_KEY"),
-      predicate = Some(
-        """
-          |ID > 0 AND TIMESERIES_KEY BETWEEN
-          |CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND
-          |CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)""".stripMargin),
-      conf = hbaseConfiguration)
+    df1.createOrReplaceTempView("date_predicate_test_table")
 
-    df1.registerTempTable("date_predicate_test_table")
-
-    val sqlRdd = df1.sqlContext.sql("SELECT * FROM date_predicate_test_table")
+    val sqlRdd = spark.sqlContext.sql("SELECT * FROM date_predicate_test_table")
 
     val count = sqlRdd.count()
 
@@ -175,14 +219,12 @@
   }
 
   test("Can query an array table") {
-    val sqlContext = new SQLContext(sc)
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "ARRAY_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
 
-    val df1 = sqlContext.phoenixTableAsDataFrame("ARRAY_TEST_TABLE", Array("ID", "VCARRAY"),
-      conf = hbaseConfiguration)
+    df1.createOrReplaceTempView("ARRAY_TEST_TABLE")
 
-    df1.registerTempTable("ARRAY_TEST_TABLE")
-
-    val sqlRdd = sqlContext.sql("SELECT * FROM ARRAY_TEST_TABLE")
+    val sqlRdd = spark.sql("SELECT * FROM ARRAY_TEST_TABLE")
 
     val count = sqlRdd.count()
 
@@ -195,12 +237,12 @@
   }
 
   test("Can read a table as an RDD") {
-    val rdd1 = sc.phoenixTableAsRDD("ARRAY_TEST_TABLE", Seq("ID", "VCARRAY"),
-      conf = hbaseConfiguration)
+    val rdd1 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "ARRAY_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
 
     val count = rdd1.count()
 
-    val arrayValues = rdd1.take(1)(0)("VCARRAY")
+    val arrayValues = rdd1.take(1)(0)(1)
 
     arrayValues should equal(Array("String1", "String2", "String3"))
 
@@ -208,24 +250,30 @@
   }
 
   test("Can save to phoenix table") {
-    val sqlContext = new SQLContext(sc)
+    val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
 
-    val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
+    val schema = StructType(
+      Seq(StructField("ID", LongType, nullable = false),
+        StructField("COL1", StringType),
+        StructField("COL2", IntegerType)))
 
-    sc
-      .parallelize(dataSet)
-      .saveToPhoenix(
-        "OUTPUT_TEST_TABLE",
-        Seq("ID", "COL1", "COL2"),
-        hbaseConfiguration
-      )
+    val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+    // Apply the schema to the RDD.
+    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "OUTPUT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .mode(SaveMode.Overwrite)
+      .save()
 
     // Load the results back
     val stmt = conn.createStatement()
     val rs = stmt.executeQuery("SELECT ID, COL1, COL2 FROM OUTPUT_TEST_TABLE")
-    val results = ListBuffer[(Long, String, Int)]()
+    val results = ListBuffer[Row]()
     while (rs.next()) {
-      results.append((rs.getLong(1), rs.getString(2), rs.getInt(3)))
+      results.append(Row(rs.getLong(1), rs.getString(2), rs.getInt(3)))
     }
 
     // Verify they match
@@ -234,18 +282,29 @@
     }
   }
 
-  test("Can save Java and Joda dates to Phoenix (no config)") {
-    val dt = new DateTime()
-    val date = new Date()
+  test("Can save dates to Phoenix using java.sql.Date") {
+    val date = java.sql.Date.valueOf("2016-09-30")
 
-    val dataSet = List((1L, "1", 1, dt), (2L, "2", 2, date))
-    sc
-      .parallelize(dataSet)
-      .saveToPhoenix(
-        "OUTPUT_TEST_TABLE",
-        Seq("ID", "COL1", "COL2", "COL3"),
-        zkUrl = Some(quorumAddress)
-      )
+    // Since we are creating a Row we have to use java.sql.Date;
+    // java.util.Date and org.joda.time.DateTime are not supported
+    val dataSet = Seq(Row(1L, "1", 1, date), Row(2L, "2", 2, date))
+
+    val schema = StructType(
+      Seq(StructField("ID", LongType, nullable = false),
+        StructField("COL1", StringType),
+        StructField("COL2", IntegerType),
+        StructField("COL3", DateType)))
+
+    val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+    // Apply the schema to the RDD.
+    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "OUTPUT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .mode(SaveMode.Overwrite)
+      .save()
 
     // Load the results back
     val stmt = conn.createStatement()
@@ -256,94 +315,56 @@
     }
 
     // Verify the epochs are equal
-    results(0).getTime shouldEqual dt.getMillis
+    results(0).getTime shouldEqual date.getTime
     results(1).getTime shouldEqual date.getTime
   }
 
   test("Can infer schema without defining columns") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.phoenixTableAsDataFrame("TABLE2", Seq(), conf = hbaseConfiguration)
+    val df = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE2", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load()
     df.schema("ID").dataType shouldEqual LongType
     df.schema("TABLE1_ID").dataType shouldEqual LongType
     df.schema("t2col1").dataType shouldEqual StringType
   }
 
   test("Spark SQL can use Phoenix as a data source with no schema specified") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
-      "zkUrl" -> quorumAddress))
+    val df = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
     df.count() shouldEqual 2
     df.schema("ID").dataType shouldEqual LongType
     df.schema("COL1").dataType shouldEqual StringType
   }
 
-  ignore("Spark SQL can use Phoenix as a data source with PrunedFilteredScan") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
-      "zkUrl" -> quorumAddress))
+  test("Datasource v2 pushes down filters") {
+    val df = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
     val res = df.filter(df("COL1") === "test_row_1" && df("ID") === 1L).select(df("ID"))
 
     // Make sure we got the right value back
     assert(res.first().getLong(0) == 1L)
 
     val plan = res.queryExecution.sparkPlan
-    // filters should be pushed into phoenix relation
-    assert(plan.toString.contains("PushedFilters: [*IsNotNull(COL1), *IsNotNull(ID), " +
-      "*EqualTo(COL1,test_row_1), *EqualTo(ID,1)]"))
-    // spark should run the filters on the rows returned by Phoenix
-    assert(!plan.toString.matches(".*Filter (((isnotnull(COL1.*) && isnotnull(ID.*)) "
-      + " && (COL1.* = test_row_1)) && (ID.* = 1)).*"))
+    // filters should be pushed into scan
+    assert(".*ScanV2 phoenix.*Filters.*ID.*COL1.*".r.findFirstIn(plan.toString).isDefined)
+    // spark should not do post scan filtering
+    assert(".*Filter .*ID.*COL1.*".r.findFirstIn(plan.toString).isEmpty)
   }
 
-  test("Can persist a dataframe using 'DataFrame.saveToPhoenix'") {
+  test("Can persist a dataframe") {
     // Load from TABLE1
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
-      "zkUrl" -> quorumAddress))
+    val df = spark.sqlContext.read.format("phoenix").options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
 
     // Save to TABLE1_COPY
-    df.saveToPhoenix("TABLE1_COPY", zkUrl = Some(quorumAddress))
-
-    // Verify results
-    val stmt = conn.createStatement()
-    val rs = stmt.executeQuery("SELECT * FROM TABLE1_COPY")
-
-    val checkResults = List((1L, "test_row_1"), (2, "test_row_2"))
-    val results = ListBuffer[(Long, String)]()
-    while (rs.next()) {
-      results.append((rs.getLong(1), rs.getString(2)))
-    }
-    stmt.close()
-
-    results.toList shouldEqual checkResults
-  }
-
-  test("Can persist a dataframe using 'DataFrame.save()") {
-    // Clear TABLE1_COPY
-    var stmt = conn.createStatement()
-    stmt.executeUpdate("DELETE FROM TABLE1_COPY")
-    stmt.close()
-
-    // Load TABLE1, save as TABLE1_COPY
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext
-      .read
-      .format("org.apache.phoenix.spark")
-      .option("table", "TABLE1")
-      .option("zkUrl", quorumAddress)
-      .load()
-
-    // Save to TABLE21_COPY
     df
       .write
-      .format("org.apache.phoenix.spark")
+      .format("phoenix")
       .mode(SaveMode.Overwrite)
       .option("table", "TABLE1_COPY")
-      .option("zkUrl", quorumAddress)
+      .option(PhoenixDataSource.ZOOKEEPER_URL, quorumAddress)
       .save()
 
     // Verify results
-    stmt = conn.createStatement()
+    val stmt = conn.createStatement()
     val rs = stmt.executeQuery("SELECT * FROM TABLE1_COPY")
 
     val checkResults = List((1L, "test_row_1"), (2, "test_row_2"))
@@ -357,15 +378,22 @@
   }
 
   test("Can save arrays back to phoenix") {
-    val dataSet = List((2L, Array("String1", "String2", "String3")))
+    val dataSet = List(Row(2L, Array("String1", "String2", "String3")))
+    val schema = StructType(Seq(
+      StructField("ID", LongType, nullable = false),
+      StructField("VCARRAY", ArrayType(StringType, true))
+    ))
 
-    sc
-      .parallelize(dataSet)
-      .saveToPhoenix(
-        "ARRAY_TEST_TABLE",
-        Seq("ID", "VCARRAY"),
-        zkUrl = Some(quorumAddress)
-      )
+    val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+    // Apply the schema to the RDD.
+    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "ARRAY_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .mode(SaveMode.Overwrite)
+      .save()
 
     // Load the results back
     val stmt = conn.createStatement()
@@ -374,57 +402,54 @@
     val sqlArray = rs.getArray(1).getArray().asInstanceOf[Array[String]]
 
     // Verify the arrays are equal
-    sqlArray shouldEqual dataSet(0)._2
+    sqlArray shouldEqual dataSet(0).get(1)
   }
 
   test("Can read from table with schema and escaped table name") {
     // Manually escape
-    val rdd1 = sc.phoenixTableAsRDD(
-      "CUSTOM_ENTITY.\"z02\"",
-      Seq("ID"),
-      conf = hbaseConfiguration)
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options(Map("table" -> "CUSTOM_ENTITY.\"z02\"", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load()
 
-    var count = rdd1.count()
+    var count = df1.count()
 
     count shouldEqual 1L
 
     // Use SchemaUtil
-    val rdd2 = sc.phoenixTableAsRDD(
-      SchemaUtil.getEscapedFullTableName("CUSTOM_ENTITY.z02"),
-      Seq("ID"),
-      conf = hbaseConfiguration)
+    val df2 = spark.sqlContext.read.format("phoenix")
+      .options(
+        Map("table" -> SchemaUtil.getEscapedFullTableName("CUSTOM_ENTITY.z02"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .load()
 
-    count = rdd2.count()
+    count = df2.count()
 
     count shouldEqual 1L
-
   }
 
   test("Ensure DataFrame field normalization (PHOENIX-2196)") {
-    val rdd1 = sc
+    val rdd1 = spark.sparkContext
       .parallelize(Seq((1L, 1L, "One"), (2L, 2L, "Two")))
       .map(p => Row(p._1, p._2, p._3))
 
-    val sqlContext = new SQLContext(sc)
-
     val schema = StructType(Seq(
       StructField("id", LongType, nullable = false),
       StructField("table1_id", LongType, nullable = true),
       StructField("\"t2col1\"", StringType, nullable = true)
     ))
 
-    val df = sqlContext.createDataFrame(rdd1, schema)
+    val df = spark.sqlContext.createDataFrame(rdd1, schema)
 
-    df.saveToPhoenix("TABLE2", zkUrl = Some(quorumAddress))
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "TABLE2", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .mode(SaveMode.Overwrite)
+      .save()
   }
 
   test("Ensure Dataframe supports LIKE and IN filters (PHOENIX-2328)") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
-      "zkUrl" -> quorumAddress))
-
+    val df = spark.sqlContext.read.format("phoenix").options(Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load()
     // Prefix match
     val res1 = df.filter("COL1 like 'test_row_%'")
+    val plan = res1.groupBy().count().queryExecution.sparkPlan
     res1.count() shouldEqual 2
 
     // Suffix match
@@ -463,14 +488,14 @@
   }
 
   test("Can load decimal types with accurate precision and scale (PHOENIX-2288)") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TEST_DECIMAL", "zkUrl" -> quorumAddress))
+    val df = spark.sqlContext.read.format("phoenix")
+      .options(Map("table" -> "TEST_DECIMAL", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load()
     assert(df.select("COL1").first().getDecimal(0) == BigDecimal("123.456789").bigDecimal)
   }
 
-  test("Can load small and tiny integeger types (PHOENIX-2426)") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TEST_SMALL_TINY", "zkUrl" -> quorumAddress))
+  test("Can load small and tiny integer types (PHOENIX-2426)") {
+    val df = spark.sqlContext.read.format("phoenix")
+      .options(Map("table" -> "TEST_SMALL_TINY", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load()
     assert(df.select("COL1").first().getShort(0).toInt == 32767)
     assert(df.select("COL2").first().getByte(0).toInt == 127)
   }
@@ -478,21 +503,19 @@
   test("Can save arrays from custom dataframes back to phoenix") {
     val dataSet = List(Row(2L, Array("String1", "String2", "String3"), Array(1, 2, 3)))
 
-    val sqlContext = new SQLContext(sc)
-
     val schema = StructType(
       Seq(StructField("ID", LongType, nullable = false),
         StructField("VCARRAY", ArrayType(StringType)),
         StructField("INTARRAY", ArrayType(IntegerType))))
 
-    val rowRDD = sc.parallelize(dataSet)
+    val rowRDD = spark.sparkContext.parallelize(dataSet)
 
     // Apply the schema to the RDD.
-    val df = sqlContext.createDataFrame(rowRDD, schema)
+    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
 
     df.write
-      .format("org.apache.phoenix.spark")
-      .options(Map("table" -> "ARRAYBUFFER_TEST_TABLE", "zkUrl" -> quorumAddress))
+      .format("phoenix")
+      .options(Map("table" -> "ARRAYBUFFER_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
       .mode(SaveMode.Overwrite)
       .save()
 
@@ -509,15 +532,23 @@
   }
 
   test("Can save arrays of AnyVal type back to phoenix") {
-    val dataSet = List((2L, Array(1, 2, 3), Array(1L, 2L, 3L)))
+    val dataSet = List(Row(2L, Array(1, 2, 3), Array(1L, 2L, 3L)))
 
-    sc
-      .parallelize(dataSet)
-      .saveToPhoenix(
-        "ARRAY_ANYVAL_TEST_TABLE",
-        Seq("ID", "INTARRAY", "BIGINTARRAY"),
-        zkUrl = Some(quorumAddress)
-      )
+    val schema = StructType(
+      Seq(StructField("ID", LongType, nullable = false),
+        StructField("INTARRAY", ArrayType(IntegerType)),
+        StructField("BIGINTARRAY", ArrayType(LongType))))
+
+    val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+    // Apply the schema to the RDD.
+    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "ARRAY_ANYVAL_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .mode(SaveMode.Overwrite)
+      .save()
 
     // Load the results back
     val stmt = conn.createStatement()
@@ -527,20 +558,27 @@
     val longArray = rs.getArray(2).getArray().asInstanceOf[Array[Long]]
 
     // Verify the arrays are equal
-    intArray shouldEqual dataSet(0)._2
-    longArray shouldEqual dataSet(0)._3
+    intArray shouldEqual dataSet(0).get(1)
+    longArray shouldEqual dataSet(0).get(2)
   }
 
   test("Can save arrays of Byte type back to phoenix") {
-    val dataSet = List((2L, Array(1.toByte, 2.toByte, 3.toByte)))
+    val dataSet = List(Row(2L, Array(1.toByte, 2.toByte, 3.toByte)))
 
-    sc
-      .parallelize(dataSet)
-      .saveToPhoenix(
-        "ARRAY_BYTE_TEST_TABLE",
-        Seq("ID", "BYTEARRAY"),
-        zkUrl = Some(quorumAddress)
-      )
+    val schema = StructType(
+      Seq(StructField("ID", LongType, nullable = false),
+        StructField("BYTEARRAY", ArrayType(ByteType))))
+
+    val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+    // Apply the schema to the RDD.
+    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "ARRAY_BYTE_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .mode(SaveMode.Overwrite)
+      .save()
 
     // Load the results back
     val stmt = conn.createStatement()
@@ -549,19 +587,28 @@
     val byteArray = rs.getArray(1).getArray().asInstanceOf[Array[Byte]]
 
     // Verify the arrays are equal
-    byteArray shouldEqual dataSet(0)._2
+    byteArray shouldEqual dataSet(0).get(1)
   }
 
   test("Can save binary types back to phoenix") {
-    val dataSet = List((2L, Array[Byte](1), Array[Byte](1, 2, 3), Array[Array[Byte]](Array[Byte](1), Array[Byte](2))))
+    val dataSet = List(Row(2L, Array[Byte](1), Array[Byte](1, 2, 3), Array[Array[Byte]](Array[Byte](1), Array[Byte](2))))
 
-    sc
-      .parallelize(dataSet)
-      .saveToPhoenix(
-        "VARBINARY_TEST_TABLE",
-        Seq("ID", "BIN", "VARBIN", "BINARRAY"),
-        zkUrl = Some(quorumAddress)
-      )
+    val schema = StructType(
+      Seq(StructField("ID", LongType, false),
+        StructField("BIN", BinaryType),
+        StructField("VARBIN", BinaryType),
+        StructField("BINARRAY", ArrayType(BinaryType))))
+
+    val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+    // Apply the schema to the RDD.
+    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "VARBINARY_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .mode(SaveMode.Overwrite)
+      .save()
 
     // Load the results back
     val stmt = conn.createStatement()
@@ -572,16 +619,15 @@
     val varByteArray = rs.getArray("BINARRAY").getArray().asInstanceOf[Array[Array[Byte]]]
 
     // Verify the arrays are equal
-    byte shouldEqual dataSet(0)._2
-    varByte shouldEqual dataSet(0)._3
-    varByteArray shouldEqual dataSet(0)._4
+    byte shouldEqual dataSet(0).get(1)
+    varByte shouldEqual dataSet(0).get(2)
+    varByteArray shouldEqual dataSet(0).get(3)
   }
 
   test("Can load Phoenix DATE columns through DataFrame API") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.read
-      .format("org.apache.phoenix.spark")
-      .options(Map("table" -> "DATE_TEST", "zkUrl" -> quorumAddress))
+    val df = spark.sqlContext.read
+      .format("phoenix")
+      .options(Map("table" -> "DATE_TEST", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
       .load
     val dt = df.select("COL1").first().getDate(0).getTime
     val epoch = new Date().getTime
@@ -595,37 +641,37 @@
   }
 
   test("Filter operation doesn't work for column names containing a white space (PHOENIX-2547)") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("space"),
-      "zkUrl" -> quorumAddress))
+    val df = spark.sqlContext.read.format("phoenix")
+      .options(Map("table" -> SchemaUtil.getEscapedArgument("space"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .load
     val res = df.filter(df.col("first name").equalTo("xyz"))
     // Make sure we got the right value back
     assert(res.collectAsList().size() == 1L)
   }
 
   test("Spark Phoenix cannot recognize Phoenix view fields (PHOENIX-2290)") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("small"),
-      "zkUrl" -> quorumAddress))
-    df.registerTempTable("temp")
+    val df = spark.sqlContext.read.format("phoenix")
+      .options(Map("table" -> SchemaUtil.getEscapedArgument("small"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .load
+    df.createOrReplaceTempView("temp")
 
     // limitation: filter / where expressions are not allowed with "double quotes", instead of that pass it as column expressions
     // reason: if the expression contains "double quotes" then spark sql parser, ignoring evaluating .. giving to next level to handle
 
-    val res1 = sqlContext.sql("select * from temp where salary = '10000' ")
+    val res1 = spark.sql("select * from temp where salary = '10000' ")
     assert(res1.collectAsList().size() == 1L)
 
-    val res2 = sqlContext.sql("select * from temp where \"salary\" = '10000' ")
+    val res2 = spark.sql("select * from temp where \"salary\" = '10000' ")
     assert(res2.collectAsList().size() == 0L)
 
-    val res3 = sqlContext.sql("select * from temp where salary > '10000' ")
+    val res3 = spark.sql("select * from temp where salary > '10000' ")
     assert(res3.collectAsList().size() == 2L)
   }
 
   test("Queries with small case column-names return empty result-set when working with Spark Datasource Plugin (PHOENIX-2336)") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("small"),
-      "zkUrl" -> quorumAddress))
+    val df = spark.sqlContext.read.format("phoenix")
+      .options(Map("table" -> SchemaUtil.getEscapedArgument("small"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .load
 
     // limitation: filter / where expressions are not allowed with "double quotes", instead of that pass it as column expressions
     // reason: if the expression contains "double quotes" then spark sql parser, ignoring evaluating .. giving to next level to handle
@@ -644,10 +690,9 @@
   }
 
   test("Can coerce Phoenix DATE columns to TIMESTAMP through DataFrame API") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.read
-      .format("org.apache.phoenix.spark")
-      .options(Map("table" -> "DATE_TEST", "zkUrl" -> quorumAddress, "dateAsTimestamp" -> "true"))
+    val df = spark.sqlContext.read
+      .format("phoenix")
+      .options(Map("table" -> "DATE_TEST", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress, "dateAsTimestamp" -> "true"))
       .load
     val dtRes = df.select("COL1").first()
     val ts = dtRes.getTimestamp(0).getTime
@@ -657,10 +702,9 @@
   }
 
   test("Can load Phoenix Time columns through DataFrame API") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.read
-      .format("org.apache.phoenix.spark")
-      .options(Map("table" -> "TIME_TEST", "zkUrl" -> quorumAddress))
+    val df = spark.sqlContext.read
+      .format("phoenix")
+      .options(Map("table" -> "TIME_TEST", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
       .load
     val time = df.select("COL1").first().getTimestamp(0).getTime
     val epoch = new Date().getTime
@@ -668,13 +712,14 @@
   }
 
   test("can read all Phoenix data types") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "GIGANTIC_TABLE",
-      "zkUrl" -> quorumAddress))
+    val df = spark.sqlContext.read
+      .format("phoenix")
+      .options(Map("table" -> "GIGANTIC_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .load
 
     df.write
-      .format("org.apache.phoenix.spark")
-      .options(Map("table" -> "OUTPUT_GIGANTIC_TABLE", "zkUrl" -> quorumAddress))
+      .format("phoenix")
+      .options(Map("table" -> "OUTPUT_GIGANTIC_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
       .mode(SaveMode.Overwrite)
       .save()
 
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala
index 77b41af..291ea2a 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala
@@ -64,8 +64,7 @@
   /*****************/
 
   test("Can read from tenant-specific table as DataFrame") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.phoenixTableAsDataFrame(
+    val df = spark.sqlContext.phoenixTableAsDataFrame(
       TenantTable,
       Seq(OrgIdCol, TenantOnlyCol),
       zkUrl = Some(quorumAddress),
@@ -78,7 +77,7 @@
   }
 
   test("Can read from tenant-specific table as RDD") {
-    val rdd = sc.phoenixTableAsRDD(
+    val rdd = spark.sparkContext.phoenixTableAsRDD(
       TenantTable,
       Seq(OrgIdCol, TenantOnlyCol),
       zkUrl = Some(quorumAddress),
@@ -95,23 +94,23 @@
   /*****************/
 
   test("Can write a DataFrame using 'DataFrame.saveToPhoenix' to tenant-specific view") {
-    val sqlContext = new SQLContext(sc)
+    val sqlContext = spark.sqlContext
     import sqlContext.implicits._
 
-    val df = sc.parallelize(TestDataSet).toDF(OrgIdCol, TenantOnlyCol)
+    val df = spark.sparkContext.parallelize(TestDataSet).toDF(OrgIdCol, TenantOnlyCol)
     df.saveToPhoenix(TenantTable, zkUrl = Some(quorumAddress), tenantId = Some(TenantId))
 
     verifyResults
   }
 
   test("Can write a DataFrame using 'DataFrame.write' to tenant-specific view") {
-    val sqlContext = new SQLContext(sc)
+    val sqlContext = spark.sqlContext
     import sqlContext.implicits._
 
-    val df = sc.parallelize(TestDataSet).toDF(OrgIdCol, TenantOnlyCol)
+    val df = spark.sparkContext.parallelize(TestDataSet).toDF(OrgIdCol, TenantOnlyCol)
 
     df.write
-      .format("org.apache.phoenix.spark")
+      .format("phoenix")
       .mode("overwrite")
       .option("table", TenantTable)
       .option(PhoenixRuntime.TENANT_ID_ATTRIB, TenantId)
@@ -122,8 +121,7 @@
   }
 
   test("Can write an RDD to Phoenix tenant-specific view") {
-    val sqlContext = new SQLContext(sc)
-    sc
+    spark.sparkContext
       .parallelize(TestDataSet)
       .saveToPhoenix(
         TenantTable,
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
new file mode 100644
index 0000000..ad79d1c
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2;
+
+import java.util.Optional;
+
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.spark.datasource.v2.reader.PhoenixDataSourceReader;
+import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDataSourceWriteOptions;
+import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDatasourceWriter;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Implements the Spark DataSource V2 API to read from and write to Phoenix tables.
+ */
+public class PhoenixDataSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister {
+
+    public static final String SKIP_NORMALIZING_IDENTIFIER = "skipNormalizingIdentifier";
+    public static final String ZOOKEEPER_URL = "zkUrl";
+
+    @Override
+    public DataSourceReader createReader(DataSourceOptions options) {
+        return new PhoenixDataSourceReader(options);
+    }
+
+    @Override
+    public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode,
+            DataSourceOptions options) {
+        if (!mode.equals(SaveMode.Overwrite)) {
+            throw new RuntimeException("SaveMode other than SaveMode.Overwrite is not supported");
+        }
+        if (!options.tableName().isPresent()) {
+            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
+        }
+        if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
+            throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined");
+        }
+
+        PhoenixDataSourceWriteOptions writeOptions = createPhoenixDataSourceWriteOptions(options, schema);
+        return Optional.of(new PhoenixDatasourceWriter(writeOptions));
+    }
+
+    private PhoenixDataSourceWriteOptions createPhoenixDataSourceWriteOptions(DataSourceOptions options,
+            StructType schema) {
+        String scn = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE).orElse(null);
+        String tenantId = options.get(PhoenixRuntime.TENANT_ID_ATTRIB).orElse(null);
+        String zkUrl = options.get(ZOOKEEPER_URL).get();
+        boolean skipNormalizingIdentifier = options.getBoolean(SKIP_NORMALIZING_IDENTIFIER, false);
+        return new PhoenixDataSourceWriteOptions.Builder().setTableName(options.tableName().get())
+                .setZkUrl(zkUrl).setScn(scn).setTenantId(tenantId).setSchema(schema)
+                .setSkipNormalizingIdentifier(skipNormalizingIdentifier).build();
+    }
+
+    @Override
+    public String shortName() {
+        return "phoenix";
+    }
+}
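
Registered under the short name "phoenix" (via the DataSourceRegister service file below), the source is driven entirely through the standard DataFrame API. A minimal read/write sketch (the SparkSession, the table name TABLE1, and the quorum localhost:2181 are illustrative placeholders, not part of this patch):

import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class PhoenixDataSourceV2Example {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local").appName("phoenix-dsv2").getOrCreate();

        // Read: "table" and "zkUrl" are the two options the source requires.
        Dataset<Row> df = spark.read()
                .format("phoenix")
                .option("table", "TABLE1")
                .option(PhoenixDataSource.ZOOKEEPER_URL, "localhost:2181")
                .load();

        // Write: createWriter() accepts SaveMode.Overwrite only.
        df.write()
                .format("phoenix")
                .mode(SaveMode.Overwrite)
                .option("table", "TABLE1")
                .option(PhoenixDataSource.ZOOKEEPER_URL, "localhost:2181")
                .save();

        spark.stop();
    }
}
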
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
new file mode 100644
index 0000000..8c2fdb1
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.reader;
+
+import java.io.Serializable;
+
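+/**
+ * Serializable bundle of the connection details and generated SELECT statement that each
+ * executor-side partition reader needs.
+ */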
+public class PhoenixDataSourceReadOptions implements Serializable {
+
+    private final String tenantId;
+    private final String zkUrl;
+    private final String scn;
+    private final String selectStatement;
+
+    public PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId, String selectStatement) {
+        this.zkUrl = zkUrl;
+        this.scn = scn;
+        this.tenantId = tenantId;
+        this.selectStatement = selectStatement;
+    }
+
+    public String getSelectStatement() {
+        return selectStatement;
+    }
+
+    public String getScn() {
+        return scn;
+    }
+
+    public String getZkUrl() {
+        return zkUrl;
+    }
+
+    public String getTenantId() {
+        return tenantId;
+    }
+}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
new file mode 100644
index 0000000..446d96f
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.reader;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.RegionSizeCalculator;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.PhoenixInputSplit;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.spark.FilterExpressionCompiler;
+import org.apache.phoenix.spark.SparkSchemaUtil;
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
+import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.types.StructType;
+import scala.Tuple3;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
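+/**
+ * Reader side of the DataSource V2 implementation: resolves the table schema up front,
+ * supports filter push-down and column pruning, and plans Spark input partitions from the
+ * parallel scans of the optimized Phoenix query plan.
+ */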
+public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDownFilters,
+        SupportsPushDownRequiredColumns {
+
+    private final DataSourceOptions options;
+    private final String tableName;
+    private final String zkUrl;
+    private final boolean dateAsTimestamp;
+
+    private StructType schema;
+    private Filter[] pushedFilters = new Filter[]{};
+    // derived from pushedFilters
+    private String whereClause;
+
+    public PhoenixDataSourceReader(DataSourceOptions options) {
+        if (!options.tableName().isPresent()) {
+            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
+        }
+        if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
+            throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined");
+        }
+        this.options = options;
+        this.tableName = options.tableName().get();
+        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get();
+        this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
+        setSchema();
+    }
+
+    /**
+     * Sets the schema using all the table columns before any column pruning has been done
+     */
+    private void setSchema() {
+        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl)) {
+            List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
+            Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
+            schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
+        }
+        catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public StructType readSchema() {
+        return schema;
+    }
+
+    @Override
+    public Filter[] pushFilters(Filter[] filters) {
+        Tuple3<String, Filter[], Filter[]> tuple3 = new FilterExpressionCompiler().pushFilters(filters);
+        whereClause = tuple3._1();
+        pushedFilters = tuple3._3();
+        return tuple3._2();
+    }
+
+    @Override
+    public List<InputPartition<InternalRow>> planInputPartitions() {
+        Optional<String> currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+        Optional<String> tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+        // Generate splits based off statistics, or just region splits?
+        boolean splitByStats = options.getBoolean(
+                PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
+        Properties overridingProps = new Properties();
+        if (currentScnValue.isPresent()) {
+            overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue.get());
+        }
+        if (tenantId.isPresent()) {
+            overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId.get());
+        }
+        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl, overridingProps)) {
+            List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, Lists.newArrayList(schema.names()));
+            final Statement statement = conn.createStatement();
+            final String selectStatement = QueryUtil.constructSelectStatement(tableName, columnInfos, whereClause);
+            Preconditions.checkNotNull(selectStatement);
+
+            final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+            // Optimize the query plan so that we potentially use secondary indexes
+            final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
+            final Scan scan = queryPlan.getContext().getScan();
+
+            // setting the snapshot configuration
+            Optional<String> snapshotName = options.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
+            if (snapshotName.isPresent()) {
+                PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection()
+                        .getQueryServices().getConfiguration(), snapshotName.get());
+            }
+
+            // Initialize the query plan so it sets up the parallel scans
+            queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
+
+            List<KeyRange> allSplits = queryPlan.getSplits();
+            // Get the RegionSizeCalculator
+            PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
+            org.apache.hadoop.hbase.client.Connection connection =
+                    phxConn.getQueryServices().getAdmin().getConnection();
+            RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(queryPlan
+                    .getTableRef().getTable().getPhysicalName().toString()));
+            RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, connection
+                    .getAdmin());
+
+            final List<InputPartition<InternalRow>> partitions = Lists.newArrayListWithExpectedSize(allSplits.size());
+            for (List<Scan> scans : queryPlan.getScans()) {
+                // Get the region location
+                HRegionLocation location = regionLocator.getRegionLocation(
+                        scans.get(0).getStartRow(),
+                        false
+                );
+
+                String regionLocation = location.getHostname();
+
+                // Get the region size
+                long regionSize = sizeCalculator.getRegionSize(
+                        location.getRegionInfo().getRegionName()
+                );
+
+                PhoenixDataSourceReadOptions phoenixDataSourceOptions = new PhoenixDataSourceReadOptions(zkUrl,
+                        currentScnValue.orElse(null), tenantId.orElse(null), selectStatement);
+                if (splitByStats) {
+                    for (Scan aScan : scans) {
+                        partitions.add(new PhoenixInputPartition(phoenixDataSourceOptions, schema,
+                                new PhoenixInputSplit(Collections.singletonList(aScan), regionSize, regionLocation)));
+                    }
+                } else {
+                    partitions.add(new PhoenixInputPartition(phoenixDataSourceOptions, schema,
+                            new PhoenixInputSplit(scans, regionSize, regionLocation)));
+                }
+            }
+            return partitions;
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to plan query", e);
+        }
+    }
+
+    @Override
+    public Filter[] pushedFilters() {
+        return pushedFilters;
+    }
+
+    @Override
+    public void pruneColumns(StructType schema) {
+        this.schema = schema;
+    }
+}
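
Column pruning and filter push-down are driven by Spark's optimizer rather than called directly. A hedged sketch of the effect, reusing the placeholder session, table and quorum from the earlier example (ID and NAME are hypothetical columns): select() feeds pruneColumns(), the predicate feeds pushFilters(), and the reader then plans a SELECT over just those columns with the supported predicates compiled into the WHERE clause.

// Filters that pushFilters() reports as unsupported are re-evaluated by Spark on the returned rows.
Dataset<Row> pruned = spark.read()
        .format("phoenix")
        .option("table", "TABLE1")
        .option(PhoenixDataSource.ZOOKEEPER_URL, "localhost:2181")
        .load()
        .select("ID", "NAME")
        .filter("ID = 1 AND NAME IS NOT NULL");

pruned.explain(true);
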
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
new file mode 100644
index 0000000..624ff0f
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.reader;
+
+import org.apache.phoenix.mapreduce.PhoenixInputSplit;
+import org.apache.spark.SerializableWritable;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
+
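+/**
+ * One Spark input partition, wrapping the corresponding {@link PhoenixInputSplit} in a
+ * {@link SerializableWritable} so it can be shipped to executors.
+ */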
+public class PhoenixInputPartition implements InputPartition<InternalRow> {
+
+    private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
+    private StructType schema;
+    private PhoenixDataSourceReadOptions options;
+
+    public PhoenixInputPartition(PhoenixDataSourceReadOptions options, StructType schema, PhoenixInputSplit phoenixInputSplit) {
+        this.phoenixInputSplit = new SerializableWritable<>(phoenixInputSplit);
+        this.schema = schema;
+        this.options = options;
+    }
+
+    @Override
+    public InputPartitionReader<InternalRow> createPartitionReader() {
+        return new PhoenixInputPartitionReader(options, schema, phoenixInputSplit);
+    }
+
+}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
new file mode 100644
index 0000000..30e84db
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.reader;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
+import org.apache.phoenix.iterate.PeekingResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.RoundRobinResultIterator;
+import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.PhoenixInputSplit;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.spark.SerializableWritable;
+import org.apache.spark.executor.InputMetrics;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.SparkJdbcUtil;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import scala.collection.Iterator;
+
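+/**
+ * Executor-side reader that runs the scans of a single {@link PhoenixInputSplit} and exposes
+ * the resulting Phoenix rows to Spark as {@link InternalRow}s.
+ */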
+public class PhoenixInputPartitionReader implements InputPartitionReader<InternalRow> {
+
+    private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
+    private StructType schema;
+    private Iterator<InternalRow> iterator;
+    private PhoenixResultSet resultSet;
+    private InternalRow currentRow;
+    private PhoenixDataSourceReadOptions options;
+
+    public PhoenixInputPartitionReader(PhoenixDataSourceReadOptions options, StructType schema,
+            SerializableWritable<PhoenixInputSplit> phoenixInputSplit) {
+        this.options = options;
+        this.phoenixInputSplit = phoenixInputSplit;
+        this.schema = schema;
+        initialize();
+    }
+
+    private QueryPlan getQueryPlan() throws SQLException {
+        String scn = options.getScn();
+        String tenantId = options.getTenantId();
+        String zkUrl = options.getZkUrl();
+        Properties overridingProps = new Properties();
+        if (scn != null) {
+            overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
+        }
+        if (tenantId != null) {
+            overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        }
+        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl, overridingProps)) {
+            final Statement statement = conn.createStatement();
+            final String selectStatement = options.getSelectStatement();
+            Preconditions.checkNotNull(selectStatement);
+
+            final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+            // Optimize the query plan so that we potentially use secondary indexes
+            final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
+            return queryPlan;
+        }
+    }
+
+    private void initialize() {
+        try {
+            final QueryPlan queryPlan = getQueryPlan();
+            final List<Scan> scans = phoenixInputSplit.value().getScans();
+            List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
+            StatementContext ctx = queryPlan.getContext();
+            ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
+            String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
+
+            // Clear the table region boundary cache to make sure long running jobs stay up to date
+            byte[] tableNameBytes = queryPlan.getTableRef().getTable().getPhysicalName().getBytes();
+            ConnectionQueryServices services = queryPlan.getContext().getConnection().getQueryServices();
+            services.clearTableRegionCache(tableNameBytes);
+
+            long renewScannerLeaseThreshold = queryPlan.getContext().getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
+            for (Scan scan : scans) {
+                // For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599
+                scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
+
+                PeekingResultIterator peekingResultIterator;
+                ScanMetricsHolder scanMetricsHolder =
+                        ScanMetricsHolder.getInstance(readMetrics, tableName, scan,
+                                queryPlan.getContext().getConnection().getLogLevel());
+                final TableResultIterator tableResultIterator =
+                        new TableResultIterator(
+                                queryPlan.getContext().getConnection().getMutationState(), scan,
+                                scanMetricsHolder, renewScannerLeaseThreshold, queryPlan,
+                                MapReduceParallelScanGrouper.getInstance());
+                peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
+                iterators.add(peekingResultIterator);
+            }
+            ResultIterator iterator = queryPlan.useRoundRobinIterator() ?
+                    RoundRobinResultIterator.newIterator(iterators, queryPlan) :
+                    ConcatResultIterator.newIterator(iterators);
+            if (queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
+                iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
+            }
+            // Clone the row projector as it's not thread safe and would be used simultaneously by
+            // multiple threads otherwise.
+            this.resultSet = new PhoenixResultSet(iterator, queryPlan.getProjector().cloneIfNecessary(), queryPlan.getContext());
+            this.iterator = SparkJdbcUtil.resultSetToSparkInternalRows(resultSet, schema, new InputMetrics());
+        }
+        catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean next() {
+        if (!iterator.hasNext()) {
+            return false;
+        }
+        currentRow = iterator.next();
+        return true;
+    }
+
+    @Override
+    public InternalRow get() {
+        return currentRow;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (resultSet != null) {
+            try {
+                resultSet.close();
+            } catch (SQLException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+}
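
For context, Spark consumes an InputPartitionReader with a plain next()/get()/close() loop. The method below only illustrates that contract under assumed inputs; it is not code from this patch:

import java.io.IOException;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;

public class PartitionReaderContractExample {
    static long countRows(InputPartition<InternalRow> partition) throws IOException {
        long rows = 0;
        // InputPartitionReader extends Closeable, so try-with-resources mirrors Spark's close() call.
        try (InputPartitionReader<InternalRow> reader = partition.createPartitionReader()) {
            while (reader.next()) {
                InternalRow row = reader.get(); // one Phoenix row in Spark's internal representation
                rows++;
            }
        }
        return rows;
    }
}
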
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
new file mode 100644
index 0000000..781d1c8
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import org.apache.spark.sql.types.StructType;
+
+import java.io.Serializable;
+
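+/**
+ * Serializable bundle of the table, connection and schema information that the executor-side
+ * {@link PhoenixDataWriter} instances need.
+ */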
+public class PhoenixDataSourceWriteOptions implements Serializable {
+
+    private final String tableName;
+    private final String zkUrl;
+    private final String tenantId;
+    private final String scn;
+    private final StructType schema;
+    private final boolean skipNormalizingIdentifier;
+
+    private PhoenixDataSourceWriteOptions(String tableName, String zkUrl, String scn, String tenantId,
+                                          StructType schema, boolean skipNormalizingIdentifier) {
+        this.tableName = tableName;
+        this.zkUrl = zkUrl;
+        this.scn = scn;
+        this.tenantId = tenantId;
+        this.schema = schema;
+        this.skipNormalizingIdentifier = skipNormalizingIdentifier;
+    }
+
+    public String getScn() {
+        return scn;
+    }
+
+    public String getZkUrl() {
+        return zkUrl;
+    }
+
+    public String getTenantId() {
+        return tenantId;
+    }
+
+    public StructType getSchema() {
+        return schema;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public boolean skipNormalizingIdentifier() {
+        return skipNormalizingIdentifier;
+    }
+
+    public static class Builder {
+        private String tableName;
+        private String zkUrl;
+        private String scn;
+        private String tenantId;
+        private StructType schema;
+        private boolean skipNormalizingIdentifier;
+
+        public Builder setTableName(String tableName) {
+            this.tableName = tableName;
+            return this;
+        }
+
+        public Builder setZkUrl(String zkUrl) {
+            this.zkUrl = zkUrl;
+            return this;
+        }
+
+        public Builder setScn(String scn) {
+            this.scn = scn;
+            return this;
+        }
+
+        public Builder setTenantId(String tenantId) {
+            this.tenantId = tenantId;
+            return this;
+        }
+
+        public Builder setSchema(StructType schema) {
+            this.schema = schema;
+            return this;
+        }
+
+        public Builder setSkipNormalizingIdentifier(boolean skipNormalizingIdentifier) {
+            this.skipNormalizingIdentifier = skipNormalizingIdentifier;
+            return this;
+        }
+
+        public PhoenixDataSourceWriteOptions build() {
+            return new PhoenixDataSourceWriteOptions(tableName, zkUrl, scn, tenantId, schema, skipNormalizingIdentifier);
+        }
+    }
+}
\ No newline at end of file
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
new file mode 100644
index 0000000..cf42aa5
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.SparkJdbcUtil;
+import org.apache.spark.sql.execution.datasources.jdbc.PhoenixJdbcDialect$;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import com.google.common.collect.Lists;
+
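+/**
+ * Writes Spark {@link InternalRow}s to Phoenix through a prepared UPSERT statement and commits
+ * the connection when the task commits.
+ */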
+public class PhoenixDataWriter implements DataWriter<InternalRow> {
+
+    private final StructType schema;
+    private final Connection conn;
+    private final PreparedStatement statement;
+
+    public PhoenixDataWriter(PhoenixDataSourceWriteOptions options) {
+        String scn = options.getScn();
+        String tenantId = options.getTenantId();
+        String zkUrl = options.getZkUrl();
+        Properties overridingProps = new Properties();
+        if (scn != null) {
+            overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
+        }
+        if (tenantId != null) {
+            overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        }
+        this.schema = options.getSchema();
+        try {
+            this.conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl, overridingProps);
+            List<String> colNames = Lists.newArrayList(options.getSchema().names());
+            if (!options.skipNormalizingIdentifier()) {
+                colNames = colNames.stream().map(SchemaUtil::normalizeIdentifier).collect(Collectors.toList());
+            }
+            String upsertSql = QueryUtil.constructUpsertStatement(options.getTableName(), colNames, null);
+            this.statement = this.conn.prepareStatement(upsertSql);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void write(InternalRow internalRow) throws IOException {
+        try {
+            int i = 0;
+            for (StructField field : schema.fields()) {
+                DataType dataType = field.dataType();
+                if (internalRow.isNullAt(i)) {
+                    statement.setNull(i + 1, SparkJdbcUtil.getJdbcType(dataType,
+                            PhoenixJdbcDialect$.MODULE$).jdbcNullType());
+                } else {
+                    Row row = SparkJdbcUtil.toRow(schema, internalRow);
+                    SparkJdbcUtil.makeSetter(conn, PhoenixJdbcDialect$.MODULE$, dataType).apply(statement, row, i);
+                }
+                ++i;
+            }
+            statement.execute();
+        } catch (SQLException e) {
+            throw new IOException("Exception while executing Phoenix prepared statement", e);
+        }
+    }
+
+    @Override
+    public WriterCommitMessage commit() throws IOException {
+        try {
+            conn.commit();
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                statement.close();
+                conn.close();
+            }
+            catch (SQLException ex) {
+                throw new RuntimeException(ex);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void abort() throws IOException {
+    }
+}
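
The writer honours one extra option, skipNormalizingIdentifier. A hedged sketch, assuming the same placeholder table and quorum as above and a DataFrame df whose column names already match the Phoenix column names exactly:

df.write()
        .format("phoenix")
        .mode(SaveMode.Overwrite)
        .option("table", "TABLE1")
        .option(PhoenixDataSource.ZOOKEEPER_URL, "localhost:2181")
        // Keep the DataFrame column names as-is instead of passing them through
        // SchemaUtil.normalizeIdentifier() when the UPSERT statement is built.
        .option(PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER, "true")
        .save();
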
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
new file mode 100644
index 0000000..751fdfa
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+
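+/**
+ * Creates a {@link PhoenixDataWriter} for each write task.
+ */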
+public class PhoenixDataWriterFactory implements DataWriterFactory<InternalRow> {
+
+    private final PhoenixDataSourceWriteOptions options;
+
+    public PhoenixDataWriterFactory(PhoenixDataSourceWriteOptions options) {
+        this.options = options;
+    }
+
+    @Override
+    public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
+        return new PhoenixDataWriter(options);
+    }
+}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
new file mode 100644
index 0000000..7847609
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+
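+/**
+ * Driver-side entry point for writes: hands the serialized write options to the per-task
+ * {@link PhoenixDataWriterFactory}. Each task commits its own Phoenix connection, so
+ * {@link #useCommitCoordinator()} returns false.
+ */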
+public class PhoenixDatasourceWriter implements DataSourceWriter {
+
+    private final PhoenixDataSourceWriteOptions options;
+
+    public PhoenixDatasourceWriter(PhoenixDataSourceWriteOptions options) {
+        this.options = options;
+    }
+
+    @Override
+    public DataWriterFactory<InternalRow> createWriterFactory() {
+        return new PhoenixDataWriterFactory(options);
+    }
+
+    @Override
+    public boolean useCommitCoordinator() {
+        return false;
+    }
+
+    @Override
+    public void commit(WriterCommitMessage[] messages) {
+    }
+
+    @Override
+    public void abort(WriterCommitMessage[] messages) {
+    }
+}
diff --git a/phoenix-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/phoenix-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..6eff1af
--- /dev/null
+++ b/phoenix-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
\ No newline at end of file
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
index ca476e7..d555954 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
@@ -21,6 +21,7 @@
 
 import scala.collection.JavaConversions._
 
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 object ConfigurationUtil extends Serializable {
 
   def getOutputConfiguration(tableName: String, columns: Seq[String], zkUrl: Option[String], tenantId: Option[String] = None, conf: Option[Configuration] = None): Configuration = {
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
index be4a32b..3b0289d 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
@@ -22,7 +22,7 @@
 
 import scala.collection.JavaConversions._
 
-
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 class DataFrameFunctions(data: DataFrame) extends Serializable {
   def saveToPhoenix(parameters: Map[String, String]): Unit = {
   		saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), tenantId = parameters.get("TenantId"), 
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
index e000b74..ccdf595 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
@@ -20,6 +20,7 @@
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider}
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 class DefaultSource extends RelationProvider with CreatableRelationProvider {
 
   // Override 'RelationProvider.createRelation', this enables DataFrame.load()
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
new file mode 100644
index 0000000..74ff67e
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import java.sql.Timestamp
+import java.text.Format
+
+import org.apache.phoenix.util.{DateUtil, SchemaUtil}
+import org.apache.phoenix.util.StringUtil.escapeStringConstant
+import org.apache.spark.sql.sources._
+
+import scala.collection.mutable.ArrayBuffer
+
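+/**
+  * Translates Spark [[Filter]]s into a Phoenix-compatible WHERE clause and reports which
+  * filters could and could not be pushed down.
+  */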
+class FilterExpressionCompiler() {
+
+  val timeformatter: Format = DateUtil.getTimestampFormatter(DateUtil.DEFAULT_TIME_FORMAT, DateUtil.DEFAULT_TIME_ZONE_ID)
+
+  /**
+    * Attempts to create a Phoenix-accepted WHERE clause from Spark filters, mostly inspired
+    * by Spark SQL's JDBCRDD and the couchbase-spark-connector
+    *
+    * @return tuple representing where clause (derived from supported filters),
+    *         array of unsupported filters and array of supported filters
+    */
+  def pushFilters(filters: Array[Filter]): (String, Array[Filter], Array[Filter]) = {
+    if (filters.isEmpty) {
+      return ("" , Array[Filter](), Array[Filter]())
+    }
+
+    val filter = new StringBuilder("")
+    val unsupportedFilters = ArrayBuffer[Filter]()
+    var i = 0
+
+    filters.foreach(f => {
+      // Assume conjunction for multiple filters, unless otherwise specified
+      if (i > 0) {
+        filter.append(" AND")
+      }
+
+      f match {
+        // Spark 1.3.1+ supported filters
+        case And(leftFilter, rightFilter) => {
+          val(whereClause, currUnsupportedFilters, _) = pushFilters(Array(leftFilter, rightFilter))
+          if (currUnsupportedFilters.isEmpty)
+            filter.append(whereClause)
+          else
+            unsupportedFilters += f
+        }
+        case Or(leftFilter, rightFilter) => {
+          val(whereLeftClause, leftUnsupportedFilters, _) = pushFilters(Array(leftFilter))
+          val(whereRightClause, rightUnsupportedFilters, _) = pushFilters(Array(rightFilter))
+          if (leftUnsupportedFilters.isEmpty && rightUnsupportedFilters.isEmpty) {
+            filter.append(whereLeftClause + " OR " + whereRightClause)
+          }
+          else {
+            unsupportedFilters += f
+          }
+        }
+        case Not(aFilter) => {
+          val(whereClause, currUnsupportedFilters, _) = pushFilters(Array(aFilter))
+          if (currUnsupportedFilters.isEmpty)
+            filter.append(" NOT " + whereClause)
+          else
+            unsupportedFilters += f
+        }
+        case EqualTo(attr, value) => filter.append(s" ${escapeKey(attr)} = ${compileValue(value)}")
+        case GreaterThan(attr, value) => filter.append(s" ${escapeKey(attr)} > ${compileValue(value)}")
+        case GreaterThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} >= ${compileValue(value)}")
+        case LessThan(attr, value) => filter.append(s" ${escapeKey(attr)} < ${compileValue(value)}")
+        case LessThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} <= ${compileValue(value)}")
+        case IsNull(attr) => filter.append(s" ${escapeKey(attr)} IS NULL")
+        case IsNotNull(attr) => filter.append(s" ${escapeKey(attr)} IS NOT NULL")
+        case In(attr, values) => filter.append(s" ${escapeKey(attr)} IN ${values.map(compileValue).mkString("(", ",", ")")}")
+        case StringStartsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue(value + "%")}")
+        case StringEndsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value)}")
+        case StringContains(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value + "%")}")
+        case _ => unsupportedFilters += f
+      }
+
+      i = i + 1
+    })
+
+    (filter.toString(), unsupportedFilters.toArray, filters diff unsupportedFilters)
+  }
+
+  // Helper function to escape string values in SQL queries
+  private def compileValue(value: Any): Any = value match {
+    case stringValue: String => s"'${escapeStringConstant(stringValue)}'"
+
+    case timestampValue: Timestamp => getTimestampString(timestampValue)
+
+    // Borrowed from 'elasticsearch-hadoop', support these internal UTF types across Spark versions
+    // Spark 1.4
+    case utf if (isClass(utf, "org.apache.spark.sql.types.UTF8String")) => s"'${escapeStringConstant(utf.toString)}'"
+    // Spark 1.5
+    case utf if (isClass(utf, "org.apache.spark.unsafe.types.UTF8String")) => s"'${escapeStringConstant(utf.toString)}'"
+
+    // Pass through anything else
+    case _ => value
+  }
+
+  private def getTimestampString(timestampValue: Timestamp): String = {
+    "TO_TIMESTAMP('%s', '%s', '%s')".format(timeformatter.format(timestampValue),
+      DateUtil.DEFAULT_TIME_FORMAT, DateUtil.DEFAULT_TIME_ZONE_ID)
+  }
+
+  // Helper function to escape column key to work with SQL queries
+  private def escapeKey(key: String): String = SchemaUtil.getEscapedFullColumnName(key)
+
+  private def isClass(obj: Any, className: String) = {
+    className.equals(obj.getClass().getName())
+  }
+}
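
A rough sketch of driving the compiler from Java, mirroring how PhoenixDataSourceReader.pushFilters() uses it; the column names are hypothetical, and the exact identifier quoting and spacing of the generated clause come from SchemaUtil.getEscapedFullColumnName:

import org.apache.phoenix.spark.FilterExpressionCompiler;
import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.IsNotNull;

import scala.Tuple3;

public class FilterCompilerExample {
    public static void main(String[] args) {
        Filter[] filters = new Filter[] { new EqualTo("ID", 1), new IsNotNull("NAME") };

        Tuple3<String, Filter[], Filter[]> compiled = new FilterExpressionCompiler().pushFilters(filters);

        String whereClause = compiled._1();   // roughly: ID = 1 AND NAME IS NOT NULL
        Filter[] unsupported = compiled._2(); // handed back to Spark to evaluate after the scan
        Filter[] pushed = compiled._3();      // reported through pushedFilters()
        System.out.println(whereClause + " | unsupported=" + unsupported.length + " | pushed=" + pushed.length);
    }
}
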
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
index d604e0e..7331a5f 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
@@ -21,9 +21,6 @@
 import org.apache.phoenix.jdbc.PhoenixDriver
 import org.apache.phoenix.mapreduce.PhoenixInputFormat
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
-import org.apache.phoenix.query.QueryConstants
-import org.apache.phoenix.schema.types._
-import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
@@ -32,6 +29,7 @@
 
 import scala.collection.JavaConverters._
 
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
                  predicate: Option[String] = None,
                  zkUrl: Option[String] = None,
@@ -126,7 +124,7 @@
 
 
     // Lookup the Spark catalyst types from the Phoenix schema
-    val structFields = phoenixSchemaToCatalystSchema(columnInfoList).toArray
+    val structType = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoList, dateAsTimestamp)
 
     // Create the data frame from the converted Spark schema
     sqlContext.createDataFrame(map(pr => {
@@ -146,60 +144,7 @@
 
       // Create a Spark Row from the sequence
       Row.fromSeq(rowSeq)
-    }), new StructType(structFields))
+    }), structType)
   }
 
-  def normalizeColumnName(columnName: String) = {
-    val unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName)
-    var normalizedColumnName = ""
-    if (unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR) < 0) {
-      normalizedColumnName = unescapedColumnName
-    }
-    else {
-      // split by separator to get the column family and column name
-      val tokens = unescapedColumnName.split(QueryConstants.NAME_SEPARATOR_REGEX)
-      normalizedColumnName = if (tokens(0) == "0") tokens(1) else unescapedColumnName
-    }
-    normalizedColumnName
-  }
-
-  def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo]) = columnList.map(ci => {
-    val structType = phoenixTypeToCatalystType(ci)
-    StructField(normalizeColumnName(ci.getColumnName), structType)
-  })
-
-
-  // Lookup table for Phoenix types to Spark catalyst types
-  def phoenixTypeToCatalystType(columnInfo: ColumnInfo): DataType = columnInfo.getPDataType match {
-    case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType
-    case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType
-    case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType
-    case t if t.isInstanceOf[PSmallint] || t.isInstanceOf[PUnsignedSmallint] => ShortType
-    case t if t.isInstanceOf[PTinyint] || t.isInstanceOf[PUnsignedTinyint] => ByteType
-    case t if t.isInstanceOf[PFloat] || t.isInstanceOf[PUnsignedFloat] => FloatType
-    case t if t.isInstanceOf[PDouble] || t.isInstanceOf[PUnsignedDouble] => DoubleType
-    // Use Spark system default precision for now (explicit to work with < 1.5)
-    case t if t.isInstanceOf[PDecimal] =>
-      if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale)
-    case t if t.isInstanceOf[PTimestamp] || t.isInstanceOf[PUnsignedTimestamp] => TimestampType
-    case t if t.isInstanceOf[PTime] || t.isInstanceOf[PUnsignedTime] => TimestampType
-    case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && !dateAsTimestamp => DateType
-    case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && dateAsTimestamp => TimestampType
-    case t if t.isInstanceOf[PBoolean] => BooleanType
-    case t if t.isInstanceOf[PVarbinary] || t.isInstanceOf[PBinary] => BinaryType
-    case t if t.isInstanceOf[PIntegerArray] || t.isInstanceOf[PUnsignedIntArray] => ArrayType(IntegerType, containsNull = true)
-    case t if t.isInstanceOf[PBooleanArray] => ArrayType(BooleanType, containsNull = true)
-    case t if t.isInstanceOf[PVarcharArray] || t.isInstanceOf[PCharArray] => ArrayType(StringType, containsNull = true)
-    case t if t.isInstanceOf[PVarbinaryArray] || t.isInstanceOf[PBinaryArray] => ArrayType(BinaryType, containsNull = true)
-    case t if t.isInstanceOf[PLongArray] || t.isInstanceOf[PUnsignedLongArray] => ArrayType(LongType, containsNull = true)
-    case t if t.isInstanceOf[PSmallintArray] || t.isInstanceOf[PUnsignedSmallintArray] => ArrayType(IntegerType, containsNull = true)
-    case t if t.isInstanceOf[PTinyintArray] || t.isInstanceOf[PUnsignedTinyintArray] => ArrayType(ByteType, containsNull = true)
-    case t if t.isInstanceOf[PFloatArray] || t.isInstanceOf[PUnsignedFloatArray] => ArrayType(FloatType, containsNull = true)
-    case t if t.isInstanceOf[PDoubleArray] || t.isInstanceOf[PUnsignedDoubleArray] => ArrayType(DoubleType, containsNull = true)
-    case t if t.isInstanceOf[PDecimalArray] => ArrayType(
-      if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale), containsNull = true)
-    case t if t.isInstanceOf[PTimestampArray] || t.isInstanceOf[PUnsignedTimestampArray] => ArrayType(TimestampType, containsNull = true)
-    case t if t.isInstanceOf[PDateArray] || t.isInstanceOf[PUnsignedDateArray] => ArrayType(TimestampType, containsNull = true)
-    case t if t.isInstanceOf[PTimeArray] || t.isInstanceOf[PUnsignedTimeArray] => ArrayType(TimestampType, containsNull = true)
-  }
 }
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
index c35cc54..6d4c4cc 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
@@ -20,7 +20,7 @@
 import org.joda.time.DateTime
 import scala.collection.{mutable, immutable}
 
-
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 class PhoenixRecordWritable(columnMetaDataList: List[ColumnInfo]) extends DBWritable {
   val upsertValues = mutable.ArrayBuffer[Any]()
   val resultMap = mutable.Map[String, AnyRef]()
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
index 38bf29a..2f6ea8c 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
@@ -19,12 +19,11 @@
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.sql.sources._
-import org.apache.phoenix.util.StringUtil.escapeStringConstant
-import org.apache.phoenix.util.SchemaUtil
 
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Boolean = false)(@transient val sqlContext: SQLContext)
     extends BaseRelation with PrunedFilteredScan {
 
@@ -36,7 +35,7 @@
     but this prevents having to load the whole table into Spark first.
   */
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
-    val(pushedFilters, unhandledFilters) = buildFilter(filters)
+    val(pushedFilters, _, _) = new FilterExpressionCompiler().pushFilters(filters)
     new PhoenixRDD(
       sqlContext.sparkContext,
       tableName,
@@ -61,71 +60,10 @@
     ).toDataFrame(sqlContext).schema
   }
 
-  // Attempt to create Phoenix-accepted WHERE clauses from Spark filters,
-  // mostly inspired from Spark SQL JDBCRDD and the couchbase-spark-connector
-  private def buildFilter(filters: Array[Filter]): (String, Array[Filter]) = {
-    if (filters.isEmpty) {
-      return ("" , Array[Filter]())
-    }
-
-    val filter = new StringBuilder("")
-    val unsupportedFilters = Array[Filter]();
-    var i = 0
-
-    filters.foreach(f => {
-      // Assume conjunction for multiple filters, unless otherwise specified
-      if (i > 0) {
-        filter.append(" AND")
-      }
-
-      f match {
-        // Spark 1.3.1+ supported filters
-        case And(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter, rightFilter)))
-        case Or(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter)) + " OR " + buildFilter(Array(rightFilter)))
-        case Not(aFilter) => filter.append(" NOT " + buildFilter(Array(aFilter)))
-        case EqualTo(attr, value) => filter.append(s" ${escapeKey(attr)} = ${compileValue(value)}")
-        case GreaterThan(attr, value) => filter.append(s" ${escapeKey(attr)} > ${compileValue(value)}")
-        case GreaterThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} >= ${compileValue(value)}")
-        case LessThan(attr, value) => filter.append(s" ${escapeKey(attr)} < ${compileValue(value)}")
-        case LessThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} <= ${compileValue(value)}")
-        case IsNull(attr) => filter.append(s" ${escapeKey(attr)} IS NULL")
-        case IsNotNull(attr) => filter.append(s" ${escapeKey(attr)} IS NOT NULL")
-        case In(attr, values) => filter.append(s" ${escapeKey(attr)} IN ${values.map(compileValue).mkString("(", ",", ")")}")
-        case StringStartsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue(value + "%")}")
-        case StringEndsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value)}")
-        case StringContains(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value + "%")}")
-        case _ => unsupportedFilters :+ f
-      }
-
-      i = i + 1
-    })
-
-    (filter.toString(), unsupportedFilters)
-  }
 
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
-    val(pushedFilters, unhandledFilters) = buildFilter(filters)
+    val (_, unhandledFilters, _) = new FilterExpressionCompiler().pushFilters(filters)
     unhandledFilters
   }
 
-  // Helper function to escape column key to work with SQL queries
-  private def escapeKey(key: String): String = SchemaUtil.getEscapedArgument(key)
-
-  // Helper function to escape string values in SQL queries
-  private def compileValue(value: Any): Any = value match {
-    case stringValue: String => s"'${escapeStringConstant(stringValue)}'"
-
-    // Borrowed from 'elasticsearch-hadoop', support these internal UTF types across Spark versions
-    // Spark 1.4
-    case utf if (isClass(utf, "org.apache.spark.sql.types.UTF8String")) => s"'${escapeStringConstant(utf.toString)}'"
-    // Spark 1.5
-    case utf if (isClass(utf, "org.apache.spark.unsafe.types.UTF8String")) => s"'${escapeStringConstant(utf.toString)}'"
-
-    // Pass through anything else
-    case _ => value
-  }
-
-  private def isClass(obj: Any, className: String) = {
-    className.equals(obj.getClass().getName())
-  }
 }
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
index 1b33e6e..b073521 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
@@ -21,6 +21,7 @@
 
 import scala.collection.JavaConversions._
 
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 class ProductRDDFunctions[A <: Product](data: RDD[A]) extends Serializable {
 
   def saveToPhoenix(tableName: String, cols: Seq[String],
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala
index 476ce8a..1b377ab 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala
@@ -17,6 +17,7 @@
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 class SparkContextFunctions(@transient val sc: SparkContext) extends Serializable {
 
   /*
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
new file mode 100644
index 0000000..f69e988
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import org.apache.phoenix.query.QueryConstants
+import org.apache.phoenix.schema.types._
+import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
+import org.apache.spark.sql.types._
+
+object SparkSchemaUtil {
+
+  def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false) : StructType = {
+    val structFields = columnList.map(ci => {
+      val structType = phoenixTypeToCatalystType(ci, dateAsTimestamp)
+      StructField(normalizeColumnName(ci.getColumnName), structType)
+    })
+    new StructType(structFields.toArray)
+  }
+
+  def normalizeColumnName(columnName: String) = {
+    val unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName)
+    var normalizedColumnName = ""
+    if (unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR) < 0) {
+      normalizedColumnName = unescapedColumnName
+    }
+    else {
+      // split by separator to get the column family and column name
+      val tokens = unescapedColumnName.split(QueryConstants.NAME_SEPARATOR_REGEX)
+      normalizedColumnName = if (tokens(0) == "0") tokens(1) else unescapedColumnName
+    }
+    normalizedColumnName
+  }
+
+
+  // Lookup table for Phoenix types to Spark catalyst types
+  def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match {
+    case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType
+    case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType
+    case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType
+    case t if t.isInstanceOf[PSmallint] || t.isInstanceOf[PUnsignedSmallint] => ShortType
+    case t if t.isInstanceOf[PTinyint] || t.isInstanceOf[PUnsignedTinyint] => ByteType
+    case t if t.isInstanceOf[PFloat] || t.isInstanceOf[PUnsignedFloat] => FloatType
+    case t if t.isInstanceOf[PDouble] || t.isInstanceOf[PUnsignedDouble] => DoubleType
+    // Use the Spark system default precision for now (explicit so this works with Spark < 1.5)
+    case t if t.isInstanceOf[PDecimal] =>
+      if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale)
+    case t if t.isInstanceOf[PTimestamp] || t.isInstanceOf[PUnsignedTimestamp] => TimestampType
+    case t if t.isInstanceOf[PTime] || t.isInstanceOf[PUnsignedTime] => TimestampType
+    case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && !dateAsTimestamp => DateType
+    case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && dateAsTimestamp => TimestampType
+    case t if t.isInstanceOf[PBoolean] => BooleanType
+    case t if t.isInstanceOf[PVarbinary] || t.isInstanceOf[PBinary] => BinaryType
+    case t if t.isInstanceOf[PIntegerArray] || t.isInstanceOf[PUnsignedIntArray] => ArrayType(IntegerType, containsNull = true)
+    case t if t.isInstanceOf[PBooleanArray] => ArrayType(BooleanType, containsNull = true)
+    case t if t.isInstanceOf[PVarcharArray] || t.isInstanceOf[PCharArray] => ArrayType(StringType, containsNull = true)
+    case t if t.isInstanceOf[PVarbinaryArray] || t.isInstanceOf[PBinaryArray] => ArrayType(BinaryType, containsNull = true)
+    case t if t.isInstanceOf[PLongArray] || t.isInstanceOf[PUnsignedLongArray] => ArrayType(LongType, containsNull = true)
+    case t if t.isInstanceOf[PSmallintArray] || t.isInstanceOf[PUnsignedSmallintArray] => ArrayType(IntegerType, containsNull = true)
+    case t if t.isInstanceOf[PTinyintArray] || t.isInstanceOf[PUnsignedTinyintArray] => ArrayType(ByteType, containsNull = true)
+    case t if t.isInstanceOf[PFloatArray] || t.isInstanceOf[PUnsignedFloatArray] => ArrayType(FloatType, containsNull = true)
+    case t if t.isInstanceOf[PDoubleArray] || t.isInstanceOf[PUnsignedDoubleArray] => ArrayType(DoubleType, containsNull = true)
+    case t if t.isInstanceOf[PDecimalArray] => ArrayType(
+      if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale), containsNull = true)
+    case t if t.isInstanceOf[PTimestampArray] || t.isInstanceOf[PUnsignedTimestampArray] => ArrayType(TimestampType, containsNull = true)
+    case t if t.isInstanceOf[PDateArray] || t.isInstanceOf[PUnsignedDateArray] => ArrayType(TimestampType, containsNull = true)
+    case t if t.isInstanceOf[PTimeArray] || t.isInstanceOf[PUnsignedTimeArray] => ArrayType(TimestampType, containsNull = true)
+  }
+
+}
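
To illustrate the schema mapping above, a hedged usage sketch (not part of the patch): it assumes the two-argument ColumnInfo(columnName, sqlType) constructor and shows the default column family prefix "0." being stripped by normalizeColumnName.

// Sketch only, not part of the patch.
import java.sql.Types

import org.apache.phoenix.spark.SparkSchemaUtil
import org.apache.phoenix.util.ColumnInfo

object SparkSchemaUtilExample {
  def main(args: Array[String]): Unit = {
    val cols = Seq(
      new ColumnInfo("ID", Types.BIGINT),       // PLong -> LongType
      new ColumnInfo("0.NAME", Types.VARCHAR))  // default family "0" stripped -> field "NAME", StringType
    val schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(cols, dateAsTimestamp = false)
    println(schema.treeString)
  }
}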
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala
index a0842c9..f9154ad 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala
@@ -16,6 +16,7 @@
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.{DataFrame, SQLContext}
 
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 class SparkSqlContextFunctions(@transient val sqlContext: SQLContext) extends Serializable {
 
   /*
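
All three deprecated helpers point users at PhoenixDataSource. A hedged sketch of the replacement usage through Spark's generic reader/writer API follows; the "phoenix" short name and the "table"/"zkUrl" option keys are assumptions about PhoenixDataSource, not confirmed by these hunks.

// Sketch only, not part of the patch; format name and option keys are assumptions.
import org.apache.spark.sql.{SaveMode, SparkSession}

object PhoenixDataSourceV2Example {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("phoenix-dsv2-example").getOrCreate()

    // Read via the V2 data source instead of SparkSqlContextFunctions.phoenixTableAsDataFrame
    val df = spark.read
      .format("phoenix")                 // assumed short name registered by PhoenixDataSource
      .option("table", "TABLE1")         // assumed option key
      .option("zkUrl", "localhost:2181") // assumed option key
      .load()

    // Write via the V2 data source instead of ProductRDDFunctions.saveToPhoenix
    df.write
      .format("phoenix")
      .option("table", "OUTPUT_TABLE")
      .option("zkUrl", "localhost:2181")
      .mode(SaveMode.Overwrite)
      .save()
  }
}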
diff --git a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
new file mode 100644
index 0000000..712ec2d
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
@@ -0,0 +1,21 @@
+package org.apache.spark.sql.execution.datasources.jdbc
+
+import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
+import org.apache.spark.sql.types._
+
+private object PhoenixJdbcDialect extends JdbcDialect {
+
+  override def canHandle(url: String): Boolean = url.startsWith("jdbc:phoenix")
+
+  /**
+    * This is only called for ArrayType (see JdbcUtils.makeSetter)
+    */
+  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+    case StringType => Some(JdbcType("VARCHAR", java.sql.Types.VARCHAR))
+    case BinaryType => Some(JdbcType("BINARY(" + dt.defaultSize + ")", java.sql.Types.BINARY))
+    case ByteType => Some(JdbcType("TINYINT", java.sql.Types.TINYINT))
+    case _ => None
+  }
+
+
+}
\ No newline at end of file
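
The dialect above only needs to resolve element type names when binding Phoenix array columns; anything it declines falls back to Spark's JdbcUtils.getCommonJDBCType (see SparkJdbcUtil.getJdbcType below). A minimal sketch of that resolution, assuming it compiles in the same package so the package-private object is visible:

// Sketch only, not part of the patch; assumes same-package visibility of the private object.
package org.apache.spark.sql.execution.datasources.jdbc

import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.getCommonJDBCType
import org.apache.spark.sql.types.{IntegerType, StringType}

object PhoenixJdbcDialectExample {
  def main(args: Array[String]): Unit = {
    // Handled by the Phoenix dialect (used when binding array elements):
    println(PhoenixJdbcDialect.getJDBCType(StringType))  // Some(JdbcType(VARCHAR,12))
    // Not handled by the dialect; callers fall back to Spark's common JDBC mapping:
    println(PhoenixJdbcDialect.getJDBCType(IntegerType)
      .orElse(getCommonJDBCType(IntegerType)))           // Some(JdbcType(INTEGER,4))
  }
}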
diff --git a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
new file mode 100644
index 0000000..eac483a
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.sql.{Connection, PreparedStatement, ResultSet}
+import java.util.Locale
+
+import org.apache.spark.executor.InputMetrics
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils._
+import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.NextIterator
+
+object SparkJdbcUtil {
+
+  def toRow(schema: StructType, internalRow: InternalRow) : Row = {
+    val encoder = RowEncoder(schema).resolveAndBind()
+    encoder.fromRow(internalRow)
+  }
+
+  // A `JDBCValueGetter` reads one value from a `ResultSet` into a field of a mutable
+  // `InternalRow`. The final `Int` argument is the zero-based index of the field to set
+  // in the row; the same index (plus one) addresses the value in the `ResultSet`.
+  private type JDBCValueGetter = (ResultSet, InternalRow, Int) => Unit
+
+  private def nullSafeConvert[T](input: T, f: T => Any): Any = {
+    if (input == null) {
+      null
+    } else {
+      f(input)
+    }
+  }
+
+  /**
+    * Creates `JDBCValueGetter`s according to [[StructType]], which can set
+    * each value from `ResultSet` to each field of [[InternalRow]] correctly.
+    */
+  private def makeGetters(schema: StructType): Array[JDBCValueGetter] =
+    schema.fields.map(sf => makeGetter(sf.dataType, sf.metadata))
+
+  private def makeGetter(dt: DataType, metadata: Metadata): JDBCValueGetter = dt match {
+    case BooleanType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.setBoolean(pos, rs.getBoolean(pos + 1))
+
+    case DateType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        // DateTimeUtils.fromJavaDate does not handle null value, so we need to check it.
+        val dateVal = rs.getDate(pos + 1)
+        if (dateVal != null) {
+          row.setInt(pos, DateTimeUtils.fromJavaDate(dateVal))
+        } else {
+          row.update(pos, null)
+        }
+
+    // When connecting to Oracle over JDBC, the precision and scale of the BigDecimal
+    // returned by ResultSet.getBigDecimal do not always match the table schema reported
+    // by ResultSetMetaData.getPrecision and ResultSetMetaData.getScale.
+    // For example, inserting 19999 into a NUMBER(12, 2) column yields a BigDecimal with
+    // scale 0 even though the dataframe schema correctly reports DecimalType(12, 2);
+    // saving that dataframe to a parquet file and reading it back then produces the
+    // wrong value 199.99.
+    // So the Decimal's precision and scale must be set from the JDBC metadata here.
+    case DecimalType.Fixed(p, s) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val decimal =
+          nullSafeConvert[java.math.BigDecimal](rs.getBigDecimal(pos + 1), d => Decimal(d, p, s))
+        row.update(pos, decimal)
+
+    case DoubleType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.setDouble(pos, rs.getDouble(pos + 1))
+
+    case FloatType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.setFloat(pos, rs.getFloat(pos + 1))
+
+    case IntegerType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.setInt(pos, rs.getInt(pos + 1))
+
+    case LongType if metadata.contains("binarylong") =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val bytes = rs.getBytes(pos + 1)
+        var ans = 0L
+        var j = 0
+        while (j < bytes.length) {
+          ans = 256 * ans + (255 & bytes(j))
+          j = j + 1
+        }
+        row.setLong(pos, ans)
+
+    case LongType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.setLong(pos, rs.getLong(pos + 1))
+
+    case ShortType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.setShort(pos, rs.getShort(pos + 1))
+
+    case StringType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        // TODO(davies): use getBytes for better performance, if the encoding is UTF-8
+        row.update(pos, UTF8String.fromString(rs.getString(pos + 1)))
+
+    case TimestampType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val t = rs.getTimestamp(pos + 1)
+        if (t != null) {
+          row.setLong(pos, DateTimeUtils.fromJavaTimestamp(t))
+        } else {
+          row.update(pos, null)
+        }
+
+    case BinaryType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.update(pos, rs.getBytes(pos + 1))
+
+    case ByteType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.update(pos, rs.getByte(pos + 1))
+
+    case ArrayType(et, _) =>
+      val elementConversion = et match {
+        case TimestampType =>
+          (array: Object) =>
+            array.asInstanceOf[Array[java.sql.Timestamp]].map { timestamp =>
+              nullSafeConvert(timestamp, DateTimeUtils.fromJavaTimestamp)
+            }
+
+        case StringType =>
+          (array: Object) =>
+            // some underlying types are not String, such as uuid, inet, cidr, etc.
+            array.asInstanceOf[Array[java.lang.Object]]
+              .map(obj => if (obj == null) null else UTF8String.fromString(obj.toString))
+
+        case DateType =>
+          (array: Object) =>
+            array.asInstanceOf[Array[java.sql.Date]].map { date =>
+              nullSafeConvert(date, DateTimeUtils.fromJavaDate)
+            }
+
+        case dt: DecimalType =>
+          (array: Object) =>
+            array.asInstanceOf[Array[java.math.BigDecimal]].map { decimal =>
+              nullSafeConvert[java.math.BigDecimal](
+                decimal, d => Decimal(d, dt.precision, dt.scale))
+            }
+
+        case LongType if metadata.contains("binarylong") =>
+          throw new IllegalArgumentException(s"Unsupported array element " +
+            s"type ${dt.catalogString} based on binary")
+
+        case ArrayType(_, _) =>
+          throw new IllegalArgumentException("Nested arrays unsupported")
+
+        case _ => (array: Object) => array.asInstanceOf[Array[Any]]
+      }
+
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val array = nullSafeConvert[java.sql.Array](
+          input = rs.getArray(pos + 1),
+          array => new GenericArrayData(elementConversion.apply(array.getArray)))
+        row.update(pos, array)
+
+    case _ => throw new IllegalArgumentException(s"Unsupported type ${dt.catalogString}")
+  }
+
+  // TODO just use JdbcUtils.resultSetToSparkInternalRows in Spark 3.0 (see SPARK-26499)
+  def resultSetToSparkInternalRows(
+                                    resultSet: ResultSet,
+                                    schema: StructType,
+                                    inputMetrics: InputMetrics): Iterator[InternalRow] = {
+    // JdbcUtils.resultSetToSparkInternalRows(resultSet, schema, inputMetrics)
+    new NextIterator[InternalRow] {
+      private[this] val rs = resultSet
+      private[this] val getters: Array[JDBCValueGetter] = makeGetters(schema)
+      private[this] val mutableRow = new SpecificInternalRow(schema.fields.map(x => x.dataType))
+
+      override protected def close(): Unit = {
+        try {
+          rs.close()
+        } catch {
+          case e: Exception =>
+        }
+      }
+
+      override protected def getNext(): InternalRow = {
+        if (rs.next()) {
+          inputMetrics.incRecordsRead(1)
+          var i = 0
+          while (i < getters.length) {
+            getters(i).apply(rs, mutableRow, i)
+            if (rs.wasNull) mutableRow.setNullAt(i)
+            i = i + 1
+          }
+          mutableRow
+        } else {
+          finished = true
+          null.asInstanceOf[InternalRow]
+        }
+      }
+    }
+  }
+
+  // A `JDBCValueSetter` writes one value from a `Row` into a `PreparedStatement`
+  // parameter. The final `Int` argument is the zero-based index of the value in the
+  // `Row`; the same index (plus one) addresses the parameter in the SQL statement.
+  private type JDBCValueSetter = (PreparedStatement, Row, Int) => Unit
+
+  // Taken from Spark's JdbcUtils.scala; it cannot be reused directly because the method there is private
+  def makeSetter(
+                  conn: Connection,
+                  dialect: JdbcDialect,
+                  dataType: DataType): JDBCValueSetter = dataType match {
+    case IntegerType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setInt(pos + 1, row.getInt(pos))
+
+    case LongType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setLong(pos + 1, row.getLong(pos))
+
+    case DoubleType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setDouble(pos + 1, row.getDouble(pos))
+
+    case FloatType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setFloat(pos + 1, row.getFloat(pos))
+
+    case ShortType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setInt(pos + 1, row.getShort(pos))
+
+    case ByteType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setInt(pos + 1, row.getByte(pos))
+
+    case BooleanType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setBoolean(pos + 1, row.getBoolean(pos))
+
+    case StringType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setString(pos + 1, row.getString(pos))
+
+    case BinaryType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setBytes(pos + 1, row.getAs[Array[Byte]](pos))
+
+    case TimestampType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos))
+
+    case DateType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setDate(pos + 1, row.getAs[java.sql.Date](pos))
+
+    case t: DecimalType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setBigDecimal(pos + 1, row.getDecimal(pos))
+
+    case ArrayType(et, _) =>
+      // remove type length parameters from end of type name
+      val typeName = getJdbcType(et, dialect).databaseTypeDefinition
+        .toLowerCase(Locale.ROOT).split("\\(")(0)
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        val array = conn.createArrayOf(
+          typeName,
+          row.getSeq[AnyRef](pos).toArray)
+        stmt.setArray(pos + 1, array)
+
+    case _ =>
+      (_: PreparedStatement, _: Row, pos: Int) =>
+        throw new IllegalArgumentException(
+          s"Can't translate non-null value for field $pos")
+  }
+
+  // taken from Spark JdbcUtils
+  def getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType = {
+    dialect.getJDBCType(dt).orElse(getCommonJDBCType(dt)).getOrElse(
+      throw new IllegalArgumentException(s"Can't get JDBC type for ${dt.catalogString}"))
+  }
+
+}
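
For orientation, a hedged sketch of how a partition reader might drive these helpers on the read path; the JDBC wiring, the readPartition name, and obtaining InputMetrics from the running task are assumptions, not something this file prescribes.

// Sketch only, not part of the patch. Must run inside a Spark task so TaskContext.get() is non-null.
import java.sql.DriverManager

import org.apache.spark.TaskContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.SparkJdbcUtil
import org.apache.spark.sql.types.StructType

object SparkJdbcUtilReadExample {
  def readPartition(jdbcUrl: String, selectStatement: String, schema: StructType): Iterator[Row] = {
    val conn = DriverManager.getConnection(jdbcUrl)
    val rs = conn.createStatement().executeQuery(selectStatement)
    // InputMetrics come from the current task so records-read shows up in the Spark UI.
    val inputMetrics = TaskContext.get().taskMetrics().inputMetrics
    SparkJdbcUtil.resultSetToSparkInternalRows(rs, schema, inputMetrics)
      .map(internalRow => SparkJdbcUtil.toRow(schema, internalRow))
  }
}

Note that toRow builds and resolves a RowEncoder on every call, so a reader converting many rows may prefer to construct the encoder once per partition and convert inside its own loop.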