blob: 13f5b387b5ab2931b907f4ac140c009bfc17a1b6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.relational;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.PlanVisitor;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
public class LOGenerate extends LogicalRelationalOperator {
private List<LogicalExpressionPlan> outputPlans;
private boolean[] flattenFlags;
private int[] flattenNumFields;
// mUserDefinedSchema is the original input from the user, we don't suppose
// to store uid in mUserDefinedSchema
private List<LogicalSchema> mUserDefinedSchema = null;
private List<LogicalSchema> outputPlanSchemas = null;
private List<LogicalSchema> expSchemas = null;
// If LOGenerate generate new uid, cache it here.
// This happens when expression plan does not have complete schema, however,
// user give complete schema in ForEach statement in script
private List<LogicalSchema> uidOnlySchemas = null;
public LOGenerate(OperatorPlan plan, List<LogicalExpressionPlan> ps, boolean[] flatten) {
this( plan );
outputPlans = ps;
flattenFlags = flatten;
}
public void setOutputPlans(List<LogicalExpressionPlan> plans) {
this.outputPlans = plans;
}
public LOGenerate(OperatorPlan plan) {
super( "LOGenerate", plan );
}
@Override
public LogicalSchema getSchema() throws FrontendException {
if (schema != null) {
return schema;
}
if (uidOnlySchemas == null) {
uidOnlySchemas = new ArrayList<LogicalSchema>();
for (int i=0;i<outputPlans.size();i++) {
uidOnlySchemas.add(null);
}
}
schema = new LogicalSchema();
outputPlanSchemas = new ArrayList<LogicalSchema>();
expSchemas = new ArrayList<LogicalSchema>();
flattenNumFields = new int[outputPlans.size()];
for(int i=0; i<outputPlans.size(); i++) {
flattenNumFields[i] = 0;
LogicalExpression exp = (LogicalExpression)outputPlans.get(i).getSources().get(0);
LogicalSchema mUserDefinedSchemaCopy = null;
if (mUserDefinedSchema!=null && mUserDefinedSchema.get(i)!=null) {
mUserDefinedSchemaCopy = new LogicalSchema();
for (LogicalSchema.LogicalFieldSchema fs : mUserDefinedSchema.get(i).getFields()) {
mUserDefinedSchemaCopy.addField(fs.deepCopy());
}
}
LogicalFieldSchema fieldSchema = null;
// schema of the expression after flatten
LogicalSchema expSchema = null;
if (exp.getFieldSchema()!=null) {
fieldSchema = exp.getFieldSchema().deepCopy();
expSchema = new LogicalSchema();
if ((fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG && fieldSchema.type != DataType.MAP) || !flattenFlags[i]) {
// if type is primitive, just add to schema
if (fieldSchema != null)
expSchema.addField(fieldSchema);
} else {
// if bag/tuple/map don't have inner schema, after flatten, we don't have schema for the entire operator
if (fieldSchema.schema==null) {
expSchema = null;
}
else {
// if we come here, we get a BAG/Tuple/Map with flatten, extract inner schema of the tuple as expSchema
List<LogicalSchema.LogicalFieldSchema> innerFieldSchemas = new ArrayList<LogicalSchema.LogicalFieldSchema>();
if (flattenFlags[i]) {
if (fieldSchema.type == DataType.BAG) {
// if it is bag, get the schema of tuples
if (fieldSchema.schema!=null) {
if (fieldSchema.schema.getField(0).schema!=null) {
innerFieldSchemas = fieldSchema.schema.getField(0).schema.getFields();
flattenNumFields[i] = innerFieldSchemas.size();
}
for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
fs.alias = fs.alias == null ? null : fieldSchema.alias + "::" + fs.alias;
}
}
} else if (fieldSchema.type == DataType.MAP) {
//should only contain 1 schemafield for Map's value
innerFieldSchemas = fieldSchema.schema.getFields();
flattenNumFields[i] = 2; // used for FLATTEN(null-map)
LogicalSchema.LogicalFieldSchema fsForValue = innerFieldSchemas.get(0);
fsForValue.alias = fieldSchema.alias + "::value";
LogicalSchema.LogicalFieldSchema fsForKey = new LogicalFieldSchema(
fieldSchema.alias + "::key" , null, DataType.CHARARRAY, fieldSchema.uid);
expSchema.addField(fsForKey);
} else { // DataType.TUPLE
innerFieldSchemas = fieldSchema.schema.getFields();
flattenNumFields[i] = innerFieldSchemas.size();
for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
fs.alias = fs.alias == null ? null : fieldSchema.alias + "::" + fs.alias;
}
}
for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas)
expSchema.addField(fs);
}
else
expSchema.addField(fieldSchema);
}
}
}
// Merge with user defined schema
if (expSchema!=null && expSchema.size()==0)
expSchema = null;
LogicalSchema planSchema = new LogicalSchema();
expSchemas.add(expSchema);
if (mUserDefinedSchemaCopy!=null) {
LogicalSchema mergedSchema = new LogicalSchema();
// merge with userDefinedSchema
if (expSchema==null) {
// Use user defined schema
for (LogicalFieldSchema fs : mUserDefinedSchemaCopy.getFields()) {
fs.stampFieldSchema();
mergedSchema.addField(new LogicalFieldSchema(fs));
}
} else {
// Merge uid with the exp field schema
mergedSchema = LogicalSchema.merge(mUserDefinedSchemaCopy, expSchema, LogicalSchema.MergeMode.LoadForEach);
if (mergedSchema==null) {
throw new FrontendException(this, "Cannot merge (" + expSchema.toString(false) +
") with user defined schema (" + mUserDefinedSchemaCopy.toString(false) + ")", 1117);
}
mergedSchema.mergeUid(expSchema);
}
setNullTypeToByteArrayType(mergedSchema);
for (LogicalFieldSchema fs : mergedSchema.getFields()) {
planSchema.addField(fs);
}
} else {
// if any plan do not have schema, the whole LOGenerate do not have schema
if (expSchema==null) {
planSchema = null;
}
else {
// Merge schema for the plan
for (LogicalFieldSchema fs : expSchema.getFields())
planSchema.addField(fs);
}
}
if (planSchema==null) {
schema = null;
break;
}
for (LogicalFieldSchema fs : planSchema.getFields())
schema.addField(fs);
// If the schema is generated by user defined schema, keep uid
if (expSchema==null) {
LogicalSchema uidOnlySchema = planSchema.mergeUid(uidOnlySchemas.get(i));
uidOnlySchemas.set(i, uidOnlySchema);
}
outputPlanSchemas.add(planSchema);
}
if (schema==null || schema.size()==0) {
schema = null;
outputPlanSchemas = null;
}
return schema;
}
public List<LogicalExpressionPlan> getOutputPlans() {
return outputPlans;
}
public boolean[] getFlattenFlags() {
return flattenFlags;
}
public int [] getFlattenNumFields() {
return flattenNumFields;
}
public void setFlattenFlags(boolean[] flatten) {
flattenFlags = flatten;
}
@Override
public boolean isEqual(Operator other) throws FrontendException {
if (!(other instanceof LOGenerate)) {
return false;
}
List<LogicalExpressionPlan> otherPlan = ((LOGenerate)other).getOutputPlans();
boolean[] fs = ((LOGenerate)other).getFlattenFlags();
if (outputPlans.size() != otherPlan.size()) {
return false;
}
for(int i=0; i<outputPlans.size(); i++) {
if (flattenFlags[i] != fs[i]) {
return false;
}
if (!outputPlans.get(i).isEqual(otherPlan.get(i))) {
return false;
}
}
return true;
}
@Override
public void accept(PlanVisitor v) throws FrontendException {
if (!(v instanceof LogicalRelationalNodesVisitor)) {
throw new FrontendException("Expected LogicalPlanVisitor", 2223);
}
((LogicalRelationalNodesVisitor)v).visit(this);
}
@Override
public String toString() {
StringBuilder msg = new StringBuilder();
if (alias!=null) {
msg.append(alias + ": ");
}
msg.append("(Name: " + name + "[");
for (int i=0;i<flattenFlags.length;i++) {
msg.append(flattenFlags[i]);
if (i!=flattenFlags.length-1)
msg.append(",");
}
msg.append("] Schema: ");
if (schema!=null)
msg.append(schema);
else
msg.append("null");
msg.append(")");
if (annotations!=null) {
for (Map.Entry<String, Object> entry : annotations.entrySet()) {
msg.append(entry);
}
}
return msg.toString();
}
public List<LogicalSchema> getUserDefinedSchema() {
return mUserDefinedSchema;
}
public void setUserDefinedSchema(List<LogicalSchema> userDefinedSchema) {
mUserDefinedSchema = userDefinedSchema;
}
/**
* Get the output schema corresponding to each input expression plan
* @return list of output schemas
*/
public List<LogicalSchema> getOutputPlanSchemas() {
return outputPlanSchemas;
}
public void setOutputPlanSchemas(List<LogicalSchema> outputPlanSchemas) {
this.outputPlanSchemas = outputPlanSchemas;
}
public List<LogicalSchema> getUidOnlySchemas() {
return uidOnlySchemas;
}
public void setUidOnlySchemas(List<LogicalSchema> uidOnlySchemas) {
this.uidOnlySchemas = uidOnlySchemas;
}
@Override
public void resetUid() {
this.uidOnlySchemas = null;
}
@Override
public void resetSchema(){
super.resetSchema();
outputPlanSchemas = null;
}
public List<LogicalSchema> getExpSchemas() {
return expSchemas;
}
private void setNullTypeToByteArrayType (LogicalSchema s1) {
if( s1 != null ) {
for (LogicalFieldSchema fs : s1.getFields()) {
if( DataType.isSchemaType(fs.type) ) {
setNullTypeToByteArrayType(fs.schema);
} else if(fs.type == DataType.NULL) {
fs.type = DataType.BYTEARRAY;
}
}
}
}
}