PIG-5377: Move supportsParallelWriteToStoreLocation from StoreFunc to StoreFuncInterface (kpriceyahoo via rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1903330 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index ef72e6b..08fc583 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
  
 IMPROVEMENTS
 
+PIG-5377: Move supportsParallelWriteToStoreLocation from StoreFunc to StoreFuncInterface (kpriceyahoo via rohini)
+
 PIG-5398: SparkLauncher does not read SPARK_CONF_DIR/spark-defaults.conf (knoguchi)
 
 PIG-5397: Update spark2.version to 2.4.8 (knoguchi)
diff --git a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
index dd881ae..789fc49 100644
--- a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
+++ b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
@@ -822,4 +822,9 @@
         classList.add(JSONParser.class);
         return FuncUtils.getShipFiles(classList);
     }
+
+    @Override
+    public Boolean supportsParallelWriteToStoreLocation() {
+        return true;
+    }
 }
diff --git a/src/org/apache/pig/StoreFunc.java b/src/org/apache/pig/StoreFunc.java
index 079491b..7274371 100644
--- a/src/org/apache/pig/StoreFunc.java
+++ b/src/org/apache/pig/StoreFunc.java
@@ -197,47 +197,6 @@
         }
     }
 
-    // TODO When dropping support for JDK 7 move this as a default method to StoreFuncInterface
-    /**
-     * DAG execution engines like Tez support optimizing union by writing to
-     * output location in parallel from tasks of different vertices. Commit is
-     * called once all the vertices in the union are complete. This eliminates
-     * need to have a separate phase to read data output from previous phases,
-     * union them and write out again.
-     *
-     * Enabling the union optimization requires the OutputFormat to
-     *
-     *      1) Support creation of different part file names for tasks of different
-     * vertices. Conflicting filenames can create data corruption and loss.
-     * For eg: If task 0 of vertex1 and vertex2 both create filename as
-     * part-r-00000, then one of the files will be overwritten when promoting
-     * from temporary to final location leading to data loss.
-     *      FileOutputFormat has mapreduce.output.basename config which enables
-     *  naming files differently in different vertices. Classes extending
-     *  FileOutputFormat and those prefixing file names with mapreduce.output.basename
-     *  value will not encounter conflict. Cases like HBaseStorage which write to key
-     *  value store and do not produce files also should not face any conflict.
-     *
-     *      2) Support calling of commit once at the end takes care of promoting
-     * temporary files of the different vertices into the final location.
-     * For eg: FileOutputFormat commit algorithm handles promoting of files produced
-     * by tasks of different vertices into final output location without issues
-     * if there is no file name conflict. In cases like HBaseStorage, the
-     * TableOutputCommitter does nothing on commit.
-     *
-     * If custom OutputFormat used by the StoreFunc does not support the above
-     * two criteria, then false should be returned. Union optimization will be
-     * disabled for the StoreFunc.
-     *
-     * Default implementation returns null and in that case planner falls back
-     * to {@link PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS} and
-     * {@link PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS}
-     * settings to determine if the StoreFunc supports it.
-     */
-    public Boolean supportsParallelWriteToStoreLocation() {
-        return null;
-    }
-
     /**
      * Issue a warning.  Warning messages are aggregated and reported to
      * the user.
diff --git a/src/org/apache/pig/StoreFuncInterface.java b/src/org/apache/pig/StoreFuncInterface.java
index a3ae2ab..35349e8 100644
--- a/src/org/apache/pig/StoreFuncInterface.java
+++ b/src/org/apache/pig/StoreFuncInterface.java
@@ -166,4 +166,44 @@
      */
     default void addCredentials(Credentials credentials, Configuration conf) {
     }
+
+    /**
+     * DAG execution engines like Tez support optimizing union by writing to the
+     * output location in parallel from tasks of different vertices. Commit is
+     * called once all the vertices in the union are complete. This eliminates the
+     * need for a separate phase that reads the output of the previous phases,
+     * unions it and writes it out again.
+     *
+     * Enabling the union optimization requires the OutputFormat to
+     *
+     *      1) Support creation of different part file names for tasks of different
+     * vertices. Conflicting filenames can cause data corruption and loss.
+     * For example: if task 0 of vertex1 and task 0 of vertex2 both create a file
+     * named part-r-00000, then one of the files will be overwritten when promoting
+     * from the temporary to the final location, leading to data loss.
+     *      FileOutputFormat has the mapreduce.output.basename config which enables
+     *  naming files differently in different vertices. Classes extending
+     *  FileOutputFormat, and those prefixing file names with the mapreduce.output.basename
+     *  value, will not encounter conflicts. StoreFuncs like HBaseStorage which write to a
+     *  key-value store and do not produce files should not face any conflict either.
+     *
+     *      2) Support a single commit at the end that takes care of promoting the
+     * temporary files of the different vertices into the final location.
+     * For example: the FileOutputFormat commit algorithm promotes files produced
+     * by tasks of different vertices into the final output location without issues
+     * as long as there is no file name conflict. In cases like HBaseStorage, the
+     * TableOutputCommitter does nothing on commit.
+     *
+     * If the custom OutputFormat used by the StoreFunc does not satisfy the above
+     * two criteria, then false should be returned and union optimization will be
+     * disabled for the StoreFunc.
+     *
+     * The default implementation returns null, in which case the planner falls back
+     * to the {@link PigConfiguration#PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS} and
+     * {@link PigConfiguration#PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS}
+     * settings to determine whether the StoreFunc supports it.
+     */
+    default Boolean supportsParallelWriteToStoreLocation() {
+        return null;
+    }
 }
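With the method now a default on StoreFuncInterface, a third-party StoreFunc opts in to the Tez union optimization simply by overriding it. The sketch below is illustrative only and not part of this patch; the class name and the use of NullOutputFormat as a stand-in OutputFormat are assumptions, mirroring the TestDummyStoreFunc pattern used in TestTezCompiler further down.

import java.io.IOException;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.pig.StoreFunc;
import org.apache.pig.data.Tuple;

// Hypothetical StoreFunc showing the override; not a class shipped with Pig.
public class MyKeyValueStorage extends StoreFunc {

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        // Stand-in only. A real implementation would return an OutputFormat that
        // neither hardcodes part file names nor relies on a file-promoting commit
        // (for example, one that writes to a key-value store).
        return new NullOutputFormat();
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        // Nothing to configure for this sketch.
    }

    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
    }

    @Override
    public void putNext(Tuple t) throws IOException {
    }

    @Override
    public Boolean supportsParallelWriteToStoreLocation() {
        // Safe to return true because the OutputFormat above satisfies both
        // criteria described in the Javadoc.
        return true;
    }
}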
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
index 6ea5a8b..b18d5df 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
@@ -83,21 +83,9 @@
 
     private static final Log LOG = LogFactory.getLog(UnionOptimizer.class);
     private TezOperPlan tezPlan;
-    private static Set<String> builtinSupportedStoreFuncs = new HashSet<String>();
     private List<String> supportedStoreFuncs;
     private List<String> unsupportedStoreFuncs;
 
-    static {
-        builtinSupportedStoreFuncs.add(PigStorage.class.getName());
-        builtinSupportedStoreFuncs.add(JsonStorage.class.getName());
-        builtinSupportedStoreFuncs.add(OrcStorage.class.getName());
-        builtinSupportedStoreFuncs.add(HBaseStorage.class.getName());
-        builtinSupportedStoreFuncs.add(AvroStorage.class.getName());
-        builtinSupportedStoreFuncs.add("org.apache.pig.piggybank.storage.avro.AvroStorage");
-        builtinSupportedStoreFuncs.add("org.apache.pig.piggybank.storage.avro.CSVExcelStorage");
-        builtinSupportedStoreFuncs.add(Storage.class.getName());
-    }
-
     public UnionOptimizer(TezOperPlan plan, List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs) {
         super(plan, new ReverseDependencyOrderWalker<TezOperator, TezOperPlan>(plan));
         tezPlan = plan;
@@ -129,42 +117,51 @@
             throws VisitorException {
         List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
 
+        // If any store function does not support parallel writes, then we cannot use this optimization
         for (POStoreTez store : stores) {
             String name = store.getStoreFunc().getClass().getName();
-            if (store.getStoreFunc() instanceof StoreFunc) {
-                StoreFunc func = (StoreFunc) store.getStoreFunc();
-                if (func.supportsParallelWriteToStoreLocation() != null) {
-                    if (func.supportsParallelWriteToStoreLocation()) {
-                        continue;
-                    } else {
-                        LOG.warn(name + " does not support union optimization."
-                                + " Disabling it. There will be some performance degradation.");
-                        return false;
-                    }
-                }
+            Boolean supportsParallelWriteToStoreLocation = store.getStoreFunc().supportsParallelWriteToStoreLocation();
+
+            // We process exclusions first, then inclusions. This way, a user can explicitly disable parallel stores
+            // for a UDF that claims to support it, but cannot enable parallel stores for a UDF that claims not to.
+            //
+            // Logical flow:
+            // 1) If the store function is explicitly listed as unsupported, then return false
+            // 2) If the store function specifies itself as unsupported, then return false
+            // 3) If the store function specifies itself as supported, then continue (true case)
+            // 4) If the store function is explicitly listed as supported, then continue (true case)
+            // 5) Otherwise, return false
+
+            if (unsupportedStoreFuncs != null && unsupportedStoreFuncs.contains(name)) {
+                LOG.warn(name + " does not support union optimization."
+                         + " Disabling it. There will be some performance degradation.");
+                return false;
             }
-            // If StoreFunc does not explicitly state support, then check supported and
-            // unsupported config settings.
-            if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) {
-                if (unsupportedStoreFuncs != null
-                        && unsupportedStoreFuncs.contains(name)) {
+
+            if (supportsParallelWriteToStoreLocation != null) {
+                if (supportsParallelWriteToStoreLocation) {
+                    continue;
+                } else {
+                    LOG.warn(name + " does not support union optimization."
+                             + " Disabling it. There will be some performance degradation.");
                     return false;
                 }
-                if (supportedStoreFuncs != null
-                        && !supportedStoreFuncs.contains(name)) {
-                    if (!builtinSupportedStoreFuncs.contains(name)) {
-                        LOG.warn(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
-                                + " does not contain " + name
-                                + " and so disabling union optimization. There will be some performance degradation. "
-                                + "If your storefunc does not hardcode part file names and can work with multiple vertices writing to the output location,"
-                                + " run pig with -D"
-                                + PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
-                                + "=<Comma separated list of fully qualified StoreFunc class names> to enable the optimization. Refer PIG-4691");
-                        return false;
-                    }
-                }
             }
+
+            if (supportedStoreFuncs != null && supportedStoreFuncs.contains(name)) {
+                continue;
+            }
+
+            LOG.warn(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
+                     + " does not contain " + name
+                     + " and so disabling union optimization. There will be some performance degradation. "
+                     + "If your storefunc does not hardcode part file names and can work with multiple vertices writing to the output location,"
+                     + " run pig with -D"
+                     + PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
+                     + "=<Comma separated list of fully qualified StoreFunc class names> to enable the optimization. Refer PIG-4691");
+            return false;
         }
+
         return true;
     }
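The precedence above also means a user can steer the optimizer from configuration alone when the StoreFunc itself cannot be changed. A minimal sketch, assuming hypothetical StoreFunc class names; the two PigConfiguration constants are the ones referenced in the code above, and the values would normally be supplied through pig.properties or -D flags rather than built in Java.

import java.util.Properties;

import org.apache.pig.PigConfiguration;

public class UnionOptimizerSettingsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Exclusions win: force-disable union optimization for a StoreFunc even
        // if its supportsParallelWriteToStoreLocation() returns true.
        props.setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS,
                "com.example.MyParallelFriendlyStorage");   // hypothetical class
        // Opt in a StoreFunc that returns null (does not state support either way).
        props.setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS,
                "com.example.MyLegacyStorage");             // hypothetical class
        // Print the equivalent -D flags a user would pass on the pig command line.
        props.forEach((k, v) -> System.out.println("-D" + k + "=" + v));
    }
}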
 
diff --git a/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java b/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
index 1953253..8b165d5 100644
--- a/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
+++ b/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
@@ -1337,4 +1337,9 @@
         }
         return incremented;
     }
+
+    @Override
+    public Boolean supportsParallelWriteToStoreLocation() {
+        return true;
+    }
 }
diff --git a/src/org/apache/pig/builtin/AvroStorage.java b/src/org/apache/pig/builtin/AvroStorage.java
index 86f764c..0acbe6e 100644
--- a/src/org/apache/pig/builtin/AvroStorage.java
+++ b/src/org/apache/pig/builtin/AvroStorage.java
@@ -717,4 +717,9 @@
       Class[] classList = new Class[] {Schema.class, AvroInputFormat.class};
       return FuncUtils.getShipFiles(classList);
   }
+
+  @Override
+  public Boolean supportsParallelWriteToStoreLocation() {
+    return true;
+  }
 }
diff --git a/src/org/apache/pig/builtin/JsonStorage.java b/src/org/apache/pig/builtin/JsonStorage.java
index c8c764d..0eb1e66 100644
--- a/src/org/apache/pig/builtin/JsonStorage.java
+++ b/src/org/apache/pig/builtin/JsonStorage.java
@@ -319,4 +319,9 @@
     public List<String> getCacheFiles() {
         return null;
     }
+
+    @Override
+    public Boolean supportsParallelWriteToStoreLocation() {
+        return true;
+    }
 }
diff --git a/src/org/apache/pig/builtin/OrcStorage.java b/src/org/apache/pig/builtin/OrcStorage.java
index 9db01f4..0af4d25 100644
--- a/src/org/apache/pig/builtin/OrcStorage.java
+++ b/src/org/apache/pig/builtin/OrcStorage.java
@@ -713,4 +713,9 @@
         }
         return values;
     }
+
+    @Override
+    public Boolean supportsParallelWriteToStoreLocation() {
+        return true;
+    }
 }
diff --git a/src/org/apache/pig/builtin/PigStorage.java b/src/org/apache/pig/builtin/PigStorage.java
index dc049f1..c93c273 100644
--- a/src/org/apache/pig/builtin/PigStorage.java
+++ b/src/org/apache/pig/builtin/PigStorage.java
@@ -619,4 +619,9 @@
             mLog.warn("Could not delete output " + output);
         }
     }
+
+    @Override
+    public Boolean supportsParallelWriteToStoreLocation() {
+        return true;
+    }
 }
diff --git a/src/org/apache/pig/builtin/mock/Storage.java b/src/org/apache/pig/builtin/mock/Storage.java
index 84bbc74..7c1ec4c 100644
--- a/src/org/apache/pig/builtin/mock/Storage.java
+++ b/src/org/apache/pig/builtin/mock/Storage.java
@@ -693,4 +693,8 @@
 
   }
 
+  @Override
+  public Boolean supportsParallelWriteToStoreLocation() {
+    return true;
+  }
 }
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld
index 3545290..fee7c03 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld
@@ -40,6 +40,6 @@
     |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
 Tez vertex scope-20
 # Plan on vertex
-c: Store(file:///tmp/pigoutput:org.apache.pig.test.TestMultiQueryBasic$DummyStoreWithOutputFormat) - scope-17
+c: Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFunc) - scope-17
 |
 |---POShuffledValueInputTez - scope-21	<-	 [scope-18, scope-19]
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld
index fee7c03..f415c63 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld
@@ -40,6 +40,6 @@
     |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
 Tez vertex scope-20
 # Plan on vertex
-c: Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFunc) - scope-17
+c: Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFuncParallelWriteDisabled) - scope-17
 |
 |---POShuffledValueInputTez - scope-21	<-	 [scope-18, scope-19]
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld
new file mode 100644
index 0000000..c4efe90
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld
@@ -0,0 +1,42 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-18	->	Tez vertex group scope-24,
+Tez vertex scope-19	->	Tez vertex group scope-24,
+Tez vertex group scope-24
+
+Tez vertex scope-18
+# Plan on vertex
+c: Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFuncParallelWriteEnabled) - scope-25	->	 scope-17
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[chararray] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-19
+# Plan on vertex
+c: Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFuncParallelWriteEnabled) - scope-26	->	 scope-17
+|
+|---c: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[int] - scope-10
+    |   |
+    |   |---Project[bytearray][1] - scope-9
+    |   |
+    |   Cast[chararray] - scope-13
+    |   |
+    |   |---Project[bytearray][0] - scope-12
+    |
+    |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex group scope-24	<-	 [scope-18, scope-19]	->	 null
+# No plan on vertex group
diff --git a/test/org/apache/pig/tez/TestTezCompiler.java b/test/org/apache/pig/tez/TestTezCompiler.java
index d1617e3..f481f83 100644
--- a/test/org/apache/pig/tez/TestTezCompiler.java
+++ b/test/org/apache/pig/tez/TestTezCompiler.java
@@ -47,7 +47,6 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.test.TestMultiQueryBasic.DummyStoreWithOutputFormat;
 import org.apache.pig.test.Util;
 import org.apache.pig.test.TestMapSideCogroup.DummyCollectableLoader;
 import org.apache.pig.test.TestMapSideCogroup.DummyIndexableLoader;
@@ -942,10 +941,11 @@
                 "store c into 'file:///tmp/pigoutput';";
         // Union optimization should be turned off if PARALLEL clause is specified
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
+
     }
 
     @Test
-    public void testUnionUnSupportedStore() throws Exception {
+    public void testUnionIncludeExcludeStoreFunc() throws Exception {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
@@ -966,7 +966,7 @@
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
                 "c = union onschema a, b;" +
-                "store c into 'file:///tmp/pigoutput' using " + DummyStoreWithOutputFormat.class.getName() + "();";
+                "store c into 'file:///tmp/pigoutput' using " + TestDummyStoreFunc.class.getName() + "();";
         // Plan should not have union optimization applied as only ORC is supported
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld");
 
@@ -976,11 +976,24 @@
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
                 "c = union onschema a, b;" +
-                "store c into 'file:///tmp/pigoutput' using " + TestDummyStoreFunc.class.getName() + "();";
+                "store c into 'file:///tmp/pigoutput' using " +
+                TestDummyStoreFuncParallelWriteDisabled.class.getName() + "();";
         // Plan should not have union optimization applied as supportsParallelWriteToStoreLocation returns false
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld");
 
         resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, null);
+        query =
+            "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+            "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
+            "c = union onschema a, b;" +
+            "store c into 'file:///tmp/pigoutput' using " +
+            TestDummyStoreFuncParallelWriteEnabled.class.getName() + "();";
+
+        // Plan should have union optimization applied as supportsParallelWriteToStoreLocation returns true
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld");
+
+        resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName());
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, null);
         query =
@@ -1461,7 +1474,6 @@
     }
 
     public static class TestDummyStoreFunc extends StoreFunc {
-
         @Override
         public OutputFormat getOutputFormat() throws IOException {
             return null;
@@ -1479,12 +1491,22 @@
         @Override
         public void putNext(Tuple t) throws IOException {
         }
+    }
 
+    public static class TestDummyStoreFuncParallelWriteEnabled extends TestDummyStoreFunc {
+        @Override
+        public Boolean supportsParallelWriteToStoreLocation() {
+            return true;
+        }
+    }
+
+
+    public static class TestDummyStoreFuncParallelWriteDisabled extends TestDummyStoreFunc {
         @Override
         public Boolean supportsParallelWriteToStoreLocation() {
             return false;
         }
-
     }
+
 }