PIG-5370: Union onschema + columnprune dropping used fields (knoguchi)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1847856 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index d240afa..fc51596 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -88,6 +88,8 @@
BUG FIXES
+PIG-5370: Union onschema + columnprune dropping used fields (knoguchi)
+
PIG-5362: Parameter substitution of shell cmd results doesn't handle backslash (wlauer via rohini)
PIG-5355: Negative progress report by HBaseTableRecordReader (satishsaley via knoguchi)
diff --git a/src/org/apache/pig/newplan/logical/relational/LOUnion.java b/src/org/apache/pig/newplan/logical/relational/LOUnion.java
index fbd42ed..137f038 100644
--- a/src/org/apache/pig/newplan/logical/relational/LOUnion.java
+++ b/src/org/apache/pig/newplan/logical/relational/LOUnion.java
@@ -34,9 +34,11 @@
public class LOUnion extends LogicalRelationalOperator {
private boolean onSchema;
+
+ private static String UID_SEPARATOR = "_";
// uid mapping from output uid to input uid
- private List<Pair<Long, Long>> uidMapping = new ArrayList<Pair<Long, Long>>();
+ private List<Pair<Long, String>> uidMapping = new ArrayList<Pair<Long, String>>();
public LOUnion(OperatorPlan plan) {
super("LOUnion", plan);
@@ -108,7 +110,7 @@
}
// Bring back cached uid if any; otherwise, cache uid generated
- setMergedSchemaUids(mergedSchema, inputSchemas);
+ setMergedSchemaUids(mergedSchema, inputSchemas, "");
return schema = mergedSchema;
}
@@ -145,7 +147,7 @@
return mergedSchema;
}
- private void setMergedSchemaUids(LogicalSchema mergedSchema, List<LogicalSchema> inputSchemas)
+ private void setMergedSchemaUids(LogicalSchema mergedSchema, List<LogicalSchema> inputSchemas, String nested_uids)
throws FrontendException {
for (int i=0;i<mergedSchema.size();i++) {
@@ -170,7 +172,7 @@
}
if (uid < 0) {
- uid = getCachedOuputUid(inputFieldSchema.uid);
+ uid = getCachedOuputUid(createNestedUids(nested_uids,inputFieldSchema.uid));
if (uid >= 0 && outputFieldSchema.schema == null) break;
}
}
@@ -186,13 +188,13 @@
matchedInputFieldSchema = inputSchema.getFieldSubNameMatch(mergedSchema.getField(i).alias);
if (matchedInputFieldSchema!=null) {
inputUid = matchedInputFieldSchema.uid;
- uidMapping.add(new Pair<Long, Long>(uid, inputUid));
+ uidMapping.add(new Pair<Long, String>(uid, createNestedUids(nested_uids,inputUid)));
}
}
else {
matchedInputFieldSchema = mergedSchema.getField(i);
inputUid = inputSchema.getField(i).uid;
- uidMapping.add(new Pair<Long, Long>(uid, inputUid));
+ uidMapping.add(new Pair<Long, String>(uid, createNestedUids(nested_uids,inputUid)));
}
}
}
@@ -201,16 +203,28 @@
// This field has a schema. Assign uids to it as well
if (outputFieldSchema.schema != null) {
- setMergedSchemaUids(outputFieldSchema.schema, fieldInputSchemas);
+ setMergedSchemaUids(outputFieldSchema.schema, fieldInputSchemas, createNestedUids(nested_uids,outputFieldSchema.uid));
}
}
}
- private long getCachedOuputUid(long inputUid) {
+ private String createNestedUids(String nested_uids, long new_uid) {
+ StringBuilder sb = new StringBuilder(nested_uids);
+ sb.append(UID_SEPARATOR);
+ sb.append(new_uid);
+ return sb.toString();
+ }
+
+ private long getLeafUid(String nested_uids) {
+ String [] uid_root_to_leaf = nested_uids.split(UID_SEPARATOR);
+ return Long.valueOf(uid_root_to_leaf[uid_root_to_leaf.length-1]);
+ }
+
+ private long getCachedOuputUid(String nested_input_uids) {
long uid = -1;
- for (Pair<Long, Long> pair : uidMapping) {
- if (pair.second==inputUid) {
+ for (Pair<Long, String> pair : uidMapping) {
+ if (pair.second.equals(nested_input_uids)) {
uid = pair.first;
break;
}
@@ -237,18 +251,18 @@
}
// Get input uids mapping to the output uid
- public Set<Long> getInputUids(long uid) {
+ public Set<Long> getInputUids(long outputuid) {
Set<Long> result = new HashSet<Long>();
- for (Pair<Long, Long> pair : uidMapping) {
- if (pair.first==uid)
- result.add(pair.second);
+ for (Pair<Long, String> pair : uidMapping) {
+ if (pair.first==outputuid)
+ result.add(getLeafUid(pair.second));
}
return result;
}
@Override
public void resetUid() {
- uidMapping = new ArrayList<Pair<Long, Long>>();
+ uidMapping = new ArrayList<Pair<Long, String>>();
}
public List<Operator> getInputs() {
diff --git a/test/org/apache/pig/test/TestNewPlanColumnPrune.java b/test/org/apache/pig/test/TestNewPlanColumnPrune.java
index 67a02af..8193bf6 100644
--- a/test/org/apache/pig/test/TestNewPlanColumnPrune.java
+++ b/test/org/apache/pig/test/TestNewPlanColumnPrune.java
@@ -482,6 +482,35 @@
}
}
+ @Test
+ public void testUnionOnschemaWithInnerBag() throws Exception {
+ // After handling inner-bag in Union-onschema,
+ // ColumnPrune broke due to overlapping uid inside the relation and
+ // ones inside the inner-bag (PIG-5370)
+ String query = "A0 = load 'd.txt' as (a0:int, a1:int, a2:int, a3:int);" +
+ "A = FOREACH A0 GENERATE a0, a1, a2;" +
+ "B = FOREACH (GROUP A by (a0,a1)) {" +
+ " A_FOREACH = FOREACH A GENERATE a1,a2;" +
+ " GENERATE A, FLATTEN(A_FOREACH) as (a1,a2);" +
+ "}" +
+ "C = load 'd2.txt' as (A:bag{tuple:(a0:int, a1:int, a2:int)}, a1:int,a2:int);" +
+ "Z = UNION ONSCHEMA B, C;" +
+ "store Z into 'empty';";
+
+ LogicalPlan newLogicalPlan = buildPlan(query);
+
+ PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+ optimizer.optimize();
+ System.err.println(newLogicalPlan);
+ Iterator<Operator> iter = newLogicalPlan.getOperators();
+ while (iter.hasNext()) {
+ Operator o = iter.next();
+ LogicalRelationalOperator lro = (LogicalRelationalOperator)o;
+ if (lro == null || lro.getAlias() == null) continue;
+ assertNotNull(lro.getSchema());
+ }
+ }
+
public class MyPlanOptimizer extends LogicalPlanOptimizer {
protected MyPlanOptimizer(OperatorPlan p, int iterations) {
@@ -505,4 +534,3 @@
}
}
}
-