PIG-4885: Turn off union optimizer if there is PARALLEL clause in union in Tez (rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1745276 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index fbdb902..aa319d7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -34,6 +34,8 @@
 
 IMPROVEMENTS
 
+PIG-4885: Turn off union optimizer if there is PARALLEL clause in union in Tez (rohini)
+
 PIG-4894: Add API for StoreFunc to specify if they are write safe from two different vertices (rohini)
 
 PIG-4884: Tez needs to use DistinctCombiner.Combine (rohini)
diff --git a/src/docs/src/documentation/content/xdocs/basic.xml b/src/docs/src/documentation/content/xdocs/basic.xml
index f88a53d..e38b6e6 100644
--- a/src/docs/src/documentation/content/xdocs/basic.xml
+++ b/src/docs/src/documentation/content/xdocs/basic.xml
@@ -8407,7 +8407,7 @@
    <table>
       <tr> 
             <td>
-               <p>alias = UNION [ONSCHEMA] alias, alias [, alias …];</p>
+               <p>alias = UNION [ONSCHEMA] alias, alias [, alias …] [PARALLEL n];</p>
             </td>
          </tr> 
    </table></section>
@@ -8434,6 +8434,16 @@
                All inputs to the union must have a non-unknown (non-null) <a href="#schemas">schema</a>.</p>
             </td>
          </tr>
+         
+     <tr>
+        <td>
+           <p>PARALLEL n</p>
+        </td>
+        <td>
+           <p>This is only applicable for Tez execution mode and will not work with Mapreduce mode. Specifying PARALLEL will introduce an extra reduce step that will slightly degrade performance. The primary purpose in this case is to control the number of output files.</p>
+           <p>For more information, see <a href="perf.html#parallel">Use the Parallel Features</a>.</p>
+        </td>
+     </tr>
    </table>
    </section>
    
diff --git a/src/docs/src/documentation/content/xdocs/perf.xml b/src/docs/src/documentation/content/xdocs/perf.xml
index 262faaf..681c9fc 100644
--- a/src/docs/src/documentation/content/xdocs/perf.xml
+++ b/src/docs/src/documentation/content/xdocs/perf.xml
@@ -957,6 +957,9 @@
 <a href="basic.html#join-inner">JOIN (inner)</a>, 
 <a href="basic.html#join-outer">JOIN (outer)</a>, and
 <a href="basic.html#order-by">ORDER BY</a>.
+  PARALLEL clause can also be used with <a href="basic.html#union">UNION</a> if Tez is the execution mode.
+  It will turn off the union optimization and introduce an extra reduce step.
+  Though it will have slightly degraded performance due to the extra step, it is very useful for controlling the number of output files.
 </p>
 
 <p>The number of reducers you need for a particular construct in Pig that forms a MapReduce boundary depends entirely on (1) your data and the number of intermediate keys you are generating in your mappers and (2) the partitioner and distribution of map (combiner) output keys. In the best cases we have seen that a reducer processing about 1 GB of data behaves efficiently.</p>
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
index 5a72f50..fa29364 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
@@ -109,6 +109,12 @@
         if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && tezOp.getRequestedParallelism() == 1) {
             return false;
         }
+
+        // If user has specified a PARALLEL clause with the union operator
+        // turn off union optimization
+        if (tezOp.getRequestedParallelism() != -1) {
+            return false;
+        }
         // Two vertices separately ranking with 1 to n and writing to output directly
         // will make each rank repeate twice which is wrong. Rank always needs to be
         // done from single vertex to have the counting correct.
diff --git a/src/org/apache/pig/builtin/RoundRobinPartitioner.java b/src/org/apache/pig/builtin/RoundRobinPartitioner.java
index 7142df6..b620554 100644
--- a/src/org/apache/pig/builtin/RoundRobinPartitioner.java
+++ b/src/org/apache/pig/builtin/RoundRobinPartitioner.java
@@ -17,15 +17,52 @@
  */
 package org.apache.pig.builtin;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Partitioner;
 
-public class RoundRobinPartitioner extends Partitioner<Writable, Writable> {
-    private int num = 0;
+public class RoundRobinPartitioner extends Partitioner<Writable, Writable>
+        implements Configurable {
+
+    /**
+     * Batch size for round robin partitioning. Batch size number of records
+     * will be distributed to each partition in a round robin fashion. Default
+     * value is 0 which distributes each record in a circular fashion. Higher
+     * number for batch size can be used to increase probability of keeping
+     * similar records in the same partition if output is already sorted and get
+     * better compression.
+     */
+    public static String PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE = "pig.round.robin.partitioner.batch.size";
+    private int num = -1;
+    private int batchSize = 0;
+    private int currentBatchCount = 0;
+    private Configuration conf;
 
     @Override
     public int getPartition(Writable key, Writable value, int numPartitions) {
-        num = ++num % numPartitions;
+        if (batchSize > 0) {
+            if (currentBatchCount == 0) {
+                num = ++num % numPartitions;
+            }
+            if (++currentBatchCount == batchSize) {
+                currentBatchCount = 0;
+            }
+        } else {
+            num = ++num % numPartitions;
+        }
         return num;
     }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+        batchSize = conf.getInt(PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE, 0);
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
 }
diff --git a/test/org/apache/pig/tez/TestTezCompiler.java b/test/org/apache/pig/tez/TestTezCompiler.java
index d92a08a..b238975 100644
--- a/test/org/apache/pig/tez/TestTezCompiler.java
+++ b/test/org/apache/pig/tez/TestTezCompiler.java
@@ -644,6 +644,15 @@
         resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+        query =
+                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+                "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
+                "c = union onschema a, b PARALLEL 15;" +
+                "store c into 'file:///tmp/pigoutput';";
+        // Union optimization should be turned off if PARALLEL clause is specified
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
     }
 
     @Test
diff --git a/test/org/apache/pig/tez/TestTezJobExecution.java b/test/org/apache/pig/tez/TestTezJobExecution.java
new file mode 100644
index 0000000..c98ec81
--- /dev/null
+++ b/test/org/apache/pig/tez/TestTezJobExecution.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tez;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.RoundRobinPartitioner;
+import org.apache.pig.test.Util;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test class for tez specific behaviour tests
+ */
+public class TestTezJobExecution {
+
+    private static final String TEST_DIR = Util.getTestDirectory(TestTezJobExecution.class);
+
+    private PigServer pigServer;
+
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+        Util.deleteDirectory(new File(TEST_DIR));
+        new File(TEST_DIR).mkdirs();
+    }
+
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        Util.deleteDirectory(new File(TEST_DIR));
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        pigServer = new PigServer(Util.getLocalTestMode());
+    }
+
+    @Test
+    public void testUnionParallelRoundRobinBatchSize() throws IOException {
+        String input = TEST_DIR + Path.SEPARATOR + "input1";
+        String output = TEST_DIR + Path.SEPARATOR + "output1";
+        Util.createInputFile(pigServer.getPigContext(), input, new String[] {
+            "1", "1", "1", "2", "2", "2"
+        });
+        String query = "A = LOAD '" + input + "';"
+                + "B = LOAD '" + input + "';"
+                + "C = UNION A, B PARALLEL 2;"
+                + "STORE C into '" + output + "';";
+        pigServer.getPigContext().getProperties().setProperty(
+                RoundRobinPartitioner.PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE, "3");
+        pigServer.registerQuery(query);
+        String part0 = FileUtils.readFileToString(new File(output + Path.SEPARATOR + "part-v002-o000-r-00000"));
+        String part1 = FileUtils.readFileToString(new File(output + Path.SEPARATOR + "part-v002-o000-r-00001"));
+        assertEquals("1\n1\n1\n1\n1\n1\n", part0);
+        assertEquals("2\n2\n2\n2\n2\n2\n", part1);
+    }
+
+}