| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.pig.piggybank.storage.avro; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import org.apache.avro.Schema; |
| import org.apache.avro.Schema.Field; |
| import org.apache.avro.generic.GenericDatumReader; |
| import org.apache.avro.file.DataFileStream; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.pig.ResourceSchema; |
| import org.apache.pig.ResourceSchema.ResourceFieldSchema; |
| import org.apache.pig.data.DataType; |
| import org.apache.pig.impl.util.Utils; |
| import org.codehaus.jackson.JsonNode; |
| /** |
| * This is utility class for this package |
| */ |
| public class AvroStorageUtils { |
| |
    public static final Schema BooleanSchema = Schema.create(Schema.Type.BOOLEAN);
    public static final Schema LongSchema = Schema.create(Schema.Type.LONG);
    public static final Schema FloatSchema = Schema.create(Schema.Type.FLOAT);
    public static final Schema DoubleSchema = Schema.create(Schema.Type.DOUBLE);
    public static final Schema IntSchema = Schema.create(Schema.Type.INT);
    public static final Schema StringSchema = Schema.create(Schema.Type.STRING);
    public static final Schema BytesSchema = Schema.create(Schema.Type.BYTES);
    public static final Schema NullSchema = Schema.create(Schema.Type.NULL);
| |
| private static final String NONAME = "NONAME"; |
| private static final String PIG_TUPLE_WRAPPER = "PIG_WRAPPER"; |
| |
| static String getDummyFieldName(int index) { |
| return NONAME + "_" + index; |
| } |
| |
| /** create an avro field using the given schema */ |
| public static Field createUDField(int index, Schema s) { |
| return new Field(getDummyFieldName(index), s, null, null); |
| } |
| |
    /** create an avro record schema with no fields set (it is a placeholder) */
| public static Schema createUDPartialRecordSchema() { |
| return Schema.createRecord(NONAME, null, null, false); |
| } |
| |
    /** check whether a schema is a placeholder (by checking its record name) */
| public static boolean isUDPartialRecordSchema(Schema s) { |
| return s.getName().equals(NONAME); |
| } |
| |
    /** get the field with the given index from a placeholder schema */
| public static Field getUDField(Schema s, int index) { |
| return s.getField(getDummyFieldName(index)); |
| } |
| |
| /** |
     * Returns all non-hidden files found recursively inside the given base paths.
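     * <p>
     * A minimal usage sketch (the input path is illustrative):
     * <pre>{@code
     * Set<Path> inputs = new HashSet<Path>();
     * inputs.add(new Path("/data/events"));
     * Set<Path> files = AvroStorageUtils.getAllFilesRecursively(inputs, new Configuration());
     * }</pre>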
| * |
| * @throws IOException |
| */ |
| public static Set<Path> getAllFilesRecursively(Set<Path> basePaths, Configuration conf) throws IOException { |
| Set<Path> paths = new HashSet<Path>(); |
| for (Path path : basePaths) { |
| FileSystem fs = FileSystem.get(path.toUri(), conf); |
| FileStatus f = fs.getFileStatus(path); |
| if (f.isDir()) { |
| getAllFilesInternal(f, conf, paths, fs); |
| } else { |
| paths.add(path); |
| } |
| } |
| return paths; |
| } |
| |
| private static void getAllFilesInternal(FileStatus file, Configuration conf, |
| Set<Path> paths, FileSystem fs) throws IOException { |
| for (FileStatus f : fs.listStatus(file.getPath(), Utils.VISIBLE_FILES)) { |
| if (f.isDir()) { |
| getAllFilesInternal(f, conf, paths, fs); |
| } else { |
| paths.add(f.getPath()); |
| } |
| } |
| } |
| |
    /** check whether the input file status list contains no directories */
| public static boolean noDir(FileStatus [] ss) { |
| for (FileStatus s : ss) { |
| if (s.isDir()) |
| return false; |
| } |
| return true; |
| } |
| |
    /** get the last file (in sorted order) under an HDFS path if it is a
     * directory, or return the path itself if it is a file
     */
| public static Path getLast(Path path, FileSystem fs) throws IOException { |
| |
| FileStatus status = fs.getFileStatus(path); |
| if (!status.isDir()) { |
| return path; |
| } |
| FileStatus[] statuses = fs.listStatus(path, Utils.VISIBLE_FILES); |
| |
| if (statuses.length == 0) { |
| return null; |
| } else { |
| Arrays.sort(statuses); |
| for (int i = statuses.length - 1; i >= 0; i--) { |
| if (!statuses[i].isDir()) { |
| return statuses[i].getPath(); |
| } |
| } |
| return null; |
| } |
| } |
| |
| /** |
| * This method merges two primitive avro types into one. This method must |
| * be used only to merge two primitive types. For complex types, null will |
| * be returned unless they are both the same type. Also note that not every |
| * primitive type can be merged. For types that cannot be merged, null is |
| * returned. |
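     *
     * For example, INT merged with LONG yields LONG, INT merged with STRING
     * yields STRING, and BOOLEAN merged with INT cannot be merged (null).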
| * |
| * @param x first avro type to merge |
| * @param y second avro type to merge |
| * @return merged avro type |
| */ |
| private static Schema.Type mergeType(Schema.Type x, Schema.Type y) { |
| if (x.equals(y)) { |
| return x; |
| } |
| |
| switch(x) { |
| case INT: |
| switch (y) { |
| case LONG: |
| return Schema.Type.LONG; |
| case FLOAT: |
| return Schema.Type.FLOAT; |
| case DOUBLE: |
| return Schema.Type.DOUBLE; |
| case ENUM: |
| case STRING: |
| return Schema.Type.STRING; |
| } |
| |
| case LONG: |
| switch (y) { |
| case INT: |
| return Schema.Type.LONG; |
| case FLOAT: |
| return Schema.Type.FLOAT; |
| case DOUBLE: |
| return Schema.Type.DOUBLE; |
| case ENUM: |
| case STRING: |
| return Schema.Type.STRING; |
| } |
| |
| case FLOAT: |
| switch (y) { |
| case INT: |
| case LONG: |
| return Schema.Type.FLOAT; |
| case DOUBLE: |
| return Schema.Type.DOUBLE; |
| case ENUM: |
| case STRING: |
| return Schema.Type.STRING; |
| } |
| |
| case DOUBLE: |
| switch (y) { |
| case INT: |
| case LONG: |
| case FLOAT: |
| return Schema.Type.DOUBLE; |
| case ENUM: |
| case STRING: |
| return Schema.Type.STRING; |
| } |
| |
| case ENUM: |
| switch (y) { |
| case INT: |
| case LONG: |
| case FLOAT: |
| case DOUBLE: |
| case STRING: |
| return Schema.Type.STRING; |
| } |
| |
| case STRING: |
| switch (y) { |
| case INT: |
| case LONG: |
| case FLOAT: |
| case DOUBLE: |
| case ENUM: |
| return Schema.Type.STRING; |
| } |
| } |
| |
        // all other combinations (in particular, BYTES or BOOLEAN with any
        // other type) cannot be merged
| return null; |
| } |
| |
| /** |
| * This method merges two avro schemas into one. Note that not every avro schema |
| * can be merged. For complex types to be merged, they must be the same type. |
| * For primitive types to be merged, they must meet certain conditions. For |
| * schemas that cannot be merged, an exception is thrown. |
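     * <p>
     * A minimal usage sketch (record and field names chosen for illustration):
     * <pre>{@code
     * Schema x = Schema.createRecord("x", null, null, false);
     * x.setFields(Arrays.asList(new Schema.Field("a", IntSchema, null, null)));
     * Schema y = Schema.createRecord("y", null, null, false);
     * y.setFields(Arrays.asList(new Schema.Field("b", StringSchema, null, null)));
     * Schema merged = mergeSchema(x, y); // record with fields a:int and b:string
     * }</pre>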
| * |
| * @param x first avro schema to merge |
| * @param y second avro schema to merge |
| * @return merged avro schema |
| * @throws IOException |
| */ |
| public static Schema mergeSchema(Schema x, Schema y) throws IOException { |
| if (x == null) { |
| return y; |
| } |
| if (y == null) { |
| return x; |
| } |
| if (x.equals(y)) { |
| return x; |
| } |
| |
| Schema.Type xType = x.getType(); |
| Schema.Type yType = y.getType(); |
| |
| switch (xType) { |
| case RECORD: |
| if (!yType.equals(Schema.Type.RECORD)) { |
| throw new IOException("Cannot merge " + xType + " with " + yType); |
| } |
| |
| List<Schema.Field> xFields = x.getFields(); |
| List<Schema.Field> yFields = y.getFields(); |
| |
| // LinkedHashMap is used to keep fields in insertion order. |
| // It's convenient for testing to have deterministic behaviors. |
| Map<String, Schema> fieldName2Schema = |
| new LinkedHashMap<String, Schema>(xFields.size() + yFields.size()); |
| Map<String, JsonNode> fieldName2Default = |
| new LinkedHashMap<String, JsonNode>(xFields.size() + yFields.size()); |
| |
| for (Schema.Field xField : xFields) { |
| fieldName2Schema.put(xField.name(), xField.schema()); |
| fieldName2Default.put(xField.name(),xField.defaultValue()); |
| } |
| for (Schema.Field yField : yFields) { |
| String name = yField.name(); |
| Schema currSchema = yField.schema(); |
| Schema prevSchema = fieldName2Schema.get(name); |
| if (prevSchema == null) { |
| fieldName2Schema.put(name, currSchema); |
| fieldName2Default.put(name, yField.defaultValue()); |
| } else { |
| fieldName2Schema.put(name, mergeSchema(prevSchema, currSchema)); |
                    // when merging record schemas, it is okay for one field to have a
                    // default value and the other none; the one with the default is kept
| JsonNode xDefaultValue = fieldName2Default.get(name); |
| JsonNode yDefaultValue = yField.defaultValue(); |
| if (xDefaultValue != null) { |
| // need to check if the default values in the schemas are the same |
| if (yDefaultValue != null && !xDefaultValue.equals(yDefaultValue)) { |
| throw new IOException( |
| "Cannot merge schema's which have different default values - " + xDefaultValue + |
| " and " + yDefaultValue); |
| } |
| } else { |
| fieldName2Default.put(name, yDefaultValue); |
| } |
| } |
| } |
| List<Schema.Field> mergedFields = new ArrayList<Schema.Field>(fieldName2Schema.size()); |
| for (Entry<String, Schema> entry : fieldName2Schema.entrySet()) { |
| mergedFields.add(new Schema.Field(entry.getKey(), entry.getValue(), "auto-gen", fieldName2Default.get(entry.getKey()))); |
| } |
| Schema result = Schema.createRecord( |
| "merged", null, "merged schema (generated by AvroStorage)", false); |
| result.setFields(mergedFields); |
| return result; |
| |
| case ARRAY: |
| if (!yType.equals(Schema.Type.ARRAY)) { |
| throw new IOException("Cannot merge " + xType + " with " + yType); |
| } |
| return Schema.createArray(mergeSchema(x.getElementType(), y.getElementType())); |
| |
| case MAP: |
| if (!yType.equals(Schema.Type.MAP)) { |
| throw new IOException("Cannot merge " + xType + " with " + yType); |
| } |
| return Schema.createMap(mergeSchema(x.getValueType(), y.getValueType())); |
| |
| case UNION: |
| if (!yType.equals(Schema.Type.UNION)) { |
| throw new IOException("Cannot merge " + xType + " with " + yType); |
| } |
| List<Schema> xTypes = x.getTypes(); |
| List<Schema> yTypes = y.getTypes(); |
| |
            List<Schema> unionTypes = new ArrayList<Schema>(xTypes);
| for (Schema ySchema : yTypes) { |
| if (!unionTypes.contains(ySchema)) { |
| unionTypes.add(ySchema); |
| } |
| } |
| return Schema.createUnion(unionTypes); |
| |
| case FIXED: |
| if (!yType.equals(Schema.Type.FIXED)) { |
| throw new IOException("Cannot merge " + xType + " with " + yType); |
| } |
| int xSize = x.getFixedSize(); |
| int ySize = y.getFixedSize(); |
| if (xSize != ySize) { |
| throw new IOException("Cannot merge FIXED types with different sizes: " + xSize + " and " + ySize); |
| } |
| return Schema.createFixed("merged", null, "merged schema (generated by AvroStorage)", xSize); |
| |
| default: // primitive types |
            Schema.Type mergedType = mergeType(xType, yType);
| if (mergedType == null) { |
| throw new IOException("Cannot merge " + xType + " with " + yType); |
| } |
| return Schema.create(mergedType); |
| } |
| } |
| |
| /** |
| * When merging multiple avro record schemas, we build a map (schemaToMergedSchemaMap) |
| * to associate each input record with a remapping of its fields relative to the merged |
| * schema. Take the following two schemas for example: |
| * |
| * // path1 |
| * { "type": "record", |
| * "name": "x", |
| * "fields": [ { "name": "xField", "type": "string" } ] |
| * } |
| * |
| * // path2 |
| * { "type": "record", |
| * "name": "y", |
| * "fields": [ { "name": "yField", "type": "string" } ] |
| * } |
| * |
| * The merged schema will be something like this: |
| * |
| * // merged |
| * { "type": "record", |
| * "name": "merged", |
| * "fields": [ { "name": "xField", "type": "string" }, |
| * { "name": "yField", "type": "string" } ] |
| * } |
| * |
| * The schemaToMergedSchemaMap will look like this: |
| * |
| * // schemaToMergedSchemaMap |
| * { path1 : { 0 : 0 }, |
| * path2 : { 0 : 1 } |
| * } |
| * |
| * The meaning of the map is: |
| * - The field at index '0' of 'path1' is moved to index '0' in merged schema. |
| * - The field at index '0' of 'path2' is moved to index '1' in merged schema. |
| * |
 * With this map, we can now remap the field positions of the original schema
 * to those of the merged schema. This is necessary because the backend does
 * not load input files with the merged avro schema but with the avro schemas
 * embedded in the files themselves. Therefore, each field must be relocated
 * from its old position in the original schema to its new position in the
 * merged schema.
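 * <p>
 * A minimal usage sketch (variable names are illustrative):
 * <pre>{@code
 * Map<Path, Schema> inputSchemas = new HashMap<Path, Schema>();
 * inputSchemas.put(path1, schema1);
 * inputSchemas.put(path2, schema2);
 * Map<Path, Map<Integer, Integer>> remap =
 *     getSchemaToMergedSchemaMap(mergedSchema, inputSchemas);
 * // remap.get(path1) maps each field position in schema1 to its
 * // position in mergedSchema
 * }</pre>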
| * |
| * @param mergedSchema new schema generated from multiple input schemas |
| * @param mergedFiles input avro files that are merged |
| * @return schemaToMergedSchemaMap that maps old position of each field in the |
| * original schema to new position in the new schema |
| * @throws IOException |
| */ |
| public static Map<Path, Map<Integer, Integer>> getSchemaToMergedSchemaMap( |
| Schema mergedSchema, Map<Path, Schema> mergedFiles) throws IOException { |
| |
| if (!mergedSchema.getType().equals(Schema.Type.RECORD)) { |
| throw new IOException("Remapping of non-record schemas is not supported"); |
| } |
| |
| Map<Path, Map<Integer, Integer>> result = |
| new HashMap<Path, Map<Integer, Integer>>(mergedFiles.size()); |
| |
| // map from field position in old schema to field position in new schema |
| for (Map.Entry<Path, Schema> entry : mergedFiles.entrySet()) { |
| Path path = entry.getKey(); |
| Schema schema = entry.getValue(); |
| if (!schema.getType().equals(Schema.Type.RECORD)) { |
| throw new IOException("Remapping of non-record schemas is not supported"); |
| } |
| List<Field> fields = schema.getFields(); |
            // each path appears exactly once in mergedFiles, so a fresh map suffices
            Map<Integer, Integer> oldPos2NewPos = new HashMap<Integer, Integer>(fields.size());
            result.put(path, oldPos2NewPos);
| for (Field field : fields) { |
| String fieldName = field.name(); |
                int oldPos = field.pos();
| int newPos = mergedSchema.getField(fieldName).pos(); |
| oldPos2NewPos.put(oldPos, newPos); |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Wrap an avro schema as a nullable union if needed. |
| * For instance, wrap schema "int" as ["null", "int"] |
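     * <p>
     * A minimal usage sketch:
     * <pre>{@code
     * Schema nullableInt = wrapAsUnion(IntSchema, true);  // ["null", "int"]
     * Schema plainInt = wrapAsUnion(IntSchema, false);    // "int", unchanged
     * }</pre>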
| */ |
    public static Schema wrapAsUnion(Schema schema, boolean nullable) {
        if (nullable) {
            /* if the schema is already an acceptable union, return it as is */
            if (schema.getType().equals(Schema.Type.UNION)
                    && isAcceptableUnion(schema)) {
                return schema;
            }
            return Schema.createUnion(Arrays.asList(NullSchema, schema));
        }
        /* do not wrap if not nullable */
        return schema;
    }
| |
| /** determine whether the input schema contains recursive records */ |
| public static boolean containsRecursiveRecord(Schema s) { |
| /*initialize empty set of defined record names*/ |
| Set<String> set = new HashSet<String> (); |
| return containsRecursiveRecord(s, set); |
| } |
| |
| /** |
 * Called by {@link #containsRecursiveRecord(Schema)}; recursively checks
 * whether the input schema contains recursive records.
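 * <p>
 * For example, a linked-list style record whose "next" field refers back to
 * the record's own name is recursive. The name set tracks records that are
 * currently open in the traversal.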
| */ |
| protected static boolean containsRecursiveRecord(Schema s, Set<String> definedRecordNames) { |
| |
| /* if it is a record, check itself and all fields*/ |
| if (s.getType().equals(Schema.Type.RECORD)) { |
| String name = s.getName(); |
| if (definedRecordNames.contains(name)) return true; |
| |
            /* add its own name into the defined record set */
| definedRecordNames.add(s.getName()); |
| |
| /* check all fields */ |
| List<Field> fields = s.getFields(); |
| for (Field field: fields) { |
| Schema fs = field.schema(); |
| if (containsRecursiveRecord(fs, definedRecordNames)) |
| return true; |
| } |
| |
| /* remove its own name from the name set */ |
| definedRecordNames.remove(s.getName()); |
| |
| return false; |
| } |
| |
| /* if it is an array, check its element type */ |
| else if (s.getType().equals(Schema.Type.ARRAY)) { |
| Schema fs = s.getElementType(); |
| return containsRecursiveRecord(fs, definedRecordNames); |
| } |
| |
| /*if it is a map, check its value type */ |
| else if (s.getType().equals(Schema.Type.MAP)) { |
| Schema vs = s.getValueType(); |
| return containsRecursiveRecord(vs, definedRecordNames); |
| } |
| |
| /* if it is a union, check all possible types */ |
| else if (s.getType().equals(Schema.Type.UNION)) { |
| List<Schema> types = s.getTypes(); |
| for (Schema type: types) { |
| if (containsRecursiveRecord(type, definedRecordNames)) |
| return true; |
| } |
| return false; |
| } |
| |
| /* return false for other cases */ |
| else { |
| return false; |
| } |
| } |
| |
| /** determine whether the input schema contains generic unions */ |
| public static boolean containsGenericUnion(Schema s) { |
| /* initialize empty set of visited records */ |
| Set<Schema> set = new HashSet<Schema> (); |
| return containsGenericUnion(s, set); |
| } |
| |
| /** |
 * Called by {@link #containsGenericUnion(Schema)}; recursively checks
 * whether the input schema contains generic unions.
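 * <p>
 * Here a "generic" union is any union that is not a nullable union per
 * {@link #isAcceptableUnion(Schema)}, i.e. anything other than a single-type
 * union or a two-type union that includes "null".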
| */ |
| protected static boolean containsGenericUnion(Schema s, Set<Schema> visitedRecords) { |
| |
| /* if it is a record, check all fields*/ |
| if (s.getType().equals(Schema.Type.RECORD)) { |
| |
            /* add the schema itself into the visited record set */
| visitedRecords.add(s); |
| |
| /* check all fields */ |
| List<Field> fields = s.getFields(); |
| for (Field field: fields) { |
| Schema fs = field.schema(); |
| if (!visitedRecords.contains(fs)) { |
| if (containsGenericUnion(fs, visitedRecords)) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| /* if it is an array, check its element type */ |
| else if (s.getType().equals(Schema.Type.ARRAY)) { |
| Schema fs = s.getElementType(); |
| if (!visitedRecords.contains(fs)) { |
| return containsGenericUnion(fs, visitedRecords); |
| } |
| return false; |
| } |
| |
| /*if it is a map, check its value type */ |
| else if (s.getType().equals(Schema.Type.MAP)) { |
| Schema vs = s.getValueType(); |
| if (!visitedRecords.contains(vs)) { |
| return containsGenericUnion(vs, visitedRecords); |
| } |
| return false; |
| } |
| |
| /* if it is a union, check all possible types and itself */ |
| else if (s.getType().equals(Schema.Type.UNION)) { |
| List<Schema> types = s.getTypes(); |
| for (Schema type: types) { |
| if (!visitedRecords.contains(type)) { |
| if (containsGenericUnion(type, visitedRecords)) { |
| return true; |
| } |
| } |
| } |
            /* check whether the union itself is acceptable (a nullable union) */
| return !isAcceptableUnion(s); |
| } |
| |
| /* return false for other cases */ |
| else { |
| return false; |
| } |
| } |
| |
    /** Determine whether a union is a nullable union.
     * Note that this function does not recursively check
     * the types contained in the input union.
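     * <p>
     * For example, {@code ["null", "int"]} and {@code ["int"]} are acceptable,
     * while {@code ["int", "string"]} and {@code ["null", "int", "string"]} are not.
     */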
| public static boolean isAcceptableUnion(Schema in) { |
| if (! in.getType().equals(Schema.Type.UNION)) |
| return false; |
| |
| List<Schema> types = in.getTypes(); |
| if (types.size() <= 1) { |
| return true; |
| } else if (types.size() > 2) { |
| return false; /*contains more than 2 types */ |
| } else { |
| /* one of two types is NULL */ |
            return types.get(0).getType().equals(Schema.Type.NULL)
                    || types.get(1).getType().equals(Schema.Type.NULL);
| } |
| } |
| |
    /** Wrap a pig field schema in a one-field tuple schema.
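     * <p>
     * A minimal usage sketch (the inner field is illustrative):
     * <pre>{@code
     * ResourceFieldSchema inner = new ResourceFieldSchema();
     * inner.setType(DataType.INTEGER);
     * ResourceFieldSchema tuple = wrapAsTuple(inner); // a tuple wrapping one int field
     * }</pre>
     */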
| public static ResourceFieldSchema wrapAsTuple(ResourceFieldSchema subFieldSchema) throws IOException { |
| ResourceSchema listSchema = new ResourceSchema(); |
| listSchema.setFields(new ResourceFieldSchema[] { subFieldSchema }); |
| |
| ResourceFieldSchema tupleWrapper = new ResourceFieldSchema(); |
| tupleWrapper.setType(DataType.TUPLE); |
| tupleWrapper.setName(PIG_TUPLE_WRAPPER); |
| tupleWrapper.setSchema(listSchema); |
| |
| return tupleWrapper; |
| } |
| |
| /** check whether it is just a wrapped tuple */ |
    public static boolean isTupleWrapper(ResourceFieldSchema pigSchema) {
        return pigSchema.getType() == DataType.TUPLE
                && PIG_TUPLE_WRAPPER.equals(pigSchema.getName());
    }
| |
| /** extract schema from a nullable union */ |
| public static Schema getAcceptedType(Schema in) { |
        if (!isAcceptableUnion(in)) {
            throw new RuntimeException("Cannot call this function on an unacceptable union");
        }
| |
| List<Schema> types = in.getTypes(); |
| switch (types.size()) { |
| case 0: |
| return null; /*union with no type*/ |
| case 1: |
| return types.get(0); /*union with one type*/ |
| case 2: |
| return (types.get(0).getType().equals(Schema.Type.NULL)) |
| ? types.get(1) |
| : types.get(0); |
| default: |
| return null; |
| } |
| } |
| |
| /** |
 * This method is called by {@link #getAvroSchema}. The default implementation
 * returns the schema of an avro file, or the schema of the last file in a
 * first-level directory (one that does not contain sub-directories).
| * |
| * @param path path of a file or first level directory |
| * @param fs file system |
| * @return avro schema |
| * @throws IOException |
| */ |
| public static Schema getSchema(Path path, FileSystem fs) throws IOException { |
| /* get path of the last file */ |
| Path lastFile = AvroStorageUtils.getLast(path, fs); |
| if (lastFile == null) { |
| return null; |
| } |
| |
        /* read the file header and obtain its schema; close streams on all paths */
        GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();
        InputStream hdfsInputStream = fs.open(lastFile);
        DataFileStream<Object> avroDataStream = null;
        try {
            avroDataStream = new DataFileStream<Object>(hdfsInputStream, avroReader);
            return avroDataStream.getSchema();
        } finally {
            if (avroDataStream != null) {
                avroDataStream.close();
            } else {
                /* the DataFileStream constructor failed; close the raw stream */
                hdfsInputStream.close();
            }
        }
| } |
| |
| } |