| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.parquet.avro; |
| |
| import it.unimi.dsi.fastutil.booleans.BooleanArrayList; |
| import it.unimi.dsi.fastutil.bytes.ByteArrayList; |
| import it.unimi.dsi.fastutil.chars.CharArrayList; |
| import it.unimi.dsi.fastutil.doubles.DoubleArrayList; |
| import it.unimi.dsi.fastutil.floats.FloatArrayList; |
| import it.unimi.dsi.fastutil.ints.IntArrayList; |
| import it.unimi.dsi.fastutil.longs.LongArrayList; |
| import it.unimi.dsi.fastutil.shorts.ShortArrayList; |
| import java.lang.reflect.Field; |
| import java.lang.reflect.InvocationTargetException; |
| import java.lang.reflect.Method; |
| import java.lang.reflect.Modifier; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.LinkedHashMap; |
| import org.apache.avro.AvroTypeException; |
| import org.apache.avro.Conversion; |
| import org.apache.avro.LogicalType; |
| import org.apache.avro.Schema; |
| import org.apache.avro.SchemaCompatibility; |
| import org.apache.avro.generic.GenericData; |
| import org.apache.avro.reflect.AvroIgnore; |
| import org.apache.avro.reflect.AvroName; |
| import org.apache.avro.reflect.AvroSchema; |
| import org.apache.avro.reflect.ReflectData; |
| import org.apache.avro.reflect.Stringable; |
| import org.apache.avro.specific.SpecificData; |
| import org.apache.avro.util.ClassUtils; |
| import org.apache.parquet.Preconditions; |
| import org.apache.parquet.avro.AvroConverters.FieldStringConverter; |
| import org.apache.parquet.avro.AvroConverters.FieldStringableConverter; |
| import org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator; |
| import org.apache.parquet.io.InvalidRecordException; |
| import org.apache.parquet.io.api.Converter; |
| import org.apache.parquet.io.api.GroupConverter; |
| import org.apache.parquet.schema.GroupType; |
| import org.apache.parquet.schema.MessageType; |
| import org.apache.parquet.schema.Type; |
| |
| import static org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE; |
| import static org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility; |
| import static org.apache.parquet.schema.Type.Repetition.REPEATED; |
| import static org.apache.parquet.schema.Type.Repetition.REQUIRED; |
| |
| /** |
| * This {@link Converter} class materializes records for a given |
| * {@link GenericData Avro data model}. This replaces |
| * {@link AvroIndexedRecordConverter} and works with generic, specific, and |
| * reflect records. |
| * |
| * @param <T> a subclass of Avro's IndexedRecord |
| */ |
| class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter { |
| |
| private static final String STRINGABLE_PROP = "avro.java.string"; |
| private static final String JAVA_CLASS_PROP = "java-class"; |
| private static final String JAVA_KEY_CLASS_PROP = "java-key-class"; |
| |
| protected T currentRecord = null; |
| private ParentValueContainer rootContainer = null; |
| private final Converter[] converters; |
| |
| private final Schema avroSchema; |
| |
| private final GenericData model; |
| private final Map<Schema.Field, Object> recordDefaults = new HashMap<Schema.Field, Object>(); |
| |
| public AvroRecordConverter(MessageType parquetSchema, Schema avroSchema, |
| GenericData baseModel) { |
| this(null, parquetSchema, avroSchema, baseModel); |
| LogicalType logicalType = avroSchema.getLogicalType(); |
| Conversion<?> conversion = baseModel.getConversionFor(logicalType); |
| this.rootContainer = ParentValueContainer.getConversionContainer(new ParentValueContainer() { |
| @Override |
| @SuppressWarnings("unchecked") |
| public void add(Object value) { |
| AvroRecordConverter.this.currentRecord = (T) value; |
| } |
| }, conversion, avroSchema); |
| } |
| |
| public AvroRecordConverter(ParentValueContainer parent, |
| GroupType parquetSchema, Schema avroSchema, |
| GenericData model) { |
| super(parent); |
| this.avroSchema = avroSchema; |
| this.model = (model == null ? ReflectData.get() : model); |
| this.converters = new Converter[parquetSchema.getFieldCount()]; |
| |
| Map<String, Integer> avroFieldIndexes = new HashMap<String, Integer>(); |
| int avroFieldIndex = 0; |
| for (Schema.Field field: avroSchema.getFields()) { |
| avroFieldIndexes.put(field.name(), avroFieldIndex++); |
| } |
| |
| Class<?> recordClass = null; |
| if (model instanceof ReflectData) { |
| recordClass = getDatumClass(avroSchema, model); |
| } |
| |
| Map<String, Class<?>> fields = getFieldsByName(recordClass, false); |
| |
| int parquetFieldIndex = 0; |
| for (Type parquetField: parquetSchema.getFields()) { |
| final Schema.Field avroField = getAvroField(parquetField.getName()); |
| Schema nonNullSchema = AvroSchemaConverter.getNonNull(avroField.schema()); |
| final int finalAvroIndex = avroFieldIndexes.remove(avroField.name()); |
| ParentValueContainer container = new ParentValueContainer() { |
| @Override |
| public void add(Object value) { |
| AvroRecordConverter.this.set(avroField.name(), finalAvroIndex, value); |
| } |
| }; |
| |
| Class<?> fieldClass = fields.get(avroField.name()); |
| converters[parquetFieldIndex] = newConverter( |
| nonNullSchema, parquetField, this.model, fieldClass, container); |
| |
| // @Stringable doesn't affect the reflected schema; must be enforced here |
| if (recordClass != null && |
| converters[parquetFieldIndex] instanceof FieldStringConverter) { |
| try { |
| Field field = recordClass.getDeclaredField(avroField.name()); |
| if (field.isAnnotationPresent(Stringable.class)) { |
| converters[parquetFieldIndex] = new FieldStringableConverter( |
| container, field.getType()); |
| } |
| } catch (NoSuchFieldException e) { |
| // must not be stringable |
| } |
| } |
| |
| parquetFieldIndex += 1; |
| } |
| |
| // store defaults for any new Avro fields from avroSchema that are not in |
| // the writer schema (parquetSchema) |
| for (String fieldName : avroFieldIndexes.keySet()) { |
| Schema.Field field = avroSchema.getField(fieldName); |
| if (field.schema().getType() == Schema.Type.NULL) { |
| continue; // skip null since Parquet does not write nulls |
| } |
| if (field.defaultVal() == null || this.model.getDefaultValue(field) == null) { |
| continue; // field has no default |
| } |
| // use this.model because model may be null |
| recordDefaults.put(field, this.model.getDefaultValue(field)); |
| } |
| } |
| |
| // this was taken from Avro's ReflectData |
| private static Map<String, Class<?>> getFieldsByName(Class<?> recordClass, |
| boolean excludeJava) { |
| Map<String, Class<?>> fields = new LinkedHashMap<String, Class<?>>(); |
| |
| if (recordClass != null) { |
| Class<?> current = recordClass; |
| do { |
| if (excludeJava && current.getPackage() != null |
| && current.getPackage().getName().startsWith("java.")) { |
| break; // skip java built-in classes |
| } |
| for (Field field : current.getDeclaredFields()) { |
| if (field.isAnnotationPresent(AvroIgnore.class) || |
| isTransientOrStatic(field)) { |
| continue; |
| } |
| AvroName altName = field.getAnnotation(AvroName.class); |
| Class<?> existing = fields.put( |
| altName != null ? altName.value() : field.getName(), |
| field.getType()); |
| if (existing != null) { |
| throw new AvroTypeException( |
| current + " contains two fields named: " + field.getName()); |
| } |
| } |
| current = current.getSuperclass(); |
| } while (current != null); |
| } |
| |
| return fields; |
| } |
| |
| private static boolean isTransientOrStatic(Field field) { |
| return (field.getModifiers() & (Modifier.TRANSIENT | Modifier.STATIC)) != 0; |
| } |
| |
| private Schema.Field getAvroField(String parquetFieldName) { |
| Schema.Field avroField = avroSchema.getField(parquetFieldName); |
| if (avroField != null) { |
| return avroField; |
| } |
| |
| for (Schema.Field f : avroSchema.getFields()) { |
| if (f.aliases().contains(parquetFieldName)) { |
| return f; |
| } |
| } |
| |
| throw new InvalidRecordException(String.format( |
| "Parquet/Avro schema mismatch: Avro field '%s' not found", |
| parquetFieldName)); |
| } |
| |
| private static Converter newConverter( |
| Schema schema, Type type, GenericData model, ParentValueContainer setter) { |
| return newConverter(schema, type, model, null, setter); |
| } |
| |
| private static Converter newConverter(Schema schema, Type type, |
| GenericData model, Class<?> knownClass, ParentValueContainer setter) { |
| LogicalType logicalType = schema.getLogicalType(); |
| Conversion<?> conversion; |
| |
| if (knownClass != null) { |
| conversion = model.getConversionByClass(knownClass, logicalType); |
| } else { |
| conversion = model.getConversionFor(logicalType); |
| } |
| |
| ParentValueContainer parent = ParentValueContainer |
| .getConversionContainer(setter, conversion, schema); |
| |
| switch (schema.getType()) { |
| case BOOLEAN: |
| return new AvroConverters.FieldBooleanConverter(parent); |
| case INT: |
| Class<?> intDatumClass = getDatumClass(conversion, knownClass, schema, model); |
| if (intDatumClass == null) { |
| return new AvroConverters.FieldIntegerConverter(parent); |
| } |
| if (intDatumClass == byte.class || intDatumClass == Byte.class) { |
| return new AvroConverters.FieldByteConverter(parent); |
| } |
| if (intDatumClass == char.class || intDatumClass == Character.class) { |
| return new AvroConverters.FieldCharConverter(parent); |
| } |
| if (intDatumClass == short.class || intDatumClass == Short.class) { |
| return new AvroConverters.FieldShortConverter(parent); |
| } |
| return new AvroConverters.FieldIntegerConverter(parent); |
| case LONG: |
| return new AvroConverters.FieldLongConverter(parent); |
| case FLOAT: |
| return new AvroConverters.FieldFloatConverter(parent); |
| case DOUBLE: |
| return new AvroConverters.FieldDoubleConverter(parent); |
| case BYTES: |
| Class<?> byteDatumClass = getDatumClass(conversion, knownClass, schema, model); |
| if (byteDatumClass == null) { |
| return new AvroConverters.FieldByteBufferConverter(parent); |
| } |
| if (byteDatumClass.isArray() && byteDatumClass.getComponentType() == byte.class) { |
| return new AvroConverters.FieldByteArrayConverter(parent); |
| } |
| return new AvroConverters.FieldByteBufferConverter(parent); |
| case STRING: |
| return newStringConverter(schema, model, parent); |
| case RECORD: |
| return new AvroRecordConverter(parent, type.asGroupType(), schema, model); |
| case ENUM: |
| return new AvroConverters.FieldEnumConverter(parent, schema, model); |
| case ARRAY: |
| Class<?> arrayDatumClass = getDatumClass(conversion, knownClass, schema, model); |
| if (arrayDatumClass != null && arrayDatumClass.isArray()) { |
| return new AvroArrayConverter(parent, type.asGroupType(), schema, model, |
| arrayDatumClass); |
| } |
| return new AvroCollectionConverter(parent, type.asGroupType(), schema, |
| model, arrayDatumClass); |
| case MAP: |
| return new MapConverter(parent, type.asGroupType(), schema, model); |
| case UNION: |
| return new AvroUnionConverter(parent, type, schema, model); |
| case FIXED: |
| return new AvroConverters.FieldFixedConverter(parent, schema, model); |
| default: |
| throw new UnsupportedOperationException(String.format( |
| "Cannot convert Avro type: %s to Parquet type: %s", schema, type)); |
| } |
| } |
| |
| private static Converter newStringConverter(Schema schema, GenericData model, |
| ParentValueContainer parent) { |
| Class<?> stringableClass = getStringableClass(schema, model); |
| if (stringableClass == String.class) { |
| return new FieldStringConverter(parent); |
| } else if (stringableClass == CharSequence.class) { |
| return new AvroConverters.FieldUTF8Converter(parent); |
| } |
| return new FieldStringableConverter(parent, stringableClass); |
| } |
| |
| private static Class<?> getStringableClass(Schema schema, GenericData model) { |
| if (model instanceof SpecificData) { |
| // both specific and reflect (and any subclasses) use this logic |
| boolean isMap = (schema.getType() == Schema.Type.MAP); |
| String stringableClass = schema.getProp( |
| isMap ? JAVA_KEY_CLASS_PROP : JAVA_CLASS_PROP); |
| if (stringableClass != null) { |
| try { |
| return ClassUtils.forName(model.getClassLoader(), stringableClass); |
| } catch (ClassNotFoundException e) { |
| // not available, use a String instead |
| } |
| } |
| } |
| |
| if (ReflectData.class.isAssignableFrom(model.getClass())) { |
| // reflect uses String, not Utf8 |
| return String.class; |
| } |
| |
| // generic and specific use the avro.java.string setting |
| String name = schema.getProp(STRINGABLE_PROP); |
| if (name == null) { |
| return CharSequence.class; |
| } |
| |
| switch (GenericData.StringType.valueOf(name)) { |
| case String: |
| return String.class; |
| default: |
| return CharSequence.class; // will use Utf8 |
| } |
| } |
| |
| private static <T> Class<T> getDatumClass(Schema schema, GenericData model) { |
| return getDatumClass(null, null, schema, model); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private static <T> Class<T> getDatumClass(Conversion<?> conversion, |
| Class<T> knownClass, |
| Schema schema, GenericData model) { |
| if (conversion != null) { |
| // use generic classes to pass data to conversions |
| return null; |
| } |
| |
| // known class can be set when using reflect |
| if (knownClass != null) { |
| return knownClass; |
| } |
| |
| if (model instanceof SpecificData) { |
| // this works for reflect as well |
| return ((SpecificData) model).getClass(schema); |
| |
| } else if (model.getClass() == GenericData.class) { |
| return null; |
| |
| } else { |
| // try to use reflection (for ThriftData and others) |
| Class<? extends GenericData> modelClass = model.getClass(); |
| Method getClassMethod; |
| try { |
| getClassMethod = modelClass.getMethod("getClass", Schema.class); |
| } catch (NoSuchMethodException e) { |
| return null; // no getClass method |
| } |
| |
| try { |
| return (Class<T>) getClassMethod.invoke(schema); |
| } catch (IllegalAccessException | InvocationTargetException e) { |
| return null; |
| } |
| } |
| } |
| |
| protected void set(String name, int avroIndex, Object value) { |
| model.setField(currentRecord, name, avroIndex, value); |
| } |
| |
| @Override |
| public Converter getConverter(int fieldIndex) { |
| return converters[fieldIndex]; |
| } |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| public void start() { |
| this.currentRecord = (T) model.newRecord(null, avroSchema); |
| } |
| |
| @Override |
| public void end() { |
| fillInDefaults(); |
| if (parent != null) { |
| parent.add(currentRecord); |
| } else { |
| // this applies any converters needed for the root value |
| rootContainer.add(currentRecord); |
| } |
| } |
| |
| private void fillInDefaults() { |
| for (Map.Entry<Schema.Field, Object> entry : recordDefaults.entrySet()) { |
| Schema.Field f = entry.getKey(); |
| // replace following with model.deepCopy once AVRO-1455 is being used |
| Object defaultValue = deepCopy(f.schema(), entry.getValue()); |
| set(f.name(), f.pos(), defaultValue); |
| } |
| } |
| |
| private Object deepCopy(Schema schema, Object value) { |
| switch (schema.getType()) { |
| case BOOLEAN: |
| case INT: |
| case LONG: |
| case FLOAT: |
| case DOUBLE: |
| return value; |
| default: |
| return model.deepCopy(schema, value); |
| } |
| } |
| |
| T getCurrentRecord() { |
| return currentRecord; |
| } |
| |
| /** |
| * Converter for a list to a Java Collection. |
| * |
| * <pre> |
| * optional group the_list (LIST) { <-- this layer |
| * repeated group array { |
| * optional (type) element; |
| * } |
| * } |
| * </pre> |
| * |
| * This class also implements LIST element backward-compatibility rules. |
| */ |
| static final class AvroCollectionConverter extends GroupConverter { |
| |
| private final ParentValueContainer parent; |
| private final Schema avroSchema; |
| private final Converter converter; |
| private Class<?> containerClass; |
| private Collection<Object> container; |
| |
| public AvroCollectionConverter(ParentValueContainer parent, GroupType type, |
| Schema avroSchema, GenericData model, |
| Class<?> containerClass) { |
| this.parent = parent; |
| this.avroSchema = avroSchema; |
| this.containerClass = containerClass; |
| Schema elementSchema = AvroSchemaConverter.getNonNull(avroSchema.getElementType()); |
| Type repeatedType = type.getType(0); |
| // always determine whether the repeated type is the element type by |
| // matching it against the element schema. |
| if (isElementType(repeatedType, elementSchema)) { |
| // the element type is the repeated type (and required) |
| converter = newConverter(elementSchema, repeatedType, model, new ParentValueContainer() { |
| @Override |
| @SuppressWarnings("unchecked") |
| public void add(Object value) { |
| container.add(value); |
| } |
| }); |
| } else { |
| // the element is wrapped in a synthetic group and may be optional |
| converter = new ElementConverter(repeatedType.asGroupType(), elementSchema, model); |
| } |
| } |
| |
| @Override |
| public Converter getConverter(int fieldIndex) { |
| return converter; |
| } |
| |
| @Override |
| public void start() { |
| container = newContainer(); |
| } |
| |
| @Override |
| public void end() { |
| parent.add(container); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private Collection<Object> newContainer() { |
| if (containerClass == null) { |
| return new GenericData.Array<Object>(0, avroSchema); |
| } else if (containerClass.isAssignableFrom(ArrayList.class)) { |
| return new ArrayList<Object>(); |
| } else { |
| // not need to use the data model to instantiate because it resolved |
| // the class, which used the correct ClassLoader |
| return (Collection<Object>) ReflectData.newInstance(containerClass, avroSchema); |
| } |
| } |
| |
| /** |
| * Converter for list elements. |
| * |
| * <pre> |
| * optional group the_list (LIST) { |
| * repeated group array { <-- this layer |
| * optional (type) element; |
| * } |
| * } |
| * </pre> |
| */ |
| final class ElementConverter extends GroupConverter { |
| private Object element; |
| private final Converter elementConverter; |
| |
| public ElementConverter(GroupType repeatedType, Schema elementSchema, GenericData model) { |
| Type elementType = repeatedType.getType(0); |
| Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema); |
| this.elementConverter = newConverter(nonNullElementSchema, elementType, model, new ParentValueContainer() { |
| @Override |
| @SuppressWarnings("unchecked") |
| public void add(Object value) { |
| ElementConverter.this.element = value; |
| } |
| }); |
| } |
| |
| @Override |
| public Converter getConverter(int fieldIndex) { |
| Preconditions.checkArgument( |
| fieldIndex == 0, "Illegal field index: " + fieldIndex); |
| return elementConverter; |
| } |
| |
| @Override |
| public void start() { |
| element = null; |
| } |
| |
| @Override |
| public void end() { |
| container.add(element); |
| } |
| } |
| } |
| |
| /** |
| * Converter for a list to a Java array. |
| * |
| * <pre> |
| * optional group the_list (LIST) { <-- this layer |
| * repeated group array { |
| * optional (type) element; |
| * } |
| * } |
| * </pre> |
| * |
| * This class also implements LIST element backward-compatibility rules. |
| */ |
| static final class AvroArrayConverter extends GroupConverter { |
| |
| private final ParentValueContainer parent; |
| private final Schema avroSchema; |
| private final Converter converter; |
| private Class<?> elementClass; |
| private Collection<?> container; |
| |
| public AvroArrayConverter(ParentValueContainer parent, GroupType type, |
| Schema avroSchema, GenericData model, |
| Class<?> arrayClass) { |
| this.parent = parent; |
| this.avroSchema = avroSchema; |
| |
| Preconditions.checkArgument(arrayClass.isArray(), |
| "Cannot convert non-array: " + arrayClass.getName()); |
| this.elementClass = arrayClass.getComponentType(); |
| |
| ParentValueContainer setter = createSetterAndContainer(); |
| Schema elementSchema = this.avroSchema.getElementType(); |
| Type repeatedType = type.getType(0); |
| |
| // always determine whether the repeated type is the element type by |
| // matching it against the element schema. |
| if (isElementType(repeatedType, elementSchema)) { |
| // the element type is the repeated type (and required) |
| converter = newConverter(elementSchema, repeatedType, model, elementClass, setter); |
| } else { |
| // the element is wrapped in a synthetic group and may be optional |
| converter = new ArrayElementConverter( |
| repeatedType.asGroupType(), elementSchema, model, setter); |
| } |
| } |
| |
| @Override |
| public Converter getConverter(int fieldIndex) { |
| return converter; |
| } |
| |
| @Override |
| public void start() { |
| // end creates a new copy of the array so the container is safe to reuse |
| container.clear(); |
| } |
| |
| @Override |
| public void end() { |
| if (elementClass == boolean.class) { |
| parent.add(((BooleanArrayList) container).toBooleanArray()); |
| } else if (elementClass == byte.class) { |
| parent.add(((ByteArrayList) container).toByteArray()); |
| } else if (elementClass == char.class) { |
| parent.add(((CharArrayList) container).toCharArray()); |
| } else if (elementClass == short.class) { |
| parent.add(((ShortArrayList) container).toShortArray()); |
| } else if (elementClass == int.class) { |
| parent.add(((IntArrayList) container).toIntArray()); |
| } else if (elementClass == long.class) { |
| parent.add(((LongArrayList) container).toLongArray()); |
| } else if (elementClass == float.class) { |
| parent.add(((FloatArrayList) container).toFloatArray()); |
| } else if (elementClass == double.class) { |
| parent.add(((DoubleArrayList) container).toDoubleArray()); |
| } else { |
| parent.add(((ArrayList) container).toArray()); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| private ParentValueContainer createSetterAndContainer() { |
| if (elementClass == boolean.class) { |
| final BooleanArrayList list = new BooleanArrayList(); |
| this.container = list; |
| return new ParentValueContainer() { |
| @Override |
| public void addBoolean(boolean value) { |
| list.add(value); |
| } |
| }; |
| } else if (elementClass == byte.class) { |
| final ByteArrayList list = new ByteArrayList(); |
| this.container = list; |
| return new ParentValueContainer() { |
| @Override |
| public void addByte(byte value) { |
| list.add(value); |
| } |
| }; |
| } else if (elementClass == char.class) { |
| final CharArrayList list = new CharArrayList(); |
| this.container = list; |
| return new ParentValueContainer() { |
| @Override |
| public void addChar(char value) { |
| list.add(value); |
| } |
| }; |
| } else if (elementClass == short.class) { |
| final ShortArrayList list = new ShortArrayList(); |
| this.container = list; |
| return new ParentValueContainer() { |
| @Override |
| public void addShort(short value) { |
| list.add(value); |
| } |
| }; |
| } else if (elementClass == int.class) { |
| final IntArrayList list = new IntArrayList(); |
| this.container = list; |
| return new ParentValueContainer() { |
| @Override |
| public void addInt(int value) { |
| list.add(value); |
| } |
| }; |
| } else if (elementClass == long.class) { |
| final LongArrayList list = new LongArrayList(); |
| this.container = list; |
| return new ParentValueContainer() { |
| @Override |
| public void addLong(long value) { |
| list.add(value); |
| } |
| }; |
| } else if (elementClass == float.class) { |
| final FloatArrayList list = new FloatArrayList(); |
| this.container = list; |
| return new ParentValueContainer() { |
| @Override |
| public void addFloat(float value) { |
| list.add(value); |
| } |
| }; |
| } else if (elementClass == double.class) { |
| final DoubleArrayList list = new DoubleArrayList(); |
| this.container = list; |
| return new ParentValueContainer() { |
| @Override |
| public void addDouble(double value) { |
| list.add(value); |
| } |
| }; |
| } else { |
| // this will end up as Object[] |
| final List<Object> list = new ArrayList<Object>(); |
| this.container = list; |
| return new ParentValueContainer() { |
| @Override |
| public void add(Object value) { |
| list.add(value); |
| } |
| }; |
| } |
| |
| } |
| |
| /** |
| * Converter for primitive list elements. |
| * |
| * <pre> |
| * optional group the_list (LIST) { |
| * repeated group array { <-- this layer |
| * optional (type) element; |
| * } |
| * } |
| * </pre> |
| */ |
| final class ArrayElementConverter extends GroupConverter { |
| private boolean isSet; |
| private final Converter elementConverter; |
| |
| public ArrayElementConverter(GroupType repeatedType, |
| Schema elementSchema, GenericData model, |
| final ParentValueContainer setter) { |
| Type elementType = repeatedType.getType(0); |
| Preconditions.checkArgument( |
| !elementClass.isPrimitive() || elementType.isRepetition(REQUIRED), |
| "Cannot convert list of optional elements to primitive array"); |
| Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema); |
| this.elementConverter = newConverter( |
| nonNullElementSchema, elementType, model, elementClass, new ParentValueContainer() { |
| @Override |
| public void add(Object value) { |
| isSet = true; |
| setter.add(value); |
| } |
| |
| @Override |
| public void addByte(byte value) { |
| isSet = true; |
| setter.addByte(value); |
| } |
| |
| @Override |
| public void addBoolean(boolean value) { |
| isSet = true; |
| setter.addBoolean(value); |
| } |
| |
| @Override |
| public void addChar(char value) { |
| isSet = true; |
| setter.addChar(value); |
| } |
| |
| @Override |
| public void addShort(short value) { |
| isSet = true; |
| setter.addShort(value); |
| } |
| |
| @Override |
| public void addInt(int value) { |
| isSet = true; |
| setter.addInt(value); |
| } |
| |
| @Override |
| public void addLong(long value) { |
| isSet = true; |
| setter.addLong(value); |
| } |
| |
| @Override |
| public void addFloat(float value) { |
| isSet = true; |
| setter.addFloat(value); |
| } |
| |
| @Override |
| public void addDouble(double value) { |
| isSet = true; |
| setter.addDouble(value); |
| } |
| }); |
| } |
| |
| @Override |
| public Converter getConverter(int fieldIndex) { |
| Preconditions.checkArgument( |
| fieldIndex == 0, "Illegal field index: " + fieldIndex); |
| return elementConverter; |
| } |
| |
| @Override |
| public void start() { |
| isSet = false; |
| } |
| |
| @Override |
| public void end() { |
| if (!isSet) { |
| container.add(null); |
| } |
| } |
| } |
| } |
| |
| // Converter used to test whether a requested schema is a 2-level schema. |
| // This is used to convert the file's type assuming that the file uses |
| // 2-level lists and the result is checked to see if it matches the requested |
| // element type. This should always convert assuming 2-level lists because |
| // 2-level and 3-level can't be mixed. |
| private static final AvroSchemaConverter CONVERTER = |
| new AvroSchemaConverter(true); |
| |
| /** |
| * Returns whether the given type is the element type of a list or is a |
| * synthetic group with one field that is the element type. This is |
| * determined by checking whether the type can be a synthetic group and by |
| * checking whether a potential synthetic group matches the expected schema. |
| * <p> |
| * Unlike {@link AvroSchemaConverter#isElementType(Type, String)}, this |
| * method never guesses because the expected schema is known. |
| * |
| * @param repeatedType a type that may be the element type |
| * @param elementSchema the expected Schema for list elements |
| * @return {@code true} if the repeatedType is the element schema |
| */ |
| static boolean isElementType(Type repeatedType, Schema elementSchema) { |
| if (repeatedType.isPrimitive() || |
| repeatedType.asGroupType().getFieldCount() > 1 || |
| repeatedType.asGroupType().getType(0).isRepetition(REPEATED)) { |
| // The repeated type must be the element type because it is an invalid |
| // synthetic wrapper. Must be a group with one optional or required field |
| return true; |
| } else if (elementSchema != null && |
| elementSchema.getType() == Schema.Type.RECORD) { |
| Schema schemaFromRepeated = CONVERTER.convert(repeatedType.asGroupType()); |
| if (checkReaderWriterCompatibility(elementSchema, schemaFromRepeated) |
| .getType() == COMPATIBLE) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| static final class AvroUnionConverter extends AvroConverters.AvroGroupConverter { |
| private final Converter[] memberConverters; |
| private Object memberValue = null; |
| |
| public AvroUnionConverter(ParentValueContainer parent, Type parquetSchema, |
| Schema avroSchema, GenericData model) { |
| super(parent); |
| GroupType parquetGroup = parquetSchema.asGroupType(); |
| this.memberConverters = new Converter[ parquetGroup.getFieldCount()]; |
| |
| int parquetIndex = 0; |
| for (int index = 0; index < avroSchema.getTypes().size(); index++) { |
| Schema memberSchema = avroSchema.getTypes().get(index); |
| if (!memberSchema.getType().equals(Schema.Type.NULL)) { |
| Type memberType = parquetGroup.getType(parquetIndex); |
| memberConverters[parquetIndex] = newConverter(memberSchema, memberType, model, new ParentValueContainer() { |
| @Override |
| public void add(Object value) { |
| Preconditions.checkArgument( |
| AvroUnionConverter.this.memberValue == null, |
| "Union is resolving to more than one type"); |
| memberValue = value; |
| } |
| }); |
| parquetIndex++; // Note for nulls the parquetIndex id not increased |
| } |
| } |
| } |
| |
| @Override |
| public Converter getConverter(int fieldIndex) { |
| return memberConverters[fieldIndex]; |
| } |
| |
| @Override |
| public void start() { |
| memberValue = null; |
| } |
| |
| @Override |
| public void end() { |
| parent.add(memberValue); |
| } |
| } |
| |
| static final class MapConverter<K, V> extends GroupConverter { |
| |
| private final ParentValueContainer parent; |
| private final Converter keyValueConverter; |
| private final Schema schema; |
| private final Class<?> mapClass; |
| private Map<K, V> map; |
| |
| public MapConverter(ParentValueContainer parent, GroupType mapType, |
| Schema mapSchema, GenericData model) { |
| this.parent = parent; |
| GroupType repeatedKeyValueType = mapType.getType(0).asGroupType(); |
| this.keyValueConverter = new MapKeyValueConverter( |
| repeatedKeyValueType, mapSchema, model); |
| this.schema = mapSchema; |
| this.mapClass = getDatumClass(mapSchema, model); |
| } |
| |
| @Override |
| public Converter getConverter(int fieldIndex) { |
| return keyValueConverter; |
| } |
| |
| @Override |
| public void start() { |
| this.map = newMap(); |
| } |
| |
| @Override |
| public void end() { |
| parent.add(map); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private Map<K, V> newMap() { |
| if (mapClass == null || mapClass.isAssignableFrom(HashMap.class)) { |
| return new HashMap<K, V>(); |
| } else { |
| return (Map<K, V>) ReflectData.newInstance(mapClass, schema); |
| } |
| } |
| |
| final class MapKeyValueConverter extends GroupConverter { |
| |
| private K key; |
| private V value; |
| private final Converter keyConverter; |
| private final Converter valueConverter; |
| |
| public MapKeyValueConverter(GroupType keyValueType, Schema mapSchema, |
| GenericData model) { |
| keyConverter = newStringConverter(mapSchema, model, |
| new ParentValueContainer() { |
| @Override |
| @SuppressWarnings("unchecked") |
| public void add(Object value) { |
| MapKeyValueConverter.this.key = (K) value; |
| } |
| }); |
| |
| Type valueType = keyValueType.getType(1); |
| Schema nonNullValueSchema = AvroSchemaConverter.getNonNull(mapSchema.getValueType()); |
| valueConverter = newConverter(nonNullValueSchema, valueType, model, new ParentValueContainer() { |
| @Override |
| @SuppressWarnings("unchecked") |
| public void add(Object value) { |
| MapKeyValueConverter.this.value = (V) value; |
| } |
| }); |
| } |
| |
| @Override |
| public Converter getConverter(int fieldIndex) { |
| if (fieldIndex == 0) { |
| return keyConverter; |
| } else if (fieldIndex == 1) { |
| return valueConverter; |
| } |
| throw new IllegalArgumentException("only the key (0) and value (1) fields expected: " + fieldIndex); |
| } |
| |
| @Override |
| public void start() { |
| key = null; |
| value = null; |
| } |
| |
| @Override |
| public void end() { |
| map.put(key, value); |
| } |
| } |
| } |
| } |