PIG-5277: Spark mode is writing nulls among tuples to the output (workaround) (szita)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1804929 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 10527d2..0d64ad9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -44,6 +44,8 @@
BUG FIXES
+PIG-5277: Spark mode is writing nulls among tuples to the output (workaround) (szita)
+
PIG-5283: Configuration is not passed to SparkPigSplits on the backend (szita)
PIG-5284: Fix flakyness introduced by PIG-3655 (szita)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
index 1acbd36..172e309 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
@@ -37,6 +37,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.StoreFuncDecorator;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.NonWritableTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -141,7 +142,9 @@
public void write(WritableComparable key, Tuple value)
throws IOException, InterruptedException {
if(mode == Mode.SINGLE_STORE) {
- storeDecorator.putNext(value);
+ if (!(value instanceof NonWritableTuple)) {
+ storeDecorator.putNext(value);
+ }
} else {
throw new IOException("Internal Error: Unexpected code path");
}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java b/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
index 3c037b4..c58bec2 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
@@ -34,6 +34,7 @@
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
+import org.apache.pig.data.NonWritableTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
@@ -209,7 +210,7 @@
out = (Tuple) result.result;
break;
case POStatus.STATUS_NULL:
- out = null;
+ out = NonWritableTuple.INSTANCE;
break;
default:
throw new RuntimeException(
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java b/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java
index 85b2d1e..910eb26 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java
@@ -20,6 +20,7 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.NonWritableTuple;
import org.apache.pig.data.Tuple;
abstract class OutputConsumerIterator implements java.util.Iterator<Tuple> {
@@ -59,6 +60,9 @@
return;
}
Tuple v1 = input.next();
+ if (v1 instanceof NonWritableTuple) {
+ v1 = null;
+ }
attach(v1);
}
diff --git a/src/org/apache/pig/data/NonWritableTuple.java b/src/org/apache/pig/data/NonWritableTuple.java
new file mode 100644
index 0000000..63bee32
--- /dev/null
+++ b/src/org/apache/pig/data/NonWritableTuple.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+
+/**
+ * A singleton Tuple type which is not picked up for writing by PigRecordWriter
+ */
+public class NonWritableTuple extends AbstractTuple {
+
+
+ public static final NonWritableTuple INSTANCE = new NonWritableTuple();
+
+ private NonWritableTuple(){}
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ @Override
+ public Object get(int fieldNum) throws ExecException {
+ return null;
+ }
+
+ @Override
+ public List<Object> getAll() {
+ throw new RuntimeException("Unimplemented");
+ }
+
+ @Override
+ public void set(int fieldNum, Object val) throws ExecException {
+ throw new ExecException("Unimplemented");
+ }
+
+ @Override
+ public void append(Object val) {
+ throw new RuntimeException("Unimplemented");
+ }
+
+ @Override
+ public long getMemorySize() {
+ throw new RuntimeException("Unimplemented");
+ }
+
+ @Override
+ public void readFields(DataInput arg0) throws IOException {
+ throw new IOException("Unimplemented");
+ }
+
+ @Override
+ public void write(DataOutput arg0) throws IOException {
+ throw new IOException("Unimplemented");
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ throw new RuntimeException("Unimplemented");
+ }
+
+}