PHOENIX-5232: PhoenixDataWriter in Phoenix-Spark connector does not commit when mutation batch size is reached
diff --git a/phoenix-spark/pom.xml b/phoenix-spark/pom.xml
index 0bbd983..56d2111 100644
--- a/phoenix-spark/pom.xml
+++ b/phoenix-spark/pom.xml
@@ -478,8 +478,8 @@
</dependencies>
<build>
- <testSourceDirectory>src/it/scala</testSourceDirectory>
- <testResources><testResource><directory>src/it/resources</directory></testResource></testResources>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <testResources><testResource><directory>src/test/resources</directory></testResource></testResources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index b40b638..58910ce 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -16,12 +16,17 @@
import java.sql.DriverManager
import java.util.Date
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
import org.apache.phoenix.schema.types.PVarchar
-import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
+import org.apache.phoenix.spark.datasource.v2.{PhoenixDataSource, PhoenixTestingDataSource}
+import org.apache.phoenix.spark.datasource.v2.reader.PhoenixTestingInputPartitionReader
+import org.apache.phoenix.spark.datasource.v2.writer.PhoenixTestingDataSourceWriter
import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
-import org.apache.spark.sql.types._
+import org.apache.spark.SparkException
+import org.apache.spark.sql.types.{ArrayType, BinaryType, ByteType, DateType, IntegerType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode}
+import scala.collection.mutable
import scala.collection.mutable.ListBuffer
/**
@@ -249,6 +254,30 @@
count shouldEqual 1L
}
+  test("Can use extraOptions to set configs for workers during reads") {
+    // Loads TABLE1 through the testing data source, passing the RETURN_NULL_CURR_ROW flag to
+    // the workers through the PHOENIX_CONFIGS option.
+    def loadTable1(returnNullCurrRow: Boolean) = {
+      val extraOptions =
+        s"${PhoenixTestingInputPartitionReader.RETURN_NULL_CURR_ROW}=$returnNullCurrRow"
+      spark.sqlContext.read
+        .format(PhoenixTestingDataSource.TEST_SOURCE)
+        .options(Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress,
+          PhoenixDataSource.PHOENIX_CONFIGS -> extraOptions))
+        .load()
+    }
+
+    // Pass in true, so we will get null when fetching the current row, leading to an NPE.
+    // Expect to get a NullPointerException in the executors
+    val error = intercept[SparkException] {
+      loadTable1(returnNullCurrRow = true).take(2)(0)(1)
+    }
+    error.getCause shouldBe a [NullPointerException]
+
+    // Pass in false, so we will get the expected rows
+    loadTable1(returnNullCurrRow = false).take(2)(0)(1) shouldEqual "test_row_1"
+  }
+
test("Can save to phoenix table") {
val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
@@ -282,6 +311,47 @@
}
}
+  test("Can use extraOptions to set configs for workers during writes") {
+    val totalRecords = 100
+    val upsertBatchSize = 5
+
+    // Build the rows immutably: (ID, COL1, COL2) = (x, "x", x) for x in 1..totalRecords
+    val dataSet = (1 to totalRecords).map(x => Row(x.toLong, x.toString, x)).toList
+
+    val schema = StructType(
+      Seq(StructField("ID", LongType, nullable = false),
+        StructField("COL1", StringType),
+        StructField("COL2", IntegerType)))
+
+    // Distribute the dataset into an RDD with just 1 partition so we use only 1 executor.
+    // This makes it easy to deterministically count the batched commits from that executor
+    // since it corresponds to exactly 1 input partition. With multiple executors and an
+    // uneven distribution of input partitions, whenever
+    // (number of records in that partition) % batchSize != 0, the leftover updates would be
+    // committed via PhoenixDataWriter#commit rather than by the batch commits in
+    // PhoenixDataWriter#write, and would thus not be counted by PhoenixTestingDataWriter.
+    val rowRDD = spark.sparkContext.parallelize(dataSet, 1)
+
+    // Apply the schema to the RDD.
+    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+    val extraOptions = s"${PhoenixConfigurationUtil.UPSERT_BATCH_SIZE}=$upsertBatchSize"
+
+    // Initially, this should be zero
+    PhoenixTestingDataSourceWriter.TOTAL_BATCHES_COMMITTED_COUNT shouldEqual 0
+    df.write
+      .format(PhoenixTestingDataSource.TEST_SOURCE)
+      .options(Map("table" -> "OUTPUT_TEST_TABLE",
+        PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress,
+        PhoenixDataSource.PHOENIX_CONFIGS -> extraOptions))
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    // Verify the number of times batched updates are committed via DataWriters.
+    // totalRecords is a multiple of upsertBatchSize, so no rows are left for the final commit.
+    val expectedBatchCommits = totalRecords / upsertBatchSize
+    PhoenixTestingDataSourceWriter.TOTAL_BATCHES_COMMITTED_COUNT shouldEqual expectedBatchCommits
+  }
+
test("Can save dates to Phoenix using java.sql.Date") {
val date = java.sql.Date.valueOf("2016-09-30")
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
index 02b5edf..21250af 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
@@ -21,11 +21,8 @@
import java.util.Properties;
import org.apache.log4j.Logger;
-import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.spark.datasource.v2.reader.PhoenixDataSourceReader;
-import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDataSourceWriteOptions;
-import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDatasourceWriter;
-import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDataSourceWriter;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -52,20 +49,9 @@
}
@Override
- public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode,
- DataSourceOptions options) {
- if (!mode.equals(SaveMode.Overwrite)) {
- throw new RuntimeException("SaveMode other than SaveMode.OverWrite is not supported");
- }
- if (!options.tableName().isPresent()) {
- throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
- }
- if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
- throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined");
- }
-
- PhoenixDataSourceWriteOptions writeOptions = createPhoenixDataSourceWriteOptions(options, schema);
- return Optional.of(new PhoenixDatasourceWriter(writeOptions));
+ public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema,
+ SaveMode mode, DataSourceOptions options) {
+ return Optional.of(new PhoenixDataSourceWriter(mode, schema, options));
}
/**
@@ -102,19 +88,6 @@
return confToSet;
}
- private PhoenixDataSourceWriteOptions createPhoenixDataSourceWriteOptions(DataSourceOptions options,
- StructType schema) {
- String scn = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE).orElse(null);
- String tenantId = options.get(PhoenixRuntime.TENANT_ID_ATTRIB).orElse(null);
- String zkUrl = options.get(ZOOKEEPER_URL).get();
- boolean skipNormalizingIdentifier = options.getBoolean(SKIP_NORMALIZING_IDENTIFIER, false);
- return new PhoenixDataSourceWriteOptions.Builder().setTableName(options.tableName().get())
- .setZkUrl(zkUrl).setScn(scn).setTenantId(tenantId).setSchema(schema)
- .setSkipNormalizingIdentifier(skipNormalizingIdentifier)
- .setOverriddenProps(extractPhoenixHBaseConfFromOptions(options))
- .build();
- }
-
@Override
public String shortName() {
return "phoenix";
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
index 70062c8..67343d4 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
@@ -22,7 +22,7 @@
import java.io.Serializable;
import java.util.Properties;
-public class PhoenixDataSourceReadOptions implements Serializable {
+class PhoenixDataSourceReadOptions implements Serializable {
private final String tenantId;
private final String zkUrl;
@@ -30,7 +30,7 @@
private final String selectStatement;
private final Properties overriddenProps;
- public PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId,
+ PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId,
String selectStatement, Properties overriddenProps) {
Preconditions.checkNotNull(overriddenProps);
this.zkUrl = zkUrl;
@@ -40,23 +40,23 @@
this.overriddenProps = overriddenProps;
}
- public String getSelectStatement() {
+ String getSelectStatement() {
return selectStatement;
}
- public String getScn() {
+ String getScn() {
return scn;
}
- public String getZkUrl() {
+ String getZkUrl() {
return zkUrl;
}
- public String getTenantId() {
+ String getTenantId() {
return tenantId;
}
- public Properties getOverriddenProps() {
+ Properties getOverriddenProps() {
return overriddenProps;
}
}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
index 8476509..18e304b 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
@@ -85,7 +85,7 @@
}
this.options = options;
this.tableName = options.tableName().get();
- this.zkUrl = options.get("zkUrl").get();
+ this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get();
this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
this.overriddenProps = extractPhoenixHBaseConfFromOptions(options);
setSchema();
@@ -106,6 +106,11 @@
}
}
+ PhoenixInputPartition getInputPartition(PhoenixDataSourceReadOptions readOptions,
+ PhoenixInputSplit inputSplit) {
+ return new PhoenixInputPartition(readOptions, schema, inputSplit);
+ }
+
@Override
public StructType readSchema() {
return schema;
@@ -175,19 +180,18 @@
// Get the region size
long regionSize = sizeCalculator.getRegionSize(
- location.getRegionInfo().getRegionName()
- );
+ location.getRegionInfo().getRegionName());
PhoenixDataSourceReadOptions phoenixDataSourceOptions =
new PhoenixDataSourceReadOptions(zkUrl, currentScnValue.orElse(null),
tenantId.orElse(null), selectStatement, overriddenProps);
if (splitByStats) {
for (Scan aScan : scans) {
- partitions.add(new PhoenixInputPartition(phoenixDataSourceOptions, schema,
+ partitions.add(getInputPartition(phoenixDataSourceOptions,
new PhoenixInputSplit(Collections.singletonList(aScan), regionSize, regionLocation)));
}
} else {
- partitions.add(new PhoenixInputPartition(phoenixDataSourceOptions, schema,
+ partitions.add(getInputPartition(phoenixDataSourceOptions,
new PhoenixInputSplit(scans, regionSize, regionLocation)));
}
}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
index 624ff0f..e4adc07 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
@@ -26,16 +26,28 @@
public class PhoenixInputPartition implements InputPartition<InternalRow> {
- private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
- private StructType schema;
- private PhoenixDataSourceReadOptions options;
+ private final SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
+ private final StructType schema;
+ private final PhoenixDataSourceReadOptions options;
- public PhoenixInputPartition(PhoenixDataSourceReadOptions options, StructType schema, PhoenixInputSplit phoenixInputSplit) {
+ PhoenixInputPartition(PhoenixDataSourceReadOptions options, StructType schema, PhoenixInputSplit phoenixInputSplit) {
this.phoenixInputSplit = new SerializableWritable<>(phoenixInputSplit);
this.schema = schema;
this.options = options;
}
+ PhoenixDataSourceReadOptions getOptions() {
+ return options;
+ }
+
+ StructType getSchema() {
+ return schema;
+ }
+
+ SerializableWritable<PhoenixInputSplit> getPhoenixInputSplit() {
+ return phoenixInputSplit;
+ }
+
@Override
public InputPartitionReader<InternalRow> createPartitionReader() {
return new PhoenixInputPartitionReader(options, schema, phoenixInputSplit);
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
index 6f6413b..49f6c13 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
@@ -62,25 +62,30 @@
public class PhoenixInputPartitionReader implements InputPartitionReader<InternalRow> {
- private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
- private StructType schema;
+ private final SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
+ private final StructType schema;
+ private final PhoenixDataSourceReadOptions options;
private Iterator<InternalRow> iterator;
private PhoenixResultSet resultSet;
private InternalRow currentRow;
- private PhoenixDataSourceReadOptions options;
- public PhoenixInputPartitionReader(PhoenixDataSourceReadOptions options, StructType schema, SerializableWritable<PhoenixInputSplit> phoenixInputSplit) {
+ PhoenixInputPartitionReader(PhoenixDataSourceReadOptions options, StructType schema,
+ SerializableWritable<PhoenixInputSplit> phoenixInputSplit) {
this.options = options;
this.phoenixInputSplit = phoenixInputSplit;
this.schema = schema;
initialize();
}
+ Properties getOverriddenPropsFromOptions() {
+ return options.getOverriddenProps();
+ }
+
private QueryPlan getQueryPlan() throws SQLException {
String scn = options.getScn();
String tenantId = options.getTenantId();
String zkUrl = options.getZkUrl();
- Properties overridingProps = options.getOverriddenProps();
+ Properties overridingProps = getOverriddenPropsFromOptions();
if (scn != null) {
overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
}
@@ -95,8 +100,7 @@
final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
// Optimize the query plan so that we potentially use secondary indexes
- final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
- return queryPlan;
+ return pstmt.optimizeQuery(selectStatement);
}
}
@@ -114,7 +118,8 @@
ConnectionQueryServices services = queryPlan.getContext().getConnection().getQueryServices();
services.clearTableRegionCache(tableNameBytes);
- long renewScannerLeaseThreshold = queryPlan.getContext().getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
+ long renewScannerLeaseThreshold = queryPlan.getContext().getConnection()
+ .getQueryServices().getRenewLeaseThresholdMilliSeconds();
for (Scan scan : scans) {
// For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599
scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
@@ -131,13 +136,16 @@
peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
iterators.add(peekingResultIterator);
}
- ResultIterator iterator = queryPlan.useRoundRobinIterator() ? RoundRobinResultIterator.newIterator(iterators, queryPlan) : ConcatResultIterator.newIterator(iterators);
+ ResultIterator iterator = queryPlan.useRoundRobinIterator() ?
+ RoundRobinResultIterator.newIterator(iterators, queryPlan) :
+ ConcatResultIterator.newIterator(iterators);
if (queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
}
// Clone the row projector as it's not thread safe and would be used simultaneously by
// multiple threads otherwise.
- this.resultSet = new PhoenixResultSet(iterator, queryPlan.getProjector().cloneIfNecessary(), queryPlan.getContext());
+ this.resultSet = new PhoenixResultSet(iterator, queryPlan.getProjector().cloneIfNecessary(),
+ queryPlan.getContext());
this.iterator = SparkJdbcUtil.resultSetToSparkInternalRows(resultSet, schema, new InputMetrics());
}
catch (SQLException e) {
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
index 434f13c..c130db8 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
@@ -23,7 +23,7 @@
import java.io.Serializable;
import java.util.Properties;
-public class PhoenixDataSourceWriteOptions implements Serializable {
+class PhoenixDataSourceWriteOptions implements Serializable {
private final String tableName;
private final String zkUrl;
@@ -36,6 +36,9 @@
private PhoenixDataSourceWriteOptions(String tableName, String zkUrl, String scn,
String tenantId, StructType schema, boolean skipNormalizingIdentifier,
Properties overriddenProps) {
+ Preconditions.checkNotNull(tableName);
+ Preconditions.checkNotNull(zkUrl);
+ Preconditions.checkNotNull(schema);
Preconditions.checkNotNull(overriddenProps);
this.tableName = tableName;
this.zkUrl = zkUrl;
@@ -46,35 +49,35 @@
this.overriddenProps = overriddenProps;
}
- public String getScn() {
+ String getScn() {
return scn;
}
- public String getZkUrl() {
+ String getZkUrl() {
return zkUrl;
}
- public String getTenantId() {
+ String getTenantId() {
return tenantId;
}
- public StructType getSchema() {
+ StructType getSchema() {
return schema;
}
- public String getTableName() {
+ String getTableName() {
return tableName;
}
- public boolean skipNormalizingIdentifier() {
+ boolean skipNormalizingIdentifier() {
return skipNormalizingIdentifier;
}
- public Properties getOverriddenProps() {
+ Properties getOverriddenProps() {
return overriddenProps;
}
- public static class Builder {
+ static class Builder {
private String tableName;
private String zkUrl;
private String scn;
@@ -83,42 +86,42 @@
private boolean skipNormalizingIdentifier;
private Properties overriddenProps = new Properties();
- public Builder setTableName(String tableName) {
+ Builder setTableName(String tableName) {
this.tableName = tableName;
return this;
}
- public Builder setZkUrl(String zkUrl) {
+ Builder setZkUrl(String zkUrl) {
this.zkUrl = zkUrl;
return this;
}
- public Builder setScn(String scn) {
+ Builder setScn(String scn) {
this.scn = scn;
return this;
}
- public Builder setTenantId(String tenantId) {
+ Builder setTenantId(String tenantId) {
this.tenantId = tenantId;
return this;
}
- public Builder setSchema(StructType schema) {
+ Builder setSchema(StructType schema) {
this.schema = schema;
return this;
}
- public Builder setSkipNormalizingIdentifier(boolean skipNormalizingIdentifier) {
+ Builder setSkipNormalizingIdentifier(boolean skipNormalizingIdentifier) {
this.skipNormalizingIdentifier = skipNormalizingIdentifier;
return this;
}
- public Builder setOverriddenProps(Properties overriddenProps) {
+ Builder setOverriddenProps(Properties overriddenProps) {
this.overriddenProps = overriddenProps;
return this;
}
- public PhoenixDataSourceWriteOptions build() {
+ PhoenixDataSourceWriteOptions build() {
return new PhoenixDataSourceWriteOptions(tableName, zkUrl, scn, tenantId, schema,
skipNormalizingIdentifier, overriddenProps);
}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
index 6793673..04670d5 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
@@ -26,6 +26,7 @@
import java.util.Properties;
import java.util.stream.Collectors;
+import org.apache.log4j.Logger;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -40,16 +41,22 @@
import org.apache.spark.sql.types.StructType;
import com.google.common.collect.Lists;
+
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_UPSERT_BATCH_SIZE;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.UPSERT_BATCH_SIZE;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
public class PhoenixDataWriter implements DataWriter<InternalRow> {
+ private static final Logger logger = Logger.getLogger(PhoenixDataWriter.class);
private final StructType schema;
private final Connection conn;
private final PreparedStatement statement;
+ private final long batchSize;
+ private long numRecords = 0;
- public PhoenixDataWriter(PhoenixDataSourceWriteOptions options) {
+ PhoenixDataWriter(PhoenixDataSourceWriteOptions options) {
String scn = options.getScn();
String tenantId = options.getTenantId();
String zkUrl = options.getZkUrl();
@@ -66,15 +73,21 @@
overridingProps);
List<String> colNames = Lists.newArrayList(options.getSchema().names());
if (!options.skipNormalizingIdentifier()){
- colNames = colNames.stream().map(colName -> SchemaUtil.normalizeIdentifier(colName)).collect(Collectors.toList());
+ colNames = colNames.stream().map(SchemaUtil::normalizeIdentifier).collect(Collectors.toList());
}
String upsertSql = QueryUtil.constructUpsertStatement(options.getTableName(), colNames, null);
this.statement = this.conn.prepareStatement(upsertSql);
+            long size = Long.parseLong(overridingProps.getProperty(UPSERT_BATCH_SIZE, "0"));
+            this.batchSize = size > 0 ? size : DEFAULT_UPSERT_BATCH_SIZE; // modulus in write()
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
+ void commitBatchUpdates() throws SQLException {
+ conn.commit();
+ }
+
@Override
public void write(InternalRow internalRow) throws IOException {
try {
@@ -90,14 +103,21 @@
}
++i;
}
+ numRecords++;
statement.execute();
+ if (numRecords % batchSize == 0) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("commit called on a batch of size : " + batchSize);
+ }
+ commitBatchUpdates();
+ }
} catch (SQLException e) {
throw new IOException("Exception while executing Phoenix prepared statement", e);
}
}
@Override
- public WriterCommitMessage commit() throws IOException {
+ public WriterCommitMessage commit() {
try {
conn.commit();
} catch (SQLException e) {
@@ -115,6 +135,6 @@
}
@Override
- public void abort() throws IOException {
+ public void abort() {
}
}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
index f7654e3..6fce8fe 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
@@ -25,10 +25,14 @@
private final PhoenixDataSourceWriteOptions options;
- public PhoenixDataWriterFactory(PhoenixDataSourceWriteOptions options) {
+ PhoenixDataWriterFactory(PhoenixDataSourceWriteOptions options) {
this.options = options;
}
+ PhoenixDataSourceWriteOptions getOptions() {
+ return options;
+ }
+
@Override
public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
return new PhoenixDataWriter(options);
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
index 9d713b8..31a3065 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
@@ -17,17 +17,35 @@
*/
package org.apache.phoenix.spark.datasource.v2.writer;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
-public class PhoenixDatasourceWriter implements DataSourceWriter {
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
+
+public class PhoenixDataSourceWriter implements DataSourceWriter {
private final PhoenixDataSourceWriteOptions options;
- public PhoenixDatasourceWriter(PhoenixDataSourceWriteOptions options) {
- this.options = options;
+ public PhoenixDataSourceWriter(SaveMode mode, StructType schema, DataSourceOptions options) {
+ if (!mode.equals(SaveMode.Overwrite)) {
+ throw new RuntimeException("SaveMode other than SaveMode.OverWrite is not supported");
+ }
+ if (!options.tableName().isPresent()) {
+ throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
+ }
+ if (!options.get(ZOOKEEPER_URL).isPresent()) {
+ throw new RuntimeException("No Phoenix option " + ZOOKEEPER_URL + " defined");
+ }
+ this.options = createPhoenixDataSourceWriteOptions(options, schema);
}
@Override
@@ -47,4 +65,25 @@
@Override
public void abort(WriterCommitMessage[] messages) {
}
+
+ PhoenixDataSourceWriteOptions getOptions() {
+ return options;
+ }
+
+    // Translates the Spark options and schema into Phoenix write options. A missing SCN or
+    // tenant id is passed to the builder as null.
+    private PhoenixDataSourceWriteOptions createPhoenixDataSourceWriteOptions(
+            DataSourceOptions options, StructType schema) {
+        PhoenixDataSourceWriteOptions.Builder builder =
+                new PhoenixDataSourceWriteOptions.Builder();
+        builder.setScn(options.get(CURRENT_SCN_VALUE).orElse(null));
+        builder.setTenantId(options.get(PhoenixRuntime.TENANT_ID_ATTRIB).orElse(null));
+        builder.setZkUrl(options.get(ZOOKEEPER_URL).get());
+        builder.setSkipNormalizingIdentifier(
+                options.getBoolean(SKIP_NORMALIZING_IDENTIFIER, false));
+        builder.setTableName(options.tableName().get());
+        builder.setSchema(schema);
+        builder.setOverriddenProps(extractPhoenixHBaseConfFromOptions(options));
+        return builder.build();
+    }
}
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
index 1d6973c..a2ec2dc 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
@@ -23,7 +23,8 @@
import org.apache.phoenix.util.{DateUtil, SchemaUtil}
import org.apache.phoenix.util.StringUtil.escapeStringConstant
-import org.apache.spark.sql.sources._
+import org.apache.spark.sql.sources.{And, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, In,
+IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains, StringEndsWith, StringStartsWith}
class FilterExpressionCompiler() {
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
index 34033b7..89d808d 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
@@ -16,16 +16,15 @@
import java.sql.DriverManager
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
+import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.io.NullWritable
import org.apache.phoenix.jdbc.PhoenixDriver
import org.apache.phoenix.mapreduce.PhoenixInputFormat
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
import org.apache.phoenix.query.HBaseFactoryProvider
-import org.apache.spark._
+import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import scala.collection.JavaConverters._
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
index 6d4c4cc..66c347e 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
@@ -15,7 +15,7 @@
import java.sql.{PreparedStatement, ResultSet}
import org.apache.hadoop.mapreduce.lib.db.DBWritable
-import org.apache.phoenix.schema.types._
+import org.apache.phoenix.schema.types.{PBinary, PDataType, PDate, PVarbinary, PhoenixArray}
import org.apache.phoenix.util.ColumnInfo
import org.joda.time.DateTime
import scala.collection.{mutable, immutable}
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
index 2f6ea8c..aacd460 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
@@ -19,7 +19,7 @@
import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.sources._
+import org.apache.spark.sql.sources.{BaseRelation, PrunedFilteredScan, Filter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
index f69e988..363acf8 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
@@ -18,9 +18,18 @@
package org.apache.phoenix.spark
import org.apache.phoenix.query.QueryConstants
-import org.apache.phoenix.schema.types._
+import org.apache.phoenix.schema.types.{PBinary, PBinaryArray, PBoolean, PBooleanArray, PChar,
+PCharArray, PDate, PDateArray, PDecimal, PDecimalArray, PDouble, PDoubleArray, PFloat, PFloatArray,
+PInteger, PIntegerArray, PLong, PLongArray, PSmallint, PSmallintArray, PTime, PTimeArray,
+PTimestamp, PTimestampArray, PTinyint, PTinyintArray, PUnsignedDate, PUnsignedDateArray,
+PUnsignedDouble, PUnsignedDoubleArray, PUnsignedFloat, PUnsignedFloatArray, PUnsignedInt,
+PUnsignedIntArray, PUnsignedLong, PUnsignedLongArray, PUnsignedSmallint, PUnsignedSmallintArray,
+PUnsignedTime, PUnsignedTimeArray, PUnsignedTimestamp, PUnsignedTimestampArray, PUnsignedTinyint,
+PUnsignedTinyintArray, PVarbinary, PVarbinaryArray, PVarchar, PVarcharArray}
import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType,
+DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, StructField,
+StructType, TimestampType}
object SparkSchemaUtil {
diff --git a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
index 01437f0..f89a451 100644
--- a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
+++ b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
@@ -19,7 +19,7 @@
package org.apache.spark.sql.execution.datasources.jdbc
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{BinaryType, ByteType, DataType, StringType}
private object PhoenixJdbcDialect extends JdbcDialect {
diff --git a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
index eac483a..50cdbf5 100644
--- a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
+++ b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
@@ -27,10 +27,11 @@
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils._
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType,
+Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, Metadata, ShortType, StringType,
+StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.NextIterator
diff --git a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/PhoenixTestingDataSource.java b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/PhoenixTestingDataSource.java
new file mode 100644
index 0000000..6a2299e
--- /dev/null
+++ b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/PhoenixTestingDataSource.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2;
+
+import org.apache.phoenix.spark.datasource.v2.reader.PhoenixTestingDataSourceReader;
+import org.apache.phoenix.spark.datasource.v2.writer.PhoenixTestingDataSourceWriter;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Optional;
+
+public class PhoenixTestingDataSource extends PhoenixDataSource {
+
+    public static final String TEST_SOURCE =
+            "org.apache.phoenix.spark.datasource.v2.PhoenixTestingDataSource";
+
+    // Short alias reported for this test-only data source
+    @Override
+    public String shortName() {
+        return "phoenixTesting";
+    }
+
+    // Overrides the parent to supply the test DataSourceReader
+    @Override
+    public DataSourceReader createReader(DataSourceOptions options) {
+        return new PhoenixTestingDataSourceReader(options);
+    }
+
+    // Overrides the parent to supply the test DataSourceWriter; writeUUID is not forwarded
+    @Override
+    public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema,
+            SaveMode mode, DataSourceOptions options) {
+        return Optional.of(new PhoenixTestingDataSourceWriter(mode, schema, options));
+    }
+}
diff --git a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingDataSourceReader.java b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingDataSourceReader.java
new file mode 100644
index 0000000..0116255
--- /dev/null
+++ b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingDataSourceReader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.reader;
+
+import org.apache.phoenix.mapreduce.PhoenixInputSplit;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+
+public class PhoenixTestingDataSourceReader extends PhoenixDataSourceReader {
+
+    public PhoenixTestingDataSourceReader(DataSourceOptions options) {
+        super(options);
+    }
+
+    // Builds a PhoenixTestingInputPartition for the split, using this reader's read schema
+    @Override
+    PhoenixInputPartition getInputPartition(PhoenixDataSourceReadOptions readOptions,
+            PhoenixInputSplit inputSplit) {
+        return new PhoenixTestingInputPartition(readOptions, this.readSchema(), inputSplit);
+    }
+}
diff --git a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartition.java b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartition.java
new file mode 100644
index 0000000..eca7fc7
--- /dev/null
+++ b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartition.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.reader;
+
+import org.apache.phoenix.mapreduce.PhoenixInputSplit;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
+
+public class PhoenixTestingInputPartition extends PhoenixInputPartition {
+
+    PhoenixTestingInputPartition(PhoenixDataSourceReadOptions options, StructType schema,
+            PhoenixInputSplit phoenixInputSplit) {
+        super(options, schema, phoenixInputSplit);
+    }
+
+    // Creates the test InputPartitionReader so reads can be checked on the executor side
+    @Override
+    public InputPartitionReader<InternalRow> createPartitionReader() {
+        return new PhoenixTestingInputPartitionReader(
+                getOptions(), getSchema(), getPhoenixInputSplit());
+    }
+}
diff --git a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartitionReader.java b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartitionReader.java
new file mode 100644
index 0000000..c85dc44
--- /dev/null
+++ b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartitionReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.reader;
+
+import org.apache.phoenix.mapreduce.PhoenixInputSplit;
+import org.apache.spark.SerializableWritable;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Properties;
+
+public class PhoenixTestingInputPartitionReader extends PhoenixInputPartitionReader {
+
+    // Test-only property: when it parses as true, get() returns null instead of the current
+    // row, letting tests check that properties set on the driver reach the executors
+    public static final String RETURN_NULL_CURR_ROW = "return.null.curr.row";
+
+    PhoenixTestingInputPartitionReader(PhoenixDataSourceReadOptions options, StructType schema,
+            SerializableWritable<PhoenixInputSplit> phoenixInputSplit) {
+        super(options, schema, phoenixInputSplit);
+    }
+
+    // Returns null when RETURN_NULL_CURR_ROW is set to true, otherwise the parent's row
+    @Override
+    public InternalRow get() {
+        Properties overrides = getOverriddenPropsFromOptions();
+        boolean returnNull = Boolean.parseBoolean(overrides.getProperty(RETURN_NULL_CURR_ROW));
+        return returnNull ? null : super.get();
+    }
+}
diff --git a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataSourceWriter.java b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataSourceWriter.java
new file mode 100644
index 0000000..cbba62b
--- /dev/null
+++ b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataSourceWriter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
+
+public class PhoenixTestingDataSourceWriter extends PhoenixDataSourceWriter {
+
+    // Running sum of the batch counts reported in the commit messages passed to commit()
+    public static int TOTAL_BATCHES_COMMITTED_COUNT = 0;
+
+    public PhoenixTestingDataSourceWriter(SaveMode mode, StructType schema,
+            DataSourceOptions options) {
+        super(mode, schema, options);
+    }
+
+    // Adds the batch count carried by each message's toString() to the running total
+    @Override
+    public void commit(WriterCommitMessage[] messages) {
+        for (int i = 0; i < messages.length; i++) {
+            TOTAL_BATCHES_COMMITTED_COUNT += Integer.parseInt(messages[i].toString());
+        }
+    }
+
+    // Supplies the test DataWriterFactory built from this writer's options
+    @Override
+    public DataWriterFactory<InternalRow> createWriterFactory() {
+        return new PhoenixTestingDataWriterFactory(getOptions());
+    }
+}
diff --git a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriter.java b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriter.java
new file mode 100644
index 0000000..75aa447
--- /dev/null
+++ b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+
+import java.sql.SQLException;
+
+public class PhoenixTestingDataWriter extends PhoenixDataWriter {
+
+    // Number of commitBatchUpdates() calls that completed on this writer
+    private long numBatchesCommitted;
+
+    PhoenixTestingDataWriter(PhoenixDataSourceWriteOptions options) {
+        super(options);
+    }
+
+    // Runs the parent's batch commit, then counts it so tests can check upsert batch commits
+    @Override
+    void commitBatchUpdates() throws SQLException {
+        super.commitBatchUpdates();
+        numBatchesCommitted += 1;
+    }
+
+    // Runs the parent's commit, then reports the batch count in a test WriterCommitMessage
+    @Override
+    public WriterCommitMessage commit() {
+        super.commit();
+        return new PhoenixTestingWriterCommitMessage(numBatchesCommitted);
+    }
+}
diff --git a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriterFactory.java b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriterFactory.java
new file mode 100644
index 0000000..4288128
--- /dev/null
+++ b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriterFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+
+public class PhoenixTestingDataWriterFactory extends PhoenixDataWriterFactory {
+
+    PhoenixTestingDataWriterFactory(PhoenixDataSourceWriteOptions options) {
+        super(options);
+    }
+
+    // Supplies a test DataWriter; the partition, task and epoch ids are not used here
+    @Override
+    public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
+        return new PhoenixTestingDataWriter(this.getOptions());
+    }
+}
diff --git a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingWriterCommitMessage.java b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingWriterCommitMessage.java
new file mode 100644
index 0000000..1e43e07
--- /dev/null
+++ b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingWriterCommitMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+
+class PhoenixTestingWriterCommitMessage implements WriterCommitMessage {
+
+    // Batch count handed to this message when it was constructed
+    private final long numBatchesCommitted;
+
+    PhoenixTestingWriterCommitMessage(long numBatchesCommitted) {
+        this.numBatchesCommitted = numBatchesCommitted;
+    }
+
+    // Renders the batch count held by this message as a decimal string. The driver-side
+    // PhoenixTestingDataSourceWriter#commit(WriterCommitMessage[]) parses this value to
+    // observe how many batches the corresponding DataWriter committed.
+    @Override
+    public String toString() {
+        return Long.toString(numBatchesCommitted);
+    }
+}