PIG-5312: Uids not set in inner schemas after UNION ONSCHEMA (tmwoodruff via knoguchi)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1819344 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 694a637..a443926 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -59,6 +59,8 @@
OPTIMIZATIONS
BUG FIXES
+PIG-5312: Uids not set in inner schemas after UNION ONSCHEMA (tmwoodruff via knoguchi)
+
PIG-5300: hashCode for Bag needs to be order independent (knoguchi)
PIG-5318: Unit test failures on Pig on Spark with Spark 2.2 (nkollar via szita)
diff --git a/src/org/apache/pig/newplan/logical/relational/LOUnion.java b/src/org/apache/pig/newplan/logical/relational/LOUnion.java
index b919fc5..fbd42ed 100644
--- a/src/org/apache/pig/newplan/logical/relational/LOUnion.java
+++ b/src/org/apache/pig/newplan/logical/relational/LOUnion.java
@@ -108,60 +108,16 @@
}
// Bring back cached uid if any; otherwise, cache uid generated
- for (int i=0;i<mergedSchema.size();i++)
- {
- LogicalSchema.LogicalFieldSchema outputFieldSchema = mergedSchema.getField(i);
+ setMergedSchemaUids(mergedSchema, inputSchemas);
- long uid = -1;
-
- // Search all the cached uid mappings by input field to see if
- // we've cached an output uid for this output field
- for (LogicalSchema inputSchema : inputSchemas) {
- LogicalSchema.LogicalFieldSchema inputFieldSchema;
- if (onSchema) {
- inputFieldSchema = inputSchema.getFieldSubNameMatch(outputFieldSchema.alias);
- } else {
- inputFieldSchema = inputSchema.getField(i);
- }
-
- if (inputFieldSchema != null) {
- uid = getCachedOuputUid(inputFieldSchema.uid);
- if (uid >= 0) break;
- }
- }
-
- // No cached uid. Allocate one, and locate and cache all inputs.
- if (uid==-1) {
- uid = LogicalExpression.getNextUid();
- for (LogicalSchema inputSchema : inputSchemas) {
- long inputUid;
- LogicalFieldSchema matchedInputFieldSchema;
- if (onSchema) {
- matchedInputFieldSchema = inputSchema.getFieldSubNameMatch(mergedSchema.getField(i).alias);
- if (matchedInputFieldSchema!=null) {
- inputUid = matchedInputFieldSchema.uid;
- uidMapping.add(new Pair<Long, Long>(uid, inputUid));
- }
- }
- else {
- matchedInputFieldSchema = mergedSchema.getField(i);
- inputUid = inputSchema.getField(i).uid;
- uidMapping.add(new Pair<Long, Long>(uid, inputUid));
- }
- }
- }
-
- outputFieldSchema.uid = uid;
- }
-
return schema = mergedSchema;
}
/**
* create schema for union-onschema
*/
- private LogicalSchema createMergedSchemaOnAlias(List<LogicalSchema> inputSchemas,
- List<String> inputAliases)
+ private LogicalSchema createMergedSchemaOnAlias(List<LogicalSchema> inputSchemas,
+ List<String> inputAliases)
throws FrontendException {
ArrayList<LogicalSchema> schemas = new ArrayList<LogicalSchema>();
for (int i = 0; i < inputSchemas.size(); i++){
@@ -175,20 +131,81 @@
}
schemas.add( sch );
}
-
+
//create the merged schema
LogicalSchema mergedSchema = null;
try {
- mergedSchema = LogicalSchema.mergeSchemasByAlias( schemas );
+ mergedSchema = LogicalSchema.mergeSchemasByAlias( schemas );
} catch(FrontendException e) {
String msg = "Error merging schemas for union operator : "
+ e.getMessage();
throw new FrontendException(this, msg, 1116, PigException.INPUT, e);
}
-
+
return mergedSchema;
}
-
+
+ private void setMergedSchemaUids(LogicalSchema mergedSchema, List<LogicalSchema> inputSchemas)
+ throws FrontendException {
+
+ for (int i=0;i<mergedSchema.size();i++) {
+ LogicalSchema.LogicalFieldSchema outputFieldSchema = mergedSchema.getField(i);
+
+ long uid = -1;
+ List<LogicalSchema> fieldInputSchemas = new ArrayList<>(inputSchemas.size());
+
+ // Search all the cached uid mappings by input field to see if
+ // we've cached an output uid for this output field
+ for (LogicalSchema inputSchema : inputSchemas) {
+ LogicalSchema.LogicalFieldSchema inputFieldSchema;
+ if (onSchema) {
+ inputFieldSchema = inputSchema.getFieldSubNameMatch(outputFieldSchema.alias);
+ } else {
+ inputFieldSchema = inputSchema.getField(i);
+ }
+
+ if (inputFieldSchema != null) {
+ if (inputFieldSchema.schema != null) {
+ fieldInputSchemas.add(inputFieldSchema.schema);
+ }
+
+ if (uid < 0) {
+ uid = getCachedOuputUid(inputFieldSchema.uid);
+ if (uid >= 0 && outputFieldSchema.schema == null) break;
+ }
+ }
+ }
+
+ // No cached uid. Allocate one, and locate and cache all inputs.
+ if (uid==-1) {
+ uid = LogicalExpression.getNextUid();
+ for (LogicalSchema inputSchema : inputSchemas) {
+ long inputUid;
+ LogicalFieldSchema matchedInputFieldSchema;
+ if (onSchema) {
+ matchedInputFieldSchema = inputSchema.getFieldSubNameMatch(mergedSchema.getField(i).alias);
+ if (matchedInputFieldSchema!=null) {
+ inputUid = matchedInputFieldSchema.uid;
+ uidMapping.add(new Pair<Long, Long>(uid, inputUid));
+ }
+ }
+ else {
+ matchedInputFieldSchema = mergedSchema.getField(i);
+ inputUid = inputSchema.getField(i).uid;
+ uidMapping.add(new Pair<Long, Long>(uid, inputUid));
+ }
+ }
+ }
+
+ outputFieldSchema.uid = uid;
+
+ // This field has a schema. Assign uids to it as well
+ if (outputFieldSchema.schema != null) {
+ setMergedSchemaUids(outputFieldSchema.schema, fieldInputSchemas);
+ }
+ }
+ }
+
private long getCachedOuputUid(long inputUid) {
long uid = -1;
diff --git a/test/org/apache/pig/test/TestUnionOnSchema.java b/test/org/apache/pig/test/TestUnionOnSchema.java
index 3dea37f..60081c3 100644
--- a/test/org/apache/pig/test/TestUnionOnSchema.java
+++ b/test/org/apache/pig/test/TestUnionOnSchema.java
@@ -478,6 +478,43 @@
* Test UNION ONSCHEMA on 3 inputs
*/
@Test
+ public void testUnionOnSchemaInnerSchema() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
+ String query =
+ " l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
+ + " (i : long, c : chararray, j : int "
+ + ", b : bag { t : tuple (c1 : int, c2 : chararray)} ); "
+ + "l2 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
+ + " (i : long, c : chararray, j : int "
+ + ", b : bag { t : tuple (c1 : int, c2 : chararray)} ); "
+ + "u = union onschema l1, l2; "
+ // The addition in the inner foreach will fail if the inner schema's uids
+ // are all set to -1, since the code that finds the inner load's schema will
+ // match the last item in b's schema, which is a chararray
+ + "p = foreach u { x = foreach b GENERATE c1 + 5 as c3; GENERATE i, c, x; }";
+
+ Util.registerMultiLineQuery(pig, query);
+ pig.explain("p", System.out);
+
+ Iterator<Tuple> it = pig.openIterator("p");
+
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(1L,'abc',{(6),(6)})",
+ "(5L,'def',{(7),(7)})",
+ "(1L,'abc',{(6),(6)})",
+ "(5L,'def',{(7),(7)})"
+ });
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+ }
+
+ /**
+ * Test UNION ONSCHEMA on 3 inputs
+ * @throws IOException
+ * @throws ParserException
+ */
+ @Test
public void testUnionOnSchema3Inputs() throws Exception {
PigServer pig = new PigServer(Util.getLocalTestMode());
String query =