PIG-5377: Move supportsParallelWriteToStoreLocation from StoreFunc to StoreFuncInterface (kpriceyahoo via rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1903330 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index ef72e6b..08fc583 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
IMPROVEMENTS
+PIG-5377: Move supportsParallelWriteToStoreLocation from StoreFunc to StoreFuncInterface (kpriceyahoo via rohini)
+
PIG-5398: SparkLauncher does not read SPARK_CONF_DIR/spark-defaults.conf (knoguchi)
PIG-5397: Update spark2.version to 2.4.8 (knoguchi)
diff --git a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
index dd881ae..789fc49 100644
--- a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
+++ b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
@@ -822,4 +822,9 @@
classList.add(JSONParser.class);
return FuncUtils.getShipFiles(classList);
}
+
+ @Override
+ public Boolean supportsParallelWriteToStoreLocation() {
+ return true;
+ }
}
diff --git a/src/org/apache/pig/StoreFunc.java b/src/org/apache/pig/StoreFunc.java
index 079491b..7274371 100644
--- a/src/org/apache/pig/StoreFunc.java
+++ b/src/org/apache/pig/StoreFunc.java
@@ -197,47 +197,6 @@
}
}
- // TODO When dropping support for JDK 7 move this as a default method to StoreFuncInterface
- /**
- * DAG execution engines like Tez support optimizing union by writing to
- * output location in parallel from tasks of different vertices. Commit is
- * called once all the vertices in the union are complete. This eliminates
- * need to have a separate phase to read data output from previous phases,
- * union them and write out again.
- *
- * Enabling the union optimization requires the OutputFormat to
- *
- * 1) Support creation of different part file names for tasks of different
- * vertices. Conflicting filenames can create data corruption and loss.
- * For eg: If task 0 of vertex1 and vertex2 both create filename as
- * part-r-00000, then one of the files will be overwritten when promoting
- * from temporary to final location leading to data loss.
- * FileOutputFormat has mapreduce.output.basename config which enables
- * naming files differently in different vertices. Classes extending
- * FileOutputFormat and those prefixing file names with mapreduce.output.basename
- * value will not encounter conflict. Cases like HBaseStorage which write to key
- * value store and do not produce files also should not face any conflict.
- *
- * 2) Support calling of commit once at the end takes care of promoting
- * temporary files of the different vertices into the final location.
- * For eg: FileOutputFormat commit algorithm handles promoting of files produced
- * by tasks of different vertices into final output location without issues
- * if there is no file name conflict. In cases like HBaseStorage, the
- * TableOutputCommitter does nothing on commit.
- *
- * If custom OutputFormat used by the StoreFunc does not support the above
- * two criteria, then false should be returned. Union optimization will be
- * disabled for the StoreFunc.
- *
- * Default implementation returns null and in that case planner falls back
- * to {@link PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS} and
- * {@link PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS}
- * settings to determine if the StoreFunc supports it.
- */
- public Boolean supportsParallelWriteToStoreLocation() {
- return null;
- }
-
/**
* Issue a warning. Warning messages are aggregated and reported to
* the user.
diff --git a/src/org/apache/pig/StoreFuncInterface.java b/src/org/apache/pig/StoreFuncInterface.java
index a3ae2ab..35349e8 100644
--- a/src/org/apache/pig/StoreFuncInterface.java
+++ b/src/org/apache/pig/StoreFuncInterface.java
@@ -166,4 +166,44 @@
*/
default void addCredentials(Credentials credentials, Configuration conf) {
}
+
+ /**
+ * DAG execution engines like Tez support optimizing union by writing to
+ * output location in parallel from tasks of different vertices. Commit is
+ * called once all the vertices in the union are complete. This eliminates
+ * need to have a separate phase to read data output from previous phases,
+ * union them and write out again.
+ *
+ * Enabling the union optimization requires the OutputFormat to
+ *
+ * 1) Support creation of different part file names for tasks of different
+ * vertices. Conflicting filenames can create data corruption and loss.
+ * For eg: If task 0 of vertex1 and vertex2 both create filename as
+ * part-r-00000, then one of the files will be overwritten when promoting
+ * from temporary to final location leading to data loss.
+ * FileOutputFormat has mapreduce.output.basename config which enables
+ * naming files differently in different vertices. Classes extending
+ * FileOutputFormat and those prefixing file names with mapreduce.output.basename
+ * value will not encounter conflict. Cases like HBaseStorage which write to key
+ * value store and do not produce files also should not face any conflict.
+ *
+ * 2) Support calling of commit once at the end takes care of promoting
+ * temporary files of the different vertices into the final location.
+ * For eg: FileOutputFormat commit algorithm handles promoting of files produced
+ * by tasks of different vertices into final output location without issues
+ * if there is no file name conflict. In cases like HBaseStorage, the
+ * TableOutputCommitter does nothing on commit.
+ *
+ * If custom OutputFormat used by the StoreFunc does not support the above
+ * two criteria, then false should be returned. Union optimization will be
+ * disabled for the StoreFunc.
+ *
+ * Default implementation returns null and in that case planner falls back
+ * to {@link PigConfiguration#PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS} and
+ * {@link PigConfiguration#PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS}
+ * settings to determine if the StoreFunc supports it.
+ */
+ default Boolean supportsParallelWriteToStoreLocation() {
+ return null;
+ }
}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
index 6ea5a8b..b18d5df 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
@@ -83,21 +83,9 @@
private static final Log LOG = LogFactory.getLog(UnionOptimizer.class);
private TezOperPlan tezPlan;
- private static Set<String> builtinSupportedStoreFuncs = new HashSet<String>();
private List<String> supportedStoreFuncs;
private List<String> unsupportedStoreFuncs;
- static {
- builtinSupportedStoreFuncs.add(PigStorage.class.getName());
- builtinSupportedStoreFuncs.add(JsonStorage.class.getName());
- builtinSupportedStoreFuncs.add(OrcStorage.class.getName());
- builtinSupportedStoreFuncs.add(HBaseStorage.class.getName());
- builtinSupportedStoreFuncs.add(AvroStorage.class.getName());
- builtinSupportedStoreFuncs.add("org.apache.pig.piggybank.storage.avro.AvroStorage");
- builtinSupportedStoreFuncs.add("org.apache.pig.piggybank.storage.avro.CSVExcelStorage");
- builtinSupportedStoreFuncs.add(Storage.class.getName());
- }
-
public UnionOptimizer(TezOperPlan plan, List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs) {
super(plan, new ReverseDependencyOrderWalker<TezOperator, TezOperPlan>(plan));
tezPlan = plan;
@@ -129,42 +117,51 @@
throws VisitorException {
List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
+ // If any store function does not support parallel writes, then we cannot use this optimization
for (POStoreTez store : stores) {
String name = store.getStoreFunc().getClass().getName();
- if (store.getStoreFunc() instanceof StoreFunc) {
- StoreFunc func = (StoreFunc) store.getStoreFunc();
- if (func.supportsParallelWriteToStoreLocation() != null) {
- if (func.supportsParallelWriteToStoreLocation()) {
- continue;
- } else {
- LOG.warn(name + " does not support union optimization."
- + " Disabling it. There will be some performance degradation.");
- return false;
- }
- }
+ Boolean supportsParallelWriteToStoreLocation = store.getStoreFunc().supportsParallelWriteToStoreLocation();
+
+ // We process exclusions first, then inclusions. This way, a user can explicitly disable parallel stores
+ // for a UDF that claims to support it, but cannot enable parallel stores for a UDF that claims not to.
+ //
+ // Logical flow:
+ // 1) If the store function is explicitly listed as unsupported, then return false
+ // 2) If the store function specifies itself as unsupported, then return false
+ // 3) If the store function specifies itself as supported, then continue (true case)
+ // 4) If the store function is explicitly listed as supported, then continue (true case)
+ // 5) Otherwise, return false
+
+ if (unsupportedStoreFuncs != null && unsupportedStoreFuncs.contains(name)) {
+ LOG.warn(name + " does not support union optimization."
+ + " Disabling it. There will be some performance degradation.");
+ return false;
}
- // If StoreFunc does not explicitly state support, then check supported and
- // unsupported config settings.
- if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) {
- if (unsupportedStoreFuncs != null
- && unsupportedStoreFuncs.contains(name)) {
+
+ if (supportsParallelWriteToStoreLocation != null) {
+ if (supportsParallelWriteToStoreLocation) {
+ continue;
+ } else {
+ LOG.warn(name + " does not support union optimization."
+ + " Disabling it. There will be some performance degradation.");
return false;
}
- if (supportedStoreFuncs != null
- && !supportedStoreFuncs.contains(name)) {
- if (!builtinSupportedStoreFuncs.contains(name)) {
- LOG.warn(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
- + " does not contain " + name
- + " and so disabling union optimization. There will be some performance degradation. "
- + "If your storefunc does not hardcode part file names and can work with multiple vertices writing to the output location,"
- + " run pig with -D"
- + PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
- + "=<Comma separated list of fully qualified StoreFunc class names> to enable the optimization. Refer PIG-4691");
- return false;
- }
- }
}
+
+ if (supportedStoreFuncs != null && supportedStoreFuncs.contains(name)) {
+ continue;
+ }
+
+ LOG.warn(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
+ + " does not contain " + name
+ + " and so disabling union optimization. There will be some performance degradation. "
+ + "If your storefunc does not hardcode part file names and can work with multiple vertices writing to the output location,"
+ + " run pig with -D"
+ + PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
+ + "=<Comma separated list of fully qualified StoreFunc class names> to enable the optimization. Refer PIG-4691");
+ return false;
}
+
return true;
}
diff --git a/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java b/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
index 1953253..8b165d5 100644
--- a/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
+++ b/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
@@ -1337,4 +1337,9 @@
}
return incremented;
}
+
+ @Override
+ public Boolean supportsParallelWriteToStoreLocation() {
+ return true;
+ }
}
diff --git a/src/org/apache/pig/builtin/AvroStorage.java b/src/org/apache/pig/builtin/AvroStorage.java
index 86f764c..0acbe6e 100644
--- a/src/org/apache/pig/builtin/AvroStorage.java
+++ b/src/org/apache/pig/builtin/AvroStorage.java
@@ -717,4 +717,9 @@
Class[] classList = new Class[] {Schema.class, AvroInputFormat.class};
return FuncUtils.getShipFiles(classList);
}
+
+ @Override
+ public Boolean supportsParallelWriteToStoreLocation() {
+ return true;
+ }
}
diff --git a/src/org/apache/pig/builtin/JsonStorage.java b/src/org/apache/pig/builtin/JsonStorage.java
index c8c764d..0eb1e66 100644
--- a/src/org/apache/pig/builtin/JsonStorage.java
+++ b/src/org/apache/pig/builtin/JsonStorage.java
@@ -319,4 +319,9 @@
public List<String> getCacheFiles() {
return null;
}
+
+ @Override
+ public Boolean supportsParallelWriteToStoreLocation() {
+ return true;
+ }
}
diff --git a/src/org/apache/pig/builtin/OrcStorage.java b/src/org/apache/pig/builtin/OrcStorage.java
index 9db01f4..0af4d25 100644
--- a/src/org/apache/pig/builtin/OrcStorage.java
+++ b/src/org/apache/pig/builtin/OrcStorage.java
@@ -713,4 +713,9 @@
}
return values;
}
+
+ @Override
+ public Boolean supportsParallelWriteToStoreLocation() {
+ return true;
+ }
}
diff --git a/src/org/apache/pig/builtin/PigStorage.java b/src/org/apache/pig/builtin/PigStorage.java
index dc049f1..c93c273 100644
--- a/src/org/apache/pig/builtin/PigStorage.java
+++ b/src/org/apache/pig/builtin/PigStorage.java
@@ -619,4 +619,9 @@
mLog.warn("Could not delete output " + output);
}
}
+
+ @Override
+ public Boolean supportsParallelWriteToStoreLocation() {
+ return true;
+ }
}
diff --git a/src/org/apache/pig/builtin/mock/Storage.java b/src/org/apache/pig/builtin/mock/Storage.java
index 84bbc74..7c1ec4c 100644
--- a/src/org/apache/pig/builtin/mock/Storage.java
+++ b/src/org/apache/pig/builtin/mock/Storage.java
@@ -693,4 +693,8 @@
}
+ @Override
+ public Boolean supportsParallelWriteToStoreLocation() {
+ return true;
+ }
}
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld
index 3545290..fee7c03 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld
@@ -40,6 +40,6 @@
|---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
Tez vertex scope-20
# Plan on vertex
-c: Store(file:///tmp/pigoutput:org.apache.pig.test.TestMultiQueryBasic$DummyStoreWithOutputFormat) - scope-17
+c: Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFunc) - scope-17
|
|---POShuffledValueInputTez - scope-21 <- [scope-18, scope-19]
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld
index fee7c03..f415c63 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld
@@ -40,6 +40,6 @@
|---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
Tez vertex scope-20
# Plan on vertex
-c: Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFunc) - scope-17
+c: Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFuncParallelWriteDisabled) - scope-17
|
|---POShuffledValueInputTez - scope-21 <- [scope-18, scope-19]
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld
new file mode 100644
index 0000000..c4efe90
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld
@@ -0,0 +1,42 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-18 -> Tez vertex group scope-24,
+Tez vertex scope-19 -> Tez vertex group scope-24,
+Tez vertex group scope-24
+
+Tez vertex scope-18
+# Plan on vertex
+c: Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFuncParallelWriteEnabled) - scope-25 -> scope-17
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[chararray] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-19
+# Plan on vertex
+c: Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFuncParallelWriteEnabled) - scope-26 -> scope-17
+|
+|---c: New For Each(false,false)[bag] - scope-15
+ | |
+ | Cast[int] - scope-10
+ | |
+ | |---Project[bytearray][1] - scope-9
+ | |
+ | Cast[chararray] - scope-13
+ | |
+ | |---Project[bytearray][0] - scope-12
+ |
+ |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex group scope-24 <- [scope-18, scope-19] -> null
+# No plan on vertex group
diff --git a/test/org/apache/pig/tez/TestTezCompiler.java b/test/org/apache/pig/tez/TestTezCompiler.java
index d1617e3..f481f83 100644
--- a/test/org/apache/pig/tez/TestTezCompiler.java
+++ b/test/org/apache/pig/tez/TestTezCompiler.java
@@ -47,7 +47,6 @@
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.test.TestMultiQueryBasic.DummyStoreWithOutputFormat;
import org.apache.pig.test.Util;
import org.apache.pig.test.TestMapSideCogroup.DummyCollectableLoader;
import org.apache.pig.test.TestMapSideCogroup.DummyIndexableLoader;
@@ -942,10 +941,11 @@
"store c into 'file:///tmp/pigoutput';";
// Union optimization should be turned off if PARALLEL clause is specified
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
+
}
@Test
- public void testUnionUnSupportedStore() throws Exception {
+ public void testUnionIncludeExcludeStoreFunc() throws Exception {
String query =
"a = load 'file:///tmp/input' as (x:int, y:chararray);" +
"b = load 'file:///tmp/input' as (y:chararray, x:int);" +
@@ -966,7 +966,7 @@
"a = load 'file:///tmp/input' as (x:int, y:chararray);" +
"b = load 'file:///tmp/input' as (y:chararray, x:int);" +
"c = union onschema a, b;" +
- "store c into 'file:///tmp/pigoutput' using " + DummyStoreWithOutputFormat.class.getName() + "();";
+ "store c into 'file:///tmp/pigoutput' using " + TestDummyStoreFunc.class.getName() + "();";
// Plan should not have union optimization applied as only ORC is supported
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld");
@@ -976,11 +976,24 @@
"a = load 'file:///tmp/input' as (x:int, y:chararray);" +
"b = load 'file:///tmp/input' as (y:chararray, x:int);" +
"c = union onschema a, b;" +
- "store c into 'file:///tmp/pigoutput' using " + TestDummyStoreFunc.class.getName() + "();";
+ "store c into 'file:///tmp/pigoutput' using " +
+ TestDummyStoreFuncParallelWriteDisabled.class.getName() + "();";
// Plan should not have union optimization applied as supportsParallelWriteToStoreLocation returns false
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld");
resetScope();
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, null);
+ query =
+ "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+ "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
+ "c = union onschema a, b;" +
+ "store c into 'file:///tmp/pigoutput' using " +
+ TestDummyStoreFuncParallelWriteEnabled.class.getName() + "();";
+
+ // Plan should have union optimization applied as supportsParallelWriteToStoreLocation returns true
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld");
+
+ resetScope();
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName());
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, null);
query =
@@ -1461,7 +1474,6 @@
}
public static class TestDummyStoreFunc extends StoreFunc {
-
@Override
public OutputFormat getOutputFormat() throws IOException {
return null;
@@ -1479,12 +1491,22 @@
@Override
public void putNext(Tuple t) throws IOException {
}
+ }
+ public static class TestDummyStoreFuncParallelWriteEnabled extends TestDummyStoreFunc {
+ @Override
+ public Boolean supportsParallelWriteToStoreLocation() {
+ return true;
+ }
+ }
+
+
+ public static class TestDummyStoreFuncParallelWriteDisabled extends TestDummyStoreFunc {
@Override
public Boolean supportsParallelWriteToStoreLocation() {
return false;
}
-
}
+
}