| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.impl.logicalLayer.schema; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.pig.PigException; |
| import org.apache.pig.ResourceSchema; |
| import org.apache.pig.ResourceSchema.ResourceFieldSchema; |
| import org.apache.pig.data.DataType; |
| import org.apache.pig.impl.logicalLayer.CanonicalNamer; |
| import org.apache.pig.impl.logicalLayer.FrontendException; |
| import org.apache.pig.impl.util.MultiMap; |
| |
| /** |
| * The Schema class encapsulates the notion of a schema for a relational operator. |
| * A schema is a list of columns that describe the output of a relational operator. |
| * Each column in the relation is represented as a FieldSchema, a static class inside |
| * the Schema. A column by definition has an alias, a type and a possible schema (if the |
| * column is a bag or a tuple). In addition, each column in the schema has a unique |
| * auto generated name used for tracking the lineage of the column in a sequence of |
| * statements. |
| * |
| * The lineage of the column is tracked using a map of the predecessors' columns to |
| * the operators that generate the predecessor columns. The predecessor columns are the |
| * columns required in order to generate the column under consideration. Similarly, a |
| * reverse lookup of operators that generate the predecessor column to the predecessor |
| * column is maintained. |
| */ |
| |
| public class Schema implements Serializable, Cloneable { |
| |
| private static final long serialVersionUID = 2L; |
| |
| public static class FieldSchema implements Serializable, Cloneable { |
| /** |
| * |
| */ |
| private static final long serialVersionUID = 2L; |
| |
| /** |
| * Alias for this field. |
| */ |
| public String alias; |
| |
| /** |
| * Datatype, using codes from {@link org.apache.pig.data.DataType}. |
| */ |
| public byte type; |
| |
| /** |
| * If this is a tuple itself, it can have a schema. Otherwise this field |
| * must be null. |
| */ |
| public Schema schema; |
| |
| /** |
| * Canonical name. This name uniquely identifies a field throughout |
| * the query. Unlike a an alias, it cannot be changed. It will |
| * change when the field is transformed in some way (such as being |
| * used in an arithmetic expression or passed to a udf). At that |
| * point a new canonical name will be generated for the field. |
| */ |
| public String canonicalName = null; |
| |
| /** |
| * Canonical namer object to generate new canonical names on |
| * request. In order to ensure unique and consistent names, across |
| * all field schema objects, the object is made static. |
| */ |
| public static final CanonicalNamer canonicalNamer = new CanonicalNamer(); |
| |
| private static Log log = LogFactory.getLog(Schema.FieldSchema.class); |
| |
| /** |
| * Constructor for any type. |
| * |
| * @param a |
| * Alias, if known. If unknown leave null. |
| * @param t |
| * Type, using codes from |
| * {@link org.apache.pig.data.DataType}. |
| */ |
| public FieldSchema(String a, byte t) { |
| alias = a; |
| type = t; |
| schema = null; |
| canonicalName = CanonicalNamer.getNewName(); |
| } |
| |
| /** |
| * Constructor for tuple fields. |
| * |
| * @param a |
| * Alias, if known. If unknown leave null. |
| * @param s |
| * Schema of this tuple. |
| */ |
| public FieldSchema(String a, Schema s) { |
| alias = a; |
| type = DataType.TUPLE; |
| schema = s; |
| canonicalName = CanonicalNamer.getNewName(); |
| } |
| |
| /** |
| * Constructor for tuple fields. |
| * |
| * @param a |
| * Alias, if known. If unknown leave null. |
| * @param s |
| * Schema of this tuple. |
| * @param t |
| * Type, using codes from |
| * {@link org.apache.pig.data.DataType}. |
| * |
| */ |
| public FieldSchema(String a, Schema s, byte t) throws FrontendException { |
| alias = a; |
| schema = s; |
| log.debug("t: " + t + " Bag: " + DataType.BAG + " tuple: " + DataType.TUPLE); |
| |
| if ((null != s) && !(DataType.isSchemaType(t))) { |
| int errCode = 1020; |
| throw new FrontendException("Only a BAG, TUPLE or MAP can have schemas. Got " |
| + DataType.findTypeName(t), errCode, PigException.INPUT); |
| } |
| |
| type = t; |
| canonicalName = CanonicalNamer.getNewName(); |
| } |
| |
| /** |
| * Copy Constructor. |
| * |
| * @param fs |
| * Source FieldSchema |
| * |
| */ |
| public FieldSchema(FieldSchema fs) { |
| if(null != fs) { |
| alias = fs.alias; |
| if(null != fs.schema) { |
| schema = new Schema(fs.schema); |
| } else { |
| schema = null; |
| } |
| type = fs.type; |
| } else { |
| alias = null; |
| schema = null; |
| type = DataType.UNKNOWN; |
| } |
| canonicalName = CanonicalNamer.getNewName(); |
| } |
| |
| /** |
| * Two field schemas are equal if types and schemas |
| * are equal in all levels. |
| * |
| * In order to relax alias equivalent requirement, |
| * instead use equals(FieldSchema fschema, |
| FieldSchema fother, |
| boolean relaxInner, |
| boolean relaxAlias) |
| */ |
| |
| @Override |
| public boolean equals(Object other) { |
| if (!(other instanceof FieldSchema)) return false; |
| FieldSchema otherfs = (FieldSchema)other; |
| |
| return FieldSchema.equals(this, otherfs, false, false) ; |
| } |
| |
| |
| @Override |
| public int hashCode() { |
| return (this.type * 17) |
| + ( (schema==null? 0:schema.hashCode()) * 23 ) |
| + ( (alias==null? 0:alias.hashCode()) * 29 ) ; |
| } |
| |
| /** |
| * Recursively compare two schemas to check if the input schema |
| * can be cast to the cast schema |
| * @param castFs schema of the cast operator |
| * @param inputFs schema of the cast input |
| * @return true or falsew! |
| */ |
| public static boolean castable( |
| Schema.FieldSchema castFs, |
| Schema.FieldSchema inputFs) { |
| if(castFs == null && inputFs == null) { |
| return false; |
| } |
| |
| if (castFs == null) { |
| return false ; |
| } |
| |
| if (inputFs == null) { |
| return false ; |
| } |
| byte inputType = inputFs.type; |
| byte castType = castFs.type; |
| |
| if (DataType.isSchemaType(castFs.type)) { |
| if(inputType == DataType.BYTEARRAY) { |
| // good |
| } else if (inputType == castType) { |
| // Don't do the comparison if both embedded schemas are |
| // null. That will cause Schema.equals to return false, |
| // even though we want to view that as true. |
| if (!(castFs.schema == null && inputFs.schema == null)) { |
| // compare recursively using schema |
| if (!Schema.castable(castFs.schema, inputFs.schema)) { |
| return false ; |
| } |
| } |
| } else { |
| return false; |
| } |
| } else { |
| if (inputType == castType) { |
| // good |
| } |
| else if (inputType == DataType.BOOLEAN && (castType == DataType.CHARARRAY |
| || castType == DataType.BYTEARRAY || DataType.isNumberType(castType))) { |
| // good |
| } |
| else if (DataType.isNumberType(inputType) && (castType == DataType.CHARARRAY |
| || castType == DataType.BYTEARRAY || DataType.isNumberType(castType) |
| || castType == DataType.BOOLEAN || castType == DataType.DATETIME)) { |
| // good |
| } |
| else if (inputType == DataType.DATETIME && (castType == DataType.CHARARRAY |
| || castType == DataType.BYTEARRAY || DataType.isNumberType(castType))) { |
| // good |
| } |
| else if (inputType == DataType.CHARARRAY && (castType == DataType.BYTEARRAY |
| || DataType.isNumberType(castType) || castType == DataType.BOOLEAN |
| || castType == DataType.DATETIME)) { |
| // good |
| } |
| else if (inputType == DataType.BYTEARRAY) { |
| // good |
| } |
| else { |
| return false; |
| } |
| } |
| |
| return true ; |
| } |
| |
| /*** |
| * Compare two field schema for equality |
| * @param fschema |
| * @param fother |
| * @param relaxInner If true, we don't check inner tuple schemas |
| * @param relaxAlias If true, we don't check aliases |
| * @return true if FieldSchemas are equal, false otherwise |
| */ |
| public static boolean equals(FieldSchema fschema, |
| FieldSchema fother, |
| boolean relaxInner, |
| boolean relaxAlias) { |
| if (fschema == null) { |
| return false ; |
| } |
| |
| if (fother == null) { |
| return false ; |
| } |
| |
| if (fschema.type != fother.type) { |
| return false ; |
| } |
| |
| |
| if (!relaxAlias) { |
| if ( (fschema.alias == null) && |
| (fother.alias == null) ) { |
| // good |
| } |
| else if ( (fschema.alias != null) && |
| (fother.alias == null) ) { |
| return false ; |
| } |
| else if ( (fschema.alias == null) && |
| (fother.alias != null) ) { |
| return false ; |
| } |
| else if (!fschema.alias.equals(fother.alias)) { |
| return false ; |
| } |
| } |
| |
| if ( (!relaxInner) && (DataType.isSchemaType(fschema.type))) { |
| // Don't do the comparison if both embedded schemas are |
| // null. That will cause Schema.equals to return false, |
| // even though we want to view that as true. |
| if (!(fschema.schema == null && fother.schema == null)) { |
| // compare recursively using schema |
| if (!Schema.equals(fschema.schema, fother.schema, false, relaxAlias)) { |
| return false ; |
| } |
| } |
| } |
| |
| return true ; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |
| if (alias != null) { |
| sb.append(alias); |
| sb.append(": "); |
| } |
| sb.append(DataType.findTypeName(type)); |
| |
| if (schema != null) { |
| sb.append("("); |
| sb.append(schema.toString()); |
| sb.append(")"); |
| } |
| |
| // if (canonicalName != null) { |
| // sb.append(" cn: "); |
| // sb.append(canonicalName); |
| // } |
| |
| return sb.toString(); |
| } |
| |
| /** |
| * Make a deep copy of this FieldSchema and return it. |
| * @return clone of the this FieldSchema. |
| * @throws CloneNotSupportedException |
| */ |
| @Override |
| public FieldSchema clone() throws CloneNotSupportedException { |
| // Strings are immutable, so we don't need to copy alias. Schemas |
| // are mutable so we need to make a copy. |
| try { |
| FieldSchema fs = new FieldSchema(alias, |
| (schema == null ? null : schema.clone()), type); |
| fs.canonicalName = CanonicalNamer.getNewName(); |
| return fs; |
| } catch (FrontendException fe) { |
| throw new RuntimeException( |
| "Should never fail to clone a FieldSchema", fe); |
| } |
| } |
| |
| /*** |
| * Recursively prefix merge two schemas |
| * @param otherFs the other field schema to be merged with |
| * @return the prefix merged field schema this can be null if one schema is null and |
| * allowIncompatibleTypes is true |
| * |
| * @throws SchemaMergeException if they cannot be merged |
| */ |
| |
| public Schema.FieldSchema mergePrefixFieldSchema(Schema.FieldSchema otherFs) throws SchemaMergeException { |
| return mergePrefixFieldSchema(otherFs, true, false); |
| } |
| |
| /*** |
| * Recursively prefix merge two schemas |
| * @param otherFs the other field schema to be merged with |
| * @param otherTakesAliasPrecedence true if aliases from the other |
| * field schema take precedence |
| * @return the prefix merged field schema this can be null if one schema is null and |
| * allowIncompatibleTypes is true |
| * |
| * @throws SchemaMergeException if they cannot be merged |
| */ |
| |
| public Schema.FieldSchema mergePrefixFieldSchema(Schema.FieldSchema otherFs, |
| boolean otherTakesAliasPrecedence) |
| throws SchemaMergeException { |
| return mergePrefixFieldSchema(otherFs, otherTakesAliasPrecedence, false); |
| } |
| |
| /*** |
| * Recursively prefix merge two schemas |
| * @param otherFs the other field schema to be merged with |
| * @param otherTakesAliasPrecedence true if aliases from the other |
| * field schema take precedence |
| * @param allowMergeableTypes true if "mergeable" types should be allowed. |
| * Two types are mergeable if any of the following conditions is true IN THE |
| * BELOW ORDER of checks: |
| * 1) if either one has a type null or unknown and other has a type OTHER THAN |
| * null or unknown, the result type will be the latter non null/unknown type |
| * 2) If either type is bytearray, then result type will be the other (possibly non BYTEARRAY) type |
| * 3) If current type can be cast to the other type, then the result type will be the |
| * other type |
| * @return the prefix merged field schema this can be null. |
| * |
| * @throws SchemaMergeException if they cannot be merged |
| */ |
| |
| public Schema.FieldSchema mergePrefixFieldSchema(Schema.FieldSchema otherFs, |
| boolean otherTakesAliasPrecedence, boolean allowMergeableTypes) |
| throws SchemaMergeException { |
| Schema.FieldSchema myFs = this; |
| Schema.FieldSchema mergedFs = null; |
| byte mergedType = DataType.NULL; |
| |
| if(null == otherFs) { |
| return myFs; |
| } |
| |
| if(isNullOrUnknownType(myFs) && isNullOrUnknownType(otherFs)) { |
| int errCode = 1021; |
| String msg = "Type mismatch. No useful type for merging. Field Schema: " + myFs + ". Other Field Schema: " + otherFs; |
| throw new SchemaMergeException(msg, errCode, PigException.INPUT); |
| } else if(myFs.type == otherFs.type) { |
| mergedType = myFs.type; |
| } else if (!isNullOrUnknownType(myFs) && isNullOrUnknownType(otherFs)) { |
| mergedType = myFs.type; |
| } else { |
| if (allowMergeableTypes) { |
| if (isNullOrUnknownType(myFs) && !isNullOrUnknownType(otherFs)) { |
| mergedType = otherFs.type; |
| } else if(otherFs.type == DataType.BYTEARRAY) { |
| // just set mergeType to myFs's type (could even be BYTEARRAY) |
| mergedType = myFs.type; |
| } else { |
| if(castable(otherFs, myFs)) { |
| mergedType = otherFs.type; |
| } else { |
| int errCode = 1022; |
| String msg = "Type mismatch for merging schema prefix. Field Schema: " + myFs + ". Other Field Schema: " + otherFs; |
| throw new SchemaMergeException(msg, errCode, PigException.INPUT); |
| } |
| } |
| } else { |
| int errCode = 1022; |
| String msg = "Type mismatch merging schema prefix. Field Schema: " + myFs + ". Other Field Schema: " + otherFs; |
| throw new SchemaMergeException(msg, errCode, PigException.INPUT); |
| } |
| } |
| |
| String mergedAlias = mergeAlias(myFs.alias, |
| otherFs.alias, |
| otherTakesAliasPrecedence) ; |
| |
| if (!DataType.isSchemaType(mergedType)) { |
| // just normal merge |
| mergedFs = new FieldSchema(mergedAlias, mergedType) ; |
| } |
| else { |
| Schema mergedSubSchema = null; |
| // merge inner schemas because both sides have schemas |
| if(null != myFs.schema) { |
| mergedSubSchema = myFs.schema.mergePrefixSchema(otherFs.schema, |
| otherTakesAliasPrecedence, allowMergeableTypes); |
| } else { |
| mergedSubSchema = otherFs.schema; |
| setSchemaDefaultType(mergedSubSchema, DataType.BYTEARRAY); |
| } |
| // create the merged field |
| try { |
| mergedFs = new FieldSchema(mergedAlias, mergedSubSchema, mergedType) ; |
| } catch (FrontendException fee) { |
| int errCode = 1023; |
| String msg = "Unable to create field schema."; |
| throw new SchemaMergeException(msg, errCode, PigException.BUG, fee); |
| } |
| } |
| return mergedFs; |
| } |
| |
| /** |
| * Recursively set NULL type to the specifid type |
| * @param fs the field schema whose NULL type has to be set |
| * @param t the specified type |
| */ |
| public static void setFieldSchemaDefaultType(Schema.FieldSchema fs, byte t) { |
| if(null == fs) return; |
| if(DataType.NULL == fs.type) { |
| fs.type = t; |
| } |
| if(DataType.isSchemaType(fs.type)) { |
| setSchemaDefaultType(fs.schema, t); |
| } |
| } |
| |
| |
| private boolean isNullOrUnknownType(FieldSchema fs) { |
| return (fs.type == DataType.NULL || fs.type == DataType.UNKNOWN); |
| } |
| |
| /** |
| * Find a field schema instance in this FieldSchema hierarchy (including "this") |
| * that matches the given canonical name. |
| * |
| * @param canonicalName canonical name |
| * @return the FieldSchema instance found |
| */ |
| public FieldSchema findFieldSchema(String canonicalName) { |
| if( this.canonicalName.equals(canonicalName) ) { |
| return this; |
| } |
| if( this.schema != null ) |
| return schema.findFieldSchema( canonicalName ); |
| return null; |
| } |
| |
| } |
| |
| private List<FieldSchema> mFields; |
| private Map<String, FieldSchema> mAliases; |
| private MultiMap<String, String> mFieldSchemas; |
| private static Log log = LogFactory.getLog(Schema.class); |
| // In bags which have a schema with a tuple which contains |
| // the fields present in it, if we access the second field (say) |
| // we are actually trying to access the second field in the |
| // tuple in the bag. This is currently true for two cases: |
| // 1) bag constants - the schema of bag constant has a tuple |
| // which internally has the actual elements |
| // 2) When bags are loaded from input data, if the user |
| // specifies a schema with the "bag" type, he has to specify |
| // the bag as containing a tuple with the actual elements in |
| // the schema declaration. However in both the cases above, |
| // the user can still say b.i where b is the bag and i is |
| // an element in the bag's tuple schema. So in these cases, |
| // the access should translate to a lookup for "i" in the |
| // tuple schema present in the bag. To indicate this, the |
| // flag below is used. It is false by default because, |
| // currently we use bag as the type for relations. However |
| // the schema of a relation does NOT have a tuple fieldschema |
| // with items in it. Instead, the schema directly has the |
| // field schema of the items. So for a relation "b", the |
| // above b.i access would be a direct single level access |
| // of i in b's schema. This is treated as the "default" case |
| private boolean twoLevelAccessRequired = false; |
| |
| public Schema() { |
| mFields = new ArrayList<FieldSchema>(); |
| mAliases = new HashMap<String, FieldSchema>(); |
| mFieldSchemas = new MultiMap<String, String>(); |
| } |
| |
| /** |
| * @param fields List of field schemas that describes the fields. |
| */ |
| public Schema(List<FieldSchema> fields) { |
| mFields = fields; |
| mAliases = new HashMap<String, FieldSchema>(fields.size()); |
| mFieldSchemas = new MultiMap<String, String>(); |
| for (FieldSchema fs : fields) { |
| if(null != fs) { |
| if (fs.alias != null) { |
| mAliases.put(fs.alias, fs); |
| mFieldSchemas.put(fs.canonicalName, fs.alias); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Create a schema with only one field. |
| * @param fieldSchema field to put in this schema. |
| */ |
| public Schema(FieldSchema fieldSchema) { |
| mFields = new ArrayList<FieldSchema>(1); |
| mFields.add(fieldSchema); |
| mAliases = new HashMap<String, FieldSchema>(1); |
| mFieldSchemas = new MultiMap<String, String>(); |
| if(null != fieldSchema) { |
| if (fieldSchema.alias != null) { |
| mAliases.put(fieldSchema.alias, fieldSchema); |
| mFieldSchemas.put(fieldSchema.canonicalName, fieldSchema.alias); |
| } |
| } |
| } |
| |
| /** |
| * Copy Constructor. |
| * @param s source schema |
| */ |
| public Schema(Schema s) { |
| |
| if(null != s) { |
| twoLevelAccessRequired = s.twoLevelAccessRequired; |
| mFields = new ArrayList<FieldSchema>(s.size()); |
| mAliases = new HashMap<String, FieldSchema>(); |
| mFieldSchemas = new MultiMap<String, String>(); |
| try { |
| for (int i = 0; i < s.size(); ++i) { |
| FieldSchema fs = new FieldSchema(s.getField(i)); |
| mFields.add(fs); |
| if(null != fs) { |
| if (fs.alias != null) { |
| mAliases.put(fs.alias, fs); |
| mFieldSchemas.put(fs.canonicalName, fs.alias); |
| } |
| } |
| } |
| } catch (FrontendException pe) { |
| mFields = new ArrayList<FieldSchema>(); |
| mAliases = new HashMap<String, FieldSchema>(); |
| mFieldSchemas = new MultiMap<String, String>(); |
| } |
| } else { |
| mFields = new ArrayList<FieldSchema>(); |
| mAliases = new HashMap<String, FieldSchema>(); |
| mFieldSchemas = new MultiMap<String, String>(); |
| } |
| } |
| |
| /** |
| * Given an alias name, find the associated FieldSchema. |
| * @param alias Alias to look up. |
| * @return FieldSchema, or null if no such alias is in this tuple. |
| */ |
| public FieldSchema getField(String alias) throws FrontendException { |
| FieldSchema fs = mAliases.get(alias); |
| if(null == fs) { |
| String cocoPrefix = "::" + alias; |
| Map<String, Integer> aliasMatches = new HashMap<String, Integer>(); |
| //build the map of aliases that have cocoPrefix as the suffix |
| for(String key: mAliases.keySet()) { |
| if(key.endsWith(cocoPrefix)) { |
| Integer count = aliasMatches.get(key); |
| if(null == count) { |
| aliasMatches.put(key, 1); |
| } else { |
| aliasMatches.put(key, ++count); |
| } |
| } |
| } |
| //process the map to check if |
| //1. are there multiple keys with count == 1 |
| //2. are there keys with count > 1 --> should never occur |
| //3. if thers is a single key with count == 1 we have our match |
| |
| if(aliasMatches.keySet().size() == 0) { |
| return null; |
| } |
| if(aliasMatches.keySet().size() == 1) { |
| Object[] keys = aliasMatches.keySet().toArray(); |
| String key = (String)keys[0]; |
| if(aliasMatches.get(key) > 1) { |
| int errCode = 1024; |
| throw new FrontendException("Found duplicate aliases: " + key, errCode, PigException.INPUT); |
| } |
| return mAliases.get(key); |
| } else { |
| // check if the multiple aliases obtained actually |
| // point to the same field schema - then just return |
| // that field schema |
| Set<FieldSchema> set = new HashSet<FieldSchema>(); |
| for (String key: aliasMatches.keySet()) { |
| set.add(mAliases.get(key)); |
| } |
| if(set.size() == 1) { |
| return set.iterator().next(); |
| } |
| |
| boolean hasNext = false; |
| StringBuilder sb = new StringBuilder("Found more than one match: "); |
| for (String key: aliasMatches.keySet()) { |
| if(hasNext) { |
| sb.append(", "); |
| } else { |
| hasNext = true; |
| } |
| sb.append(key); |
| } |
| int errCode = 1025; |
| throw new FrontendException(sb.toString(), errCode, PigException.INPUT); |
| } |
| } else { |
| return fs; |
| } |
| } |
| |
| |
| /** |
| * Given an alias name, find the associated FieldSchema. If exact name is |
| * not found see if any field matches the part of the 'namespaced' alias. |
| * eg. if given alias is nm::a , and schema is (a,b). It will return |
| * FieldSchema of a. |
| * if given alias is nm::a and schema is (nm2::a, b), it will return null |
| * @param alias Alias to look up. |
| * @return FieldSchema, or null if no such alias is in this tuple. |
| */ |
| public FieldSchema getFieldSubNameMatch(String alias) throws FrontendException { |
| if(alias == null) |
| return null; |
| FieldSchema fs = getField(alias); |
| if(fs != null){ |
| return fs; |
| } |
| //fs is null |
| final String sep = "::"; |
| ArrayList<FieldSchema> matchedFieldSchemas = new ArrayList<FieldSchema>(); |
| if(alias.contains(sep)){ |
| for(FieldSchema field : mFields) { |
| if(alias.endsWith(sep + field.alias)){ |
| matchedFieldSchemas.add(field); |
| } |
| } |
| } |
| if(matchedFieldSchemas.size() > 1){ |
| boolean hasNext = false; |
| StringBuilder sb = new StringBuilder("Found more than one " + |
| "sub alias name match: "); |
| for (FieldSchema matchFs : matchedFieldSchemas) { |
| if(hasNext) { |
| sb.append(", "); |
| } else { |
| hasNext = true; |
| } |
| sb.append(matchFs.alias); |
| } |
| int errCode = 1116; |
| throw new FrontendException(sb.toString(), errCode, PigException.INPUT); |
| }else if(matchedFieldSchemas.size() == 1){ |
| fs = matchedFieldSchemas.get(0); |
| } |
| |
| return fs; |
| } |
| |
| |
| |
| /** |
| * Given a field number, find the associated FieldSchema. |
| * |
| * @param fieldNum |
| * Field number to look up. |
| * @return FieldSchema for this field. |
| * @throws ParseException |
| * if the field number exceeds the number of fields in the |
| * tuple. |
| */ |
| public FieldSchema getField(int fieldNum) throws FrontendException { |
| if (fieldNum >= mFields.size()) { |
| int errCode = 1026; |
| String detailedMsg = "Attempt to access field: " + fieldNum + " from schema: " + this; |
| String msg = "Attempt to fetch field " + fieldNum + " from schema of size " + mFields.size(); |
| throw new FrontendException(msg, errCode, PigException.INPUT, false, detailedMsg); |
| } |
| |
| return mFields.get(fieldNum); |
| } |
| |
| /** |
| * Find the number of fields in the schema. |
| * |
| * @return number of fields. |
| */ |
| public int size() { |
| return mFields.size(); |
| } |
| |
| /** |
| * Reconcile this schema with another schema. The schema being reconciled |
| * with should have the same number of columns. The use case is where a |
| * schema already exists but may not have alias and or type information. If |
| * an alias exists in this schema and a new one is given, then the new one |
| * will be used. Similarly with types, though this needs to be used |
| * carefully, as types should not be lightly changed. |
| * @param other Schema to reconcile with. |
| * @throws ParseException if this cannot be reconciled. |
| */ |
| public void reconcile(Schema other) throws FrontendException { |
| |
| if (other != null) { |
| |
| if (other.size() != size()) { |
| int errCode = 1027; |
| String msg = "Cannot reconcile schemas with different " |
| + "sizes. This schema has size " + size() + " other has size " |
| + "of " + other.size(); |
| String detailedMsg = "Schema size mismatch. This schema: " + this + " other schema: " + other; |
| throw new FrontendException(msg, errCode, PigException.INPUT, false, detailedMsg); |
| } |
| |
| Iterator<FieldSchema> i = other.mFields.iterator(); |
| for (int j = 0; i.hasNext(); j++) { |
| FieldSchema otherFs = i.next(); |
| FieldSchema ourFs = mFields.get(j); |
| log.debug("ourFs: " + ourFs + " otherFs: " + otherFs); |
| if (otherFs.alias != null) { |
| log.debug("otherFs.alias: " + otherFs.alias); |
| if (ourFs.alias != null) { |
| log.debug("Removing ourFs.alias: " + ourFs.alias); |
| mAliases.remove(ourFs.alias); |
| Collection<String> aliases = mFieldSchemas.get(ourFs.canonicalName); |
| if (aliases != null) { |
| List<String> listAliases = new ArrayList<String>(); |
| for(String alias: aliases) { |
| listAliases.add(alias); |
| } |
| for(String alias: listAliases) { |
| log.debug("Removing alias " + alias + " from multimap"); |
| mFieldSchemas.remove(ourFs.canonicalName, alias); |
| } |
| } |
| } |
| ourFs.alias = otherFs.alias; |
| log.debug("Setting alias to: " + otherFs.alias); |
| mAliases.put(ourFs.alias, ourFs); |
| if(null != ourFs.alias) { |
| mFieldSchemas.put(ourFs.canonicalName, ourFs.alias); |
| } |
| } |
| if (otherFs.type != DataType.UNKNOWN) { |
| ourFs.type = otherFs.type; |
| log.debug("Setting type to: " |
| + DataType.findTypeName(otherFs.type)); |
| } |
| if (otherFs.schema != null) { |
| ourFs.schema = otherFs.schema; |
| log.debug("Setting schema to: " + otherFs.schema); |
| } |
| |
| } |
| } |
| } |
| |
| /*** |
| * For two schemas to be equal, they have to be deeply equal. |
| * Use Schema.equals(Schema schema, |
| Schema other, |
| boolean relaxInner, |
| boolean relaxAlias) |
| if relaxation of aliases is a requirement. |
| */ |
| @Override |
| public boolean equals(Object other) { |
| if (!(other instanceof Schema)) return false; |
| |
| Schema s = (Schema)other; |
| return Schema.equals(this, s, false, false) ; |
| |
| } |
| |
| /** |
| * Make a deep copy of a schema. |
| * @throws CloneNotSupportedException |
| */ |
| @Override |
| public Schema clone() throws CloneNotSupportedException { |
| Schema s = new Schema(); |
| |
| // Build a map between old and new field schemas, so we can properly |
| // construct the new alias and field schema maps. Populate the field |
| // list with copies of the existing field schemas. |
| Map<FieldSchema, FieldSchema> fsMap = |
| new HashMap<FieldSchema, FieldSchema>(size()); |
| Map<String, FieldSchema> fsCanonicalNameMap = |
| new HashMap<String, FieldSchema>(size()); |
| for (FieldSchema fs : mFields) { |
| FieldSchema copy = fs.clone(); |
| s.mFields.add(copy); |
| fsMap.put(fs, copy); |
| fsCanonicalNameMap.put(fs.canonicalName, copy); |
| } |
| |
| // Build the aliases map |
| for (String alias : mAliases.keySet()) { |
| FieldSchema oldFs = mAliases.get(alias); |
| assert(oldFs != null); |
| FieldSchema newFs = fsMap.get(oldFs); |
| assert(newFs != null); |
| s.mAliases.put(alias, newFs); |
| } |
| |
| // Build the field schemas map |
| for (String oldFsCanonicalName : mFieldSchemas.keySet()) { |
| FieldSchema newFs = fsCanonicalNameMap.get(oldFsCanonicalName); |
| assert(newFs != null); |
| s.mFieldSchemas.put(newFs.canonicalName, mFieldSchemas.get(oldFsCanonicalName)); |
| } |
| |
| s.twoLevelAccessRequired = twoLevelAccessRequired; |
| return s; |
| } |
| |
| |
| |
| static int[] primeList = { 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, |
| 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, |
| 83, 89, 97, 101, 103, 107, 109, 1133} ; |
| |
| @Override |
| public int hashCode() { |
| int idx = 0 ; |
| int hashCode = 0 ; |
| for(FieldSchema fs: this.mFields) { |
| hashCode += fs.hashCode() * (primeList[idx % primeList.length]) ; |
| idx++ ; |
| } |
| return hashCode ; |
| } |
| |
| @Override |
| public String toString() { |
| return toIndentedString(Integer.MIN_VALUE); |
| } |
| |
| public String prettyPrint() { |
| return toIndentedString(0); |
| } |
| |
| private String toIndentedString(int indentLevel) { |
| StringBuilder sb = new StringBuilder(); |
| try { |
| stringifySchema(sb, this, DataType.BAG, indentLevel) ; |
| } |
| catch (FrontendException fee) { |
| throw new RuntimeException("PROBLEM PRINTING SCHEMA") ; |
| } |
| return sb.toString(); |
| } |
| |
| public static void stringifySchema(StringBuilder sb, Schema schema, byte type) |
| throws FrontendException { |
| stringifySchema(sb, schema, type, 0); |
| } |
| |
| // This is used for building up output string |
| // type can only be BAG or TUPLE |
| public static void stringifySchema(StringBuilder sb, |
| Schema schema, |
| byte type, |
| int indentLevel) |
| throws FrontendException{ |
| |
| if (type == DataType.TUPLE) { |
| sb.append("(") ; |
| } |
| else if (type == DataType.BAG) { |
| sb.append("{") ; |
| } |
| |
| indentLevel++; |
| |
| if (schema != null) { |
| boolean isFirst = true ; |
| for (int i=0; i< schema.size() ;i++) { |
| |
| if (!isFirst) { |
| sb.append(",") ; |
| } |
| else { |
| isFirst = false ; |
| } |
| |
| indent(sb, indentLevel); |
| |
| FieldSchema fs = schema.getField(i) ; |
| |
| if(fs == null) { |
| continue; |
| } |
| |
| if (fs.alias != null) { |
| sb.append(fs.alias); |
| sb.append(": "); |
| } |
| |
| if (DataType.isAtomic(fs.type)) { |
| sb.append(DataType.findTypeName(fs.type)) ; |
| } |
| else if ( (fs.type == DataType.TUPLE) || |
| (fs.type == DataType.BAG) ) { |
| // safety net |
| if (schema != fs.schema) { |
| stringifySchema(sb, fs.schema, fs.type, indentLevel) ; |
| } |
| else { |
| throw new AssertionError("Schema refers to itself " |
| + "as inner schema") ; |
| } |
| } else if (fs.type == DataType.MAP) { |
| sb.append(DataType.findTypeName(fs.type) + "["); |
| if (fs.schema!=null) |
| stringifySchema(sb, fs.schema, fs.type, indentLevel); |
| sb.append("]"); |
| } else { |
| sb.append(DataType.findTypeName(fs.type)) ; |
| } |
| } |
| } |
| |
| indentLevel--; |
| indent(sb, indentLevel); |
| |
| if (type == DataType.TUPLE) { |
| sb.append(")") ; |
| } |
| else if (type == DataType.BAG) { |
| sb.append("}") ; |
| } |
| |
| } |
| |
| /** |
| * no-op if indentLevel is negative.<br> |
| * otherwise, print newline and 4*indentLevel spaces. |
| */ |
| private static void indent(StringBuilder sb, int indentLevel) { |
| if (indentLevel >= 0) { |
| sb.append("\n"); |
| } |
| while (indentLevel-- > 0) { |
| sb.append(" "); // 4 spaces. |
| } |
| } |
| |
| public void add(FieldSchema f) { |
| mFields.add(f); |
| if(null != f) { |
| mFieldSchemas.put(f.canonicalName, f.alias); |
| if (null != f.alias) { |
| mAliases.put(f.alias, f); |
| } |
| } |
| } |
| |
| /** |
| * Given an alias, find the associated position of the field schema. |
| * |
| * @param alias |
| * alias of the FieldSchema. |
| * @return position of the FieldSchema. |
| */ |
| public int getPosition(String alias) throws FrontendException{ |
| return getPosition(alias, false); |
| } |
| |
| |
| /** |
| * Given an alias, find the associated position of the field schema. |
| * It uses getFieldSubNameMatch to look for subName matches as well. |
| * @param alias |
| * alias of the FieldSchema. |
| * @return position of the FieldSchema. |
| */ |
| public int getPositionSubName(String alias) throws FrontendException{ |
| return getPosition(alias, true); |
| } |
| |
| |
| private int getPosition(String alias, boolean isSubNameMatch) |
| throws FrontendException { |
| if(isSubNameMatch && twoLevelAccessRequired){ |
| // should not happen |
| int errCode = 2248; |
| String msg = "twoLevelAccessRequired==true is not supported with" + |
| "and isSubNameMatch==true "; |
| throw new FrontendException(msg, errCode, PigException.BUG); |
| } |
| if(twoLevelAccessRequired) { |
| // this is the case where "this" schema is that of |
| // a bag which has just one tuple fieldschema which |
| // in turn has a list of fieldschemas. The alias supplied |
| // should be treated as an alias in the tuple's schema |
| |
| // check that indeed we only have one field schema |
| // which is that of a tuple |
| if(mFields.size() != 1) { |
| int errCode = 1008; |
| String msg = "Expected a bag schema with a single " + |
| "element of type "+ DataType.findTypeName(DataType.TUPLE) + |
| " but got a bag schema with multiple elements."; |
| throw new FrontendException(msg, errCode, PigException.INPUT); |
| } |
| Schema.FieldSchema tupleFS = mFields.get(0); |
| if(tupleFS.type != DataType.TUPLE) { |
| int errCode = 1009; |
| String msg = "Expected a bag schema with a single " + |
| "element of type "+ DataType.findTypeName(DataType.TUPLE) + |
| " but got an element of type " + |
| DataType.findTypeName(tupleFS.type); |
| throw new FrontendException(msg, errCode, PigException.INPUT); |
| } |
| |
| // check if the alias supplied is that of the tuple |
| // itself - then disallow it since we do not allow access |
| // to the tuple itself - we only allow access to the fields |
| // in the tuple |
| if(alias.equals(tupleFS.alias)) { |
| int errCode = 1028; |
| String msg = "Access to the tuple ("+ alias + ") of " + |
| "the bag is disallowed. Only access to the elements of " + |
| "the tuple in the bag is allowed."; |
| throw new FrontendException(msg, errCode, PigException.INPUT); |
| } |
| |
| // all is good - get the position from the tuple's schema |
| return tupleFS.schema.getPosition(alias); |
| } else { |
| FieldSchema fs = isSubNameMatch ? getFieldSubNameMatch(alias) : getField(alias); |
| |
| if (null == fs) { |
| return -1; |
| } |
| |
| log.debug("fs: " + fs); |
| int index = -1; |
| for(int i = 0; i < mFields.size(); ++i) { |
| log.debug("mFields(" + i + "): " + mFields.get(i) + " alias: " + mFields.get(i).alias); |
| if(fs == mFields.get(i)) {index = i;} |
| } |
| |
| log.debug("index: " + index); |
| return index; |
| //return mFields.indexOf(fs); |
| } |
| } |
| |
| public void addAlias(String alias, FieldSchema fs) { |
| if(null != alias) { |
| mAliases.put(alias, fs); |
| if(null != fs) { |
| mFieldSchemas.put(fs.canonicalName, alias); |
| } |
| } |
| } |
| |
| public Set<String> getAliases() { |
| return mAliases.keySet(); |
| } |
| |
| public void printAliases() { |
| Set<String> aliasNames = mAliases.keySet(); |
| for (String alias : aliasNames) { |
| log.debug("Schema Alias: " + alias); |
| } |
| } |
| |
| public List<FieldSchema> getFields() { |
| return mFields; |
| } |
| |
| /** |
| * Recursively compare two schemas to check if the input schema |
| * can be cast to the cast schema |
| * @param cast schema of the cast operator |
| * @param input schema of the cast input |
| * @return true or falsew! |
| */ |
| public static boolean castable(Schema cast, Schema input) { |
| |
| // If both of them are null, they are castable |
| if ((cast == null) && (input == null)) { |
| return false ; |
| } |
| |
| // otherwise |
| if (cast == null) { |
| return false ; |
| } |
| |
| if (input == null) { |
| return false ; |
| } |
| |
| if (cast.size() > input.size()) return false; |
| |
| Iterator<FieldSchema> i = cast.mFields.iterator(); |
| Iterator<FieldSchema> j = input.mFields.iterator(); |
| |
| while (i.hasNext()) { |
| //iterate only for the number of fields in cast |
| |
| FieldSchema castFs = i.next() ; |
| FieldSchema inputFs = j.next() ; |
| |
| // Compare recursively using field schema |
| if (!FieldSchema.castable(castFs, inputFs)) { |
| return false ; |
| } |
| |
| } |
| return true; |
| } |
| |
| /** |
| * Recursively compare two schemas for equality |
| * @param schema |
| * @param other |
| * @param relaxInner if true, inner schemas will not be checked |
| * @param relaxAlias if true, aliases will not be checked |
| * @return true if schemas are equal, false otherwise |
| */ |
| public static boolean equals(Schema schema, |
| Schema other, |
| boolean relaxInner, |
| boolean relaxAlias) { |
| |
| // If both of them are null, they are equal |
| if ((schema == null) && (other == null)) { |
| return true ; |
| } |
| |
| // otherwise |
| if (schema == null) { |
| return false ; |
| } |
| |
| if (other == null) { |
| return false ; |
| } |
| |
| /* |
| * Need to check for bags with schemas and bags with tuples that in turn have schemas. |
| * Retrieve the tuple schema of the bag if twoLevelAccessRequired |
| * Assuming that only bags exhibit this behavior and twoLevelAccessRequired is used |
| * with the right intentions |
| */ |
| if(schema.isTwoLevelAccessRequired() || other.isTwoLevelAccessRequired()) { |
| if(schema.isTwoLevelAccessRequired()) { |
| try { |
| schema = schema.getField(0).schema; |
| } catch (FrontendException fee) { |
| return false; |
| } |
| } |
| |
| if(other.isTwoLevelAccessRequired()) { |
| try { |
| other = other.getField(0).schema; |
| } catch (FrontendException fee) { |
| return false; |
| } |
| } |
| |
| return Schema.equals(schema, other, relaxInner, relaxAlias); |
| } |
| |
| if (schema.size() != other.size()) return false; |
| |
| Iterator<FieldSchema> i = schema.mFields.iterator(); |
| Iterator<FieldSchema> j = other.mFields.iterator(); |
| |
| while (i.hasNext()) { |
| |
| FieldSchema myFs = i.next() ; |
| FieldSchema otherFs = j.next() ; |
| |
| if (!relaxAlias) { |
| if ( (myFs.alias == null) && |
| (otherFs.alias == null) ) { |
| // good |
| } |
| else if ( (myFs.alias != null) && |
| (otherFs.alias == null) ) { |
| return false ; |
| } |
| else if ( (myFs.alias == null) && |
| (otherFs.alias != null) ) { |
| return false ; |
| } |
| else if (!myFs.alias.equals(otherFs.alias)) { |
| return false ; |
| } |
| } |
| |
| if (myFs.type != otherFs.type) { |
| return false ; |
| } |
| |
| if (!relaxInner) { |
| // Compare recursively using field schema |
| if (!FieldSchema.equals(myFs, otherFs, false, relaxAlias)) { |
| return false ; |
| } |
| } |
| |
| } |
| return true; |
| } |
| |
| |
| /*** |
| * Merge this schema with the other schema |
| * @param other the other schema to be merged with |
| * @param otherTakesAliasPrecedence true if aliases from the other |
| * schema take precedence |
| * @return the merged schema, null if they are not compatible |
| */ |
| public Schema merge(Schema other, boolean otherTakesAliasPrecedence) { |
| return mergeSchema(this, other, otherTakesAliasPrecedence) ; |
| } |
| |
| /*** |
| * Recursively merge two schemas |
| * @param schema the initial schema |
| * @param other the other schema to be merged with |
| * @param otherTakesAliasPrecedence true if aliases from the other |
| * schema take precedence |
| * @return the merged schema, null if they are not compatible |
| */ |
| public static Schema mergeSchema(Schema schema, Schema other, |
| boolean otherTakesAliasPrecedence) { |
| try { |
| Schema newSchema = mergeSchema(schema, |
| other, |
| otherTakesAliasPrecedence, |
| false, |
| false) ; |
| return newSchema; |
| } |
| catch(SchemaMergeException sme) { |
| // just mean they are not compatible |
| } |
| return null ; |
| } |
| |
| /*** |
| * Recursively merge two schemas |
| * @param schema the initial schema |
| * @param other the other schema to be merged with |
| * @param otherTakesAliasPrecedence true if aliases from the other |
| * schema take precedence |
| * @param allowDifferentSizeMerge allow merging of schemas of different types |
| * @param allowIncompatibleTypes 1) if types in schemas are not compatible |
| * they will be treated as ByteArray (untyped) |
| * 2) if schemas in schemas are not compatible |
| * and allowIncompatibleTypes is true |
| * those inner schemas in the output |
| * will be null. |
| * @return the merged schema this can be null if one schema is null and |
| * allowIncompatibleTypes is true |
| * |
| * @throws SchemaMergeException if they cannot be merged |
| */ |
| |
| public static Schema mergeSchema(Schema schema, |
| Schema other, |
| boolean otherTakesAliasPrecedence, |
| boolean allowDifferentSizeMerge, |
| boolean allowIncompatibleTypes) |
| throws SchemaMergeException { |
| if(schema == null && other == null){ |
| //if both are null, they are not incompatible |
| return null; |
| } |
| if (schema == null) { |
| if (allowIncompatibleTypes) { |
| return null ; |
| } |
| else { |
| int errCode = 1029; |
| String msg = "One of the schemas is null for merging schemas. Schema: " + schema + " Other schema: " + other; |
| throw new SchemaMergeException(msg, errCode, PigException.INPUT) ; |
| } |
| } |
| |
| if (other == null) { |
| if (allowIncompatibleTypes) { |
| return null ; |
| } |
| else { |
| int errCode = 1029; |
| String msg = "One of the schemas is null for merging schemas. Schema: " + schema + " Other schema: " + other; |
| throw new SchemaMergeException(msg, errCode, PigException.INPUT) ; |
| } |
| } |
| |
| if ( (schema.size() != other.size()) && |
| (!allowDifferentSizeMerge) ) { |
| int errCode = 1030; |
| String msg = "Different schema sizes for merging schemas. Schema size: " + schema.size() + " Other schema size: " + other.size(); |
| throw new SchemaMergeException(msg, errCode, PigException.INPUT) ; |
| } |
| |
| List<FieldSchema> outputList = new ArrayList<FieldSchema>() ; |
| |
| List<FieldSchema> mylist = schema.mFields ; |
| List<FieldSchema> otherlist = other.mFields ; |
| |
| // We iterate up to the smaller one's size |
| int iterateLimit = schema.mFields.size() > other.mFields.size()? |
| other.mFields.size() : schema.mFields.size() ; |
| |
| int idx = 0; |
| for (; idx< iterateLimit ; idx ++) { |
| |
| // Just for readability |
| FieldSchema myFs = mylist.get(idx) ; |
| FieldSchema otherFs = otherlist.get(idx) ; |
| |
| byte mergedType = DataType.mergeType(myFs.type, otherFs.type) ; |
| |
| // If the types cannot be merged |
| if (mergedType == DataType.ERROR) { |
| // If treatIncompatibleAsByteArray is true, |
| // we will treat it as bytearray |
| if (allowIncompatibleTypes) { |
| mergedType = DataType.BYTEARRAY ; |
| } |
| // otherwise the schemas cannot be merged |
| else { |
| int errCode = 1031; |
| String msg = "Incompatible types for merging schemas. Field schema type: " |
| + DataType.findTypeName(myFs.type) + " Other field schema type: " |
| + DataType.findTypeName(otherFs.type); |
| throw new SchemaMergeException(msg, errCode, PigException.INPUT) ; |
| } |
| } |
| |
| String mergedAlias = mergeAlias(myFs.alias, |
| otherFs.alias, |
| otherTakesAliasPrecedence) ; |
| |
| FieldSchema mergedFs = null ; |
| if (!DataType.isSchemaType(mergedType)) { |
| // just normal merge |
| mergedFs = new FieldSchema(mergedAlias, mergedType) ; |
| } |
| else { |
| // merge inner tuple because both sides are tuples |
| //if inner schema are incompatible and allowIncompatibleTypes==true |
| // an exception is thrown by mergeSchema |
| Schema mergedSubSchema = mergeSchema(myFs.schema, |
| otherFs.schema, |
| otherTakesAliasPrecedence, |
| allowDifferentSizeMerge, |
| allowIncompatibleTypes) ; |
| |
| // create the merged field |
| // the mergedSubSchema can be true if allowIncompatibleTypes |
| try { |
| mergedFs = new FieldSchema(mergedAlias, mergedSubSchema, mergedType) ; |
| } catch (FrontendException e) { |
| int errCode = 2124; |
| String errMsg = "Internal Error: Unexpected error creating field schema"; |
| throw new SchemaMergeException(errMsg, errCode, PigException.BUG, e); |
| } |
| |
| } |
| outputList.add(mergedFs) ; |
| } |
| |
| // Handle different schema size |
| if (allowDifferentSizeMerge) { |
| |
| // if the first schema has leftover, then append the rest |
| for(int i=idx; i < mylist.size(); i++) { |
| |
| FieldSchema fs = mylist.get(i) ; |
| |
| // for non-schema types |
| if (!DataType.isSchemaType(fs.type)) { |
| outputList.add(new FieldSchema(fs.alias, fs.type)) ; |
| } |
| // for TUPLE & BAG |
| else { |
| FieldSchema tmp = new FieldSchema(fs.alias, fs.schema) ; |
| tmp.type = fs.type ; |
| outputList.add(tmp) ; |
| } |
| } |
| |
| // if the second schema has leftover, then append the rest |
| for(int i=idx; i < otherlist.size(); i++) { |
| |
| FieldSchema fs = otherlist.get(i) ; |
| |
| // for non-schema types |
| if (!DataType.isSchemaType(fs.type)) { |
| outputList.add(new FieldSchema(fs.alias, fs.type)) ; |
| } |
| // for TUPLE & BAG |
| else { |
| FieldSchema tmp = new FieldSchema(fs.alias, fs.schema) ; |
| tmp.type = fs.type ; |
| outputList.add(tmp) ; |
| } |
| } |
| |
| } |
| |
| Schema result = new Schema(outputList); |
| if (schema.isTwoLevelAccessRequired()!=other.isTwoLevelAccessRequired()) { |
| int errCode = 2124; |
| String errMsg = "Cannot merge schema " + schema + " and " + other + ". One with twoLeverAccess flag, the other doesn't."; |
| throw new SchemaMergeException(errMsg, errCode, PigException.BUG); |
| } |
| if (schema.isTwoLevelAccessRequired()) |
| result.setTwoLevelAccessRequired(true); |
| return result; |
| } |
| |
| /*** |
| * Merge two aliases. If one of aliases is null, return the other. |
| * Otherwise check the precedence condition |
| * @param alias |
| * @param other |
| * @param otherTakesPrecedence |
| * @return |
| */ |
| private static String mergeAlias(String alias, String other |
| ,boolean otherTakesPrecedence) { |
| if (alias == null) { |
| return other ; |
| } |
| else if (other == null) { |
| return alias ; |
| } |
| else if (otherTakesPrecedence) { |
| return other ; |
| } |
| else { |
| return alias ; |
| } |
| } |
| |
| /** |
| * Merges collection of schemas using their column aliases |
| * (unlike mergeSchema(..) functions which merge using positions) |
| * Schema will not be merged if types are incompatible, |
| * as per DataType.mergeType(..) |
| * For Tuples and Bags, SubSchemas have to be equal be considered compatible |
| * @param schemas - list of schemas to be merged using their column alias |
| * @return merged schema |
| * @throws SchemaMergeException |
| */ |
| public static Schema mergeSchemasByAlias(Collection<Schema> schemas) |
| throws SchemaMergeException{ |
| Schema mergedSchema = null; |
| |
| // list of schemas that have currently been merged, used in error message |
| ArrayList<Schema> mergedSchemas = new ArrayList<Schema>(schemas.size()); |
| for(Schema sch : schemas){ |
| if(mergedSchema == null){ |
| mergedSchema = new Schema(sch); |
| mergedSchemas.add(sch); |
| continue; |
| } |
| try{ |
| mergedSchema = mergeSchemaByAlias(mergedSchema, sch); |
| mergedSchemas.add(sch); |
| }catch(SchemaMergeException e){ |
| String msg = "Error merging schema: (" + sch + ") with " |
| + "merged schema: (" + mergedSchema + ")" + " of schemas : " |
| + mergedSchemas; |
| SchemaMergeException sme = new SchemaMergeException(msg, |
| e.getErrorCode(), e); |
| sme.setMarkedAsShowToUser(true); |
| throw sme; |
| } |
| } |
| return mergedSchema; |
| } |
| |
| /** |
| * Merges two schemas using their column aliases |
| * (unlike mergeSchema(..) functions which merge using positions) |
| * Schema will not be merged if types are incompatible, |
| * as per DataType.mergeType(..) |
| * For Tuples and Bags, SubSchemas have to be equal be considered compatible |
| * @param schema1 |
| * @param schema2 |
| * @return Merged Schema |
| * @throws SchemaMergeException if schemas cannot be merged |
| */ |
| public static Schema mergeSchemaByAlias(Schema schema1, |
| Schema schema2) |
| throws SchemaMergeException{ |
| Schema mergedSchema = new Schema(); |
| HashSet<FieldSchema> schema2colsAdded = new HashSet<FieldSchema>(); |
| // add/merge fields present in first schema |
| for(FieldSchema fs1 : schema1.getFields()){ |
| checkNullAlias(fs1, schema1); |
| FieldSchema fs2 = getFieldSubNameMatchThrowSchemaMergeException(schema2,fs1.alias); |
| if(fs2 != null){ |
| if(schema2colsAdded.contains(fs2)){ |
| // alias corresponds to multiple fields in schema1, |
| // just do a lookup on |
| // schema1 , that will throw the appropriate error. |
| getFieldSubNameMatchThrowSchemaMergeException(schema1, fs2.alias); |
| } |
| schema2colsAdded.add(fs2); |
| } |
| FieldSchema mergedFs = mergeFieldSchemaFirstLevelSameAlias(fs1,fs2); |
| mergedSchema.add(mergedFs); |
| } |
| |
| //add schemas from 2nd schema, that are not already present in |
| // merged schema |
| for(FieldSchema fs2 : schema2.getFields()){ |
| checkNullAlias(fs2, schema2); |
| if(! schema2colsAdded.contains(fs2)){ |
| try { |
| mergedSchema.add(fs2.clone()); |
| } catch (CloneNotSupportedException e) { |
| throw new SchemaMergeException( |
| "Error encountered while merging schemas", e); |
| } |
| } |
| } |
| return mergedSchema; |
| |
| } |
| |
| private static void checkNullAlias(FieldSchema fs, Schema schema) |
| throws SchemaMergeException { |
| if(fs.alias == null){ |
| throw new SchemaMergeException( |
| "Schema having field with null alias cannot be merged " + |
| "using alias. Schema :" + schema, |
| 1126 |
| ); |
| } |
| } |
| |
| /** |
| * Schema will not be merged if types are incompatible, |
| * as per DataType.mergeType(..) |
| * For Tuples and Bags, SubSchemas have to be equal be considered compatible |
| * Aliases are assumed to be same for both |
| * @param fs1 |
| * @param fs2 |
| * @return |
| * @throws SchemaMergeException |
| */ |
| private static FieldSchema mergeFieldSchemaFirstLevelSameAlias(FieldSchema fs1, |
| FieldSchema fs2) |
| throws SchemaMergeException { |
| if(fs1 == null) |
| return fs2; |
| if(fs2 == null) |
| return fs1; |
| |
| Schema innerSchema = null; |
| |
| String alias = mergeNameSpacedAlias(fs1.alias, fs2.alias); |
| |
| byte mergedType = DataType.mergeType(fs1.type, fs2.type) ; |
| |
| // If the types cannot be merged |
| if (mergedType == DataType.ERROR) { |
| int errCode = 1031; |
| String msg = "Incompatible types for merging schemas. Field schema: " |
| + fs1 + " Other field schema: " + fs2; |
| throw new SchemaMergeException(msg, errCode, PigException.INPUT) ; |
| } |
| if(DataType.isSchemaType(mergedType)) { |
| // if one of them is a bytearray, pick inner schema of other one |
| if( fs1.type == DataType.BYTEARRAY ){ |
| innerSchema = fs2.schema; |
| }else if(fs2.type == DataType.BYTEARRAY){ |
| innerSchema = fs1.schema; |
| } |
| else { |
| //in case of types with inner schema such as bags and tuples |
| // the inner schema has to be same |
| if(!equals(fs1.schema, fs2.schema, false, false)){ |
| int errCode = 1032; |
| String msg = "Incompatible types for merging inner schemas of " + |
| " Field schema type: " + fs1 + " Other field schema type: " + fs2; |
| throw new SchemaMergeException(msg, errCode, PigException.INPUT) ; |
| } |
| innerSchema = fs1.schema; |
| } |
| } |
| try { |
| return new FieldSchema(alias, innerSchema, mergedType) ; |
| } catch (FrontendException e) { |
| // this exception is not expected |
| int errCode = 2124; |
| throw new SchemaMergeException( |
| "Error in creating fieldSchema", |
| errCode, |
| PigException.BUG |
| ); |
| } |
| } |
| |
| |
| /** |
| * If one of the aliases is of form 'nm::str1', and other is of the form |
| * 'str1', this returns str1 |
| * @param alias1 |
| * @param alias2 |
| * @return merged alias |
| * @throws SchemaMergeException |
| */ |
| private static String mergeNameSpacedAlias(String alias1, String alias2) |
| throws SchemaMergeException { |
| if(alias1.equals(alias2)){ |
| return alias1; |
| } |
| if(alias1.endsWith("::" + alias2)){ |
| return alias2; |
| } |
| if(alias2.endsWith("::" + alias1)){ |
| return alias1; |
| } |
| //the aliases are different, alias cannot be merged |
| return null; |
| } |
| |
| /** |
| * Utility function that calls schema.getFiled(alias), and converts |
| * {@link FrontendException} to {@link SchemaMergeException} |
| * @param schema |
| * @param alias |
| * @return FieldSchema |
| * @throws SchemaMergeException |
| */ |
| private static FieldSchema getFieldSubNameMatchThrowSchemaMergeException( |
| Schema schema, String alias) throws SchemaMergeException { |
| FieldSchema fs = null; |
| try { |
| fs = schema.getFieldSubNameMatch(alias); |
| } catch (FrontendException e) { |
| String msg = "Caught exception finding FieldSchema for alias " + |
| alias; |
| throw new SchemaMergeException(msg, e.getErrorCode(), e); |
| } |
| return fs; |
| } |
| |
| |
| |
| /** |
| * |
| * @param topLevelType DataType type of the top level element |
| * @param innerTypes DataType types of the inner level element |
| * @return nested schema representing type of top level element at first level and inner schema |
| * representing types of inner element(s) |
| */ |
| public static Schema generateNestedSchema(byte topLevelType, byte... innerTypes) throws FrontendException{ |
| |
| Schema innerSchema = new Schema(); |
| for (int i = 0; i < innerTypes.length; i++) { |
| innerSchema.add(new Schema.FieldSchema(null, innerTypes[i])); |
| } |
| |
| Schema.FieldSchema outerSchema = new Schema.FieldSchema(null, innerSchema, topLevelType); |
| return new Schema(outerSchema); |
| } |
| |
| /*** |
| * Recursively prefix merge two schemas |
| * @param other the other schema to be merged with |
| * @param otherTakesAliasPrecedence true if aliases from the other |
| * schema take precedence |
| * @return the prefix merged schema this can be null if one schema is null and |
| * allowIncompatibleTypes is true |
| * |
| * @throws SchemaMergeException if they cannot be merged |
| */ |
| |
| public Schema mergePrefixSchema(Schema other, |
| boolean otherTakesAliasPrecedence) |
| throws SchemaMergeException { |
| return mergePrefixSchema(other, otherTakesAliasPrecedence, false); |
| } |
| |
| /*** |
| * Recursively prefix merge two schemas |
| * @param other the other schema to be merged with |
| * @param otherTakesAliasPrecedence true if aliases from the other |
| * schema take precedence |
| * @param allowMergeableTypes true if "mergeable" types should be allowed. |
| * Two types are mergeable if any of the following conditions is true IN THE |
| * BELOW ORDER of checks: |
| * 1) if either one has a type null or unknown and other has a type OTHER THAN |
| * null or unknown, the result type will be the latter non null/unknown type |
| * 2) If either type is bytearray, then result type will be the other (possibly non BYTEARRAY) type |
| * 3) If current type can be cast to the other type, then the result type will be the |
| * other type |
| * @return the prefix merged schema this can be null if one schema is null and |
| * allowIncompatibleTypes is true |
| * |
| * @throws SchemaMergeException if they cannot be merged |
| */ |
| |
| public Schema mergePrefixSchema(Schema other, |
| boolean otherTakesAliasPrecedence, boolean allowMergeableTypes) |
| throws SchemaMergeException { |
| Schema schema = this; |
| |
| if (other == null) { |
| return this ; |
| } |
| |
| if (schema.size() < other.size()) { |
| int errCode = 1033; |
| String msg = "Schema size mismatch for merging schemas. Other schema size greater than schema size. Schema: " + this + ". Other schema: " + other; |
| throw new SchemaMergeException(msg, errCode, PigException.INPUT); |
| } |
| |
| List<FieldSchema> outputList = new ArrayList<FieldSchema>() ; |
| |
| List<FieldSchema> mylist = schema.mFields ; |
| List<FieldSchema> otherlist = other.mFields ; |
| |
| // We iterate up to the smaller one's size |
| int iterateLimit = other.mFields.size(); |
| |
| int idx = 0; |
| for (; idx< iterateLimit ; idx ++) { |
| |
| // Just for readability |
| FieldSchema myFs = mylist.get(idx) ; |
| FieldSchema otherFs = otherlist.get(idx) ; |
| |
| FieldSchema mergedFs = myFs.mergePrefixFieldSchema(otherFs, otherTakesAliasPrecedence, allowMergeableTypes); |
| outputList.add(mergedFs) ; |
| } |
| // if the first schema has leftover, then append the rest |
| for(int i=idx; i < mylist.size(); i++) { |
| |
| FieldSchema fs = mylist.get(i) ; |
| |
| // for non-schema types |
| if (!DataType.isSchemaType(fs.type)) { |
| outputList.add(new FieldSchema(fs.alias, fs.type)) ; |
| } |
| // for TUPLE & BAG |
| else { |
| try { |
| FieldSchema tmp = new FieldSchema(fs.alias, fs.schema, fs.type) ; |
| outputList.add(tmp) ; |
| } catch (FrontendException fee) { |
| int errCode = 1023; |
| String msg = "Unable to create field schema."; |
| throw new SchemaMergeException(msg, errCode, PigException.INPUT, fee); |
| } |
| } |
| } |
| |
| Schema s = new Schema(outputList) ; |
| s.setTwoLevelAccessRequired(other.twoLevelAccessRequired); |
| return s; |
| } |
| |
| /** |
| * Recursively set NULL type to the specifid type in a schema |
| * @param s the schema whose NULL type has to be set |
| * @param t the specified type |
| */ |
| public static void setSchemaDefaultType(Schema s, byte t) { |
| if(null == s) return; |
| for(Schema.FieldSchema fs: s.getFields()) { |
| FieldSchema.setFieldSchemaDefaultType(fs, t); |
| } |
| } |
| |
| /** |
| * @return the twoLevelAccess |
| * @deprecated twoLevelAccess is no longer needed |
| */ |
| @Deprecated |
| public boolean isTwoLevelAccessRequired() { |
| return twoLevelAccessRequired; |
| } |
| |
| /** |
| * @param twoLevelAccess the twoLevelAccess to set |
| * @deprecated twoLevelAccess is no longer needed |
| */ |
| @Deprecated |
| public void setTwoLevelAccessRequired(boolean twoLevelAccess) { |
| this.twoLevelAccessRequired = twoLevelAccess; |
| } |
| |
| public static Schema getPigSchema(ResourceSchema rSchema) |
| throws FrontendException { |
| if(rSchema == null) { |
| return null; |
| } |
| List<FieldSchema> fsList = new ArrayList<FieldSchema>(); |
| for(ResourceFieldSchema rfs : rSchema.getFields()) { |
| FieldSchema fs = new FieldSchema(rfs.getName(), |
| rfs.getSchema() == null ? |
| null : getPigSchema(rfs.getSchema()), rfs.getType()); |
| |
| if(rfs.getType() == DataType.BAG) { |
| if (fs.schema != null) { // allow partial schema |
| if (fs.schema.size() == 1) { |
| FieldSchema innerFs = fs.schema.getField(0); |
| if (innerFs.type != DataType.TUPLE) { |
| ResourceFieldSchema.throwInvalidSchemaException(); |
| } |
| } else { |
| ResourceFieldSchema.throwInvalidSchemaException(); |
| } |
| } |
| } |
| fsList.add(fs); |
| } |
| return new Schema(fsList); |
| } |
| |
| /** |
| * Look for a FieldSchema instance in the schema hierarchy which has the given canonical name. |
| * @param canonicalName canonical name |
| * @return the FieldSchema instance found |
| */ |
| public FieldSchema findFieldSchema(String canonicalName) { |
| for( FieldSchema fs : mFields ) { |
| if( fs.canonicalName.equals( canonicalName ) ) |
| return fs; |
| if( fs.schema != null ) { |
| FieldSchema result = fs.schema.findFieldSchema( canonicalName ); |
| if( result != null ) |
| return result; |
| } |
| } |
| return null; |
| } |
| |
| } |
| |
| |
| |