| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.coders; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.io.Serializable; |
| import java.lang.reflect.Field; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.SortedSet; |
| import javax.annotation.Nullable; |
| import org.apache.avro.AvroRuntimeException; |
| import org.apache.avro.Schema; |
| import org.apache.avro.generic.GenericDatumReader; |
| import org.apache.avro.generic.GenericDatumWriter; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.avro.generic.IndexedRecord; |
| import org.apache.avro.io.BinaryDecoder; |
| import org.apache.avro.io.BinaryEncoder; |
| import org.apache.avro.io.DatumReader; |
| import org.apache.avro.io.DatumWriter; |
| import org.apache.avro.io.DecoderFactory; |
| import org.apache.avro.io.EncoderFactory; |
| import org.apache.avro.reflect.AvroEncode; |
| import org.apache.avro.reflect.AvroName; |
| import org.apache.avro.reflect.AvroSchema; |
| import org.apache.avro.reflect.ReflectData; |
| import org.apache.avro.reflect.ReflectDatumReader; |
| import org.apache.avro.reflect.ReflectDatumWriter; |
| import org.apache.avro.reflect.Union; |
| import org.apache.avro.specific.SpecificData; |
| import org.apache.avro.util.ClassUtils; |
| import org.apache.avro.util.Utf8; |
| import org.apache.beam.sdk.util.EmptyOnDeserializationThreadLocal; |
| import org.apache.beam.sdk.values.TypeDescriptor; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers; |
| |
| /** |
| * A {@link Coder} using Avro binary format. |
| * |
| * <p>Each instance of {@code AvroCoder<T>} encapsulates an Avro schema for objects of type {@code |
| * T}. |
| * |
| * <p>The Avro schema may be provided explicitly via {@link AvroCoder#of(Class, Schema)} or omitted |
| * via {@link AvroCoder#of(Class)}, in which case it will be inferred using Avro's {@link |
| * org.apache.avro.reflect.ReflectData}. |
| * |
| * <p>For complete details about schema generation and how it can be controlled please see the |
| * {@link org.apache.avro.reflect} package. Only concrete classes with a no-argument constructor can |
| * be mapped to Avro records. All inherited fields that are not static or transient are included. |
| * Fields are not permitted to be null unless annotated by {@link Nullable} or a {@link Union} |
| * schema containing {@code "null"}. |
| * |
| * <p>To use, specify the {@code Coder} type on a PCollection: |
| * |
| * <pre>{@code |
| * PCollection<MyCustomElement> records = |
| * input.apply(...) |
| * .setCoder(AvroCoder.of(MyCustomElement.class)); |
| * }</pre> |
| * |
| * <p>or annotate the element class using {@code @DefaultCoder}. |
| * |
| * <pre>{@code @DefaultCoder(AvroCoder.class) |
| * public class MyCustomElement { |
| * ... |
| * } |
| * }</pre> |
| * |
| * <p>The implementation attempts to determine if the Avro encoding of the given type will satisfy |
| * the criteria of {@link Coder#verifyDeterministic} by inspecting both the type and the Schema |
| * provided or generated by Avro. Only coders that are deterministic can be used in {@link |
| * org.apache.beam.sdk.transforms.GroupByKey} operations. |
| * |
| * @param <T> the type of elements handled by this coder |
| */ |
| public class AvroCoder<T> extends CustomCoder<T> { |
| |
| /** |
| * Returns an {@code AvroCoder} instance for the provided element type. |
| * |
| * @param <T> the element type |
| */ |
| public static <T> AvroCoder<T> of(TypeDescriptor<T> type) { |
| @SuppressWarnings("unchecked") |
| Class<T> clazz = (Class<T>) type.getRawType(); |
| return of(clazz); |
| } |
| |
| /** |
| * Returns an {@code AvroCoder} instance for the provided element class. |
| * |
| * @param <T> the element type |
| */ |
| public static <T> AvroCoder<T> of(Class<T> clazz) { |
| return new AvroCoder<>(clazz, new ReflectData(clazz.getClassLoader()).getSchema(clazz)); |
| } |
| |
| /** |
| * Returns an {@code AvroCoder} instance for the Avro schema. The implicit type is GenericRecord. |
| */ |
| public static AvroCoder<GenericRecord> of(Schema schema) { |
| return new AvroCoder<>(GenericRecord.class, schema); |
| } |
| |
| /** |
| * Returns an {@code AvroCoder} instance for the provided element type using the provided Avro |
| * schema. |
| * |
| * <p>If the type argument is GenericRecord, the schema may be arbitrary. Otherwise, the schema |
| * must correspond to the type provided. |
| * |
| * @param <T> the element type |
| */ |
| public static <T> AvroCoder<T> of(Class<T> type, Schema schema) { |
| return new AvroCoder<>(type, schema); |
| } |
| |
| /** |
| * Returns a {@link CoderProvider} which uses the {@link AvroCoder} if possible for all types. |
| * |
| * <p>It is unsafe to register this as a {@link CoderProvider} because Avro will reflectively |
| * accept dangerous types such as {@link Object}. |
| * |
| * <p>This method is invoked reflectively from {@link DefaultCoder}. |
| */ |
| @SuppressWarnings("unused") |
| public static CoderProvider getCoderProvider() { |
| return new AvroCoderProvider(); |
| } |
| |
| /** |
| * A {@link CoderProvider} that constructs an {@link AvroCoder} for Avro compatible classes. |
| * |
| * <p>It is unsafe to register this as a {@link CoderProvider} because Avro will reflectively |
| * accept dangerous types such as {@link Object}. |
| */ |
| static class AvroCoderProvider extends CoderProvider { |
| @Override |
| public <T> Coder<T> coderFor( |
| TypeDescriptor<T> typeDescriptor, List<? extends Coder<?>> componentCoders) |
| throws CannotProvideCoderException { |
| try { |
| return AvroCoder.of(typeDescriptor); |
| } catch (AvroRuntimeException e) { |
| throw new CannotProvideCoderException( |
| String.format("%s is not compatible with Avro", typeDescriptor), e); |
| } |
| } |
| } |
| |
| private final Class<T> type; |
| private final SerializableSchemaSupplier schemaSupplier; |
| private final TypeDescriptor<T> typeDescriptor; |
| |
| private final List<String> nonDeterministicReasons; |
| |
| // Factories allocated by .get() are thread-safe and immutable. |
| private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get(); |
| private static final DecoderFactory DECODER_FACTORY = DecoderFactory.get(); |
| |
| /** |
| * A {@link Serializable} object that holds the {@link String} version of a {@link Schema}. This |
| * is paired with the {@link SerializableSchemaSupplier} via {@link Serializable}'s usage of the |
| * {@link #readResolve} method. |
| */ |
| private static class SerializableSchemaString implements Serializable { |
| private final String schema; |
| |
| private SerializableSchemaString(String schema) { |
| this.schema = schema; |
| } |
| |
| private Object readResolve() throws IOException, ClassNotFoundException { |
| return new SerializableSchemaSupplier(new Schema.Parser().parse(schema)); |
| } |
| } |
| |
| /** |
| * A {@link Serializable} object that delegates to the {@link SerializableSchemaString} via {@link |
| * Serializable}'s usage of the {@link #writeReplace} method. Kryo doesn't utilize Java's |
| * serialization and hence is able to encode the {@link Schema} object directly. |
| */ |
| private static class SerializableSchemaSupplier implements Serializable, Supplier<Schema> { |
| private final Schema schema; |
| |
| private SerializableSchemaSupplier(Schema schema) { |
| this.schema = schema; |
| } |
| |
| private Object writeReplace() { |
| return new SerializableSchemaString(schema.toString()); |
| } |
| |
| @Override |
| public Schema get() { |
| return schema; |
| } |
| } |
| |
| /** |
| * A {@link Serializable} object that lazily supplies a {@link ReflectData} built from the |
| * appropriate {@link ClassLoader} for the type encoded by this {@link AvroCoder}. |
| */ |
| private static class SerializableReflectDataSupplier |
| implements Serializable, Supplier<ReflectData> { |
| |
| private final Class<?> clazz; |
| |
| private SerializableReflectDataSupplier(Class<?> clazz) { |
| this.clazz = clazz; |
| } |
| |
| @Override |
| public ReflectData get() { |
| return new ReflectData(clazz.getClassLoader()); |
| } |
| } |
| |
| // Cache the old encoder/decoder and let the factories reuse them when possible. To be threadsafe, |
| // these are ThreadLocal. This code does not need to be re-entrant as AvroCoder does not use |
| // an inner coder. |
| private final EmptyOnDeserializationThreadLocal<BinaryDecoder> decoder; |
| private final EmptyOnDeserializationThreadLocal<BinaryEncoder> encoder; |
| private final EmptyOnDeserializationThreadLocal<DatumWriter<T>> writer; |
| private final EmptyOnDeserializationThreadLocal<DatumReader<T>> reader; |
| |
| // Lazily re-instantiated after deserialization |
| private final Supplier<ReflectData> reflectData; |
| |
  /**
   * Creates a coder that encodes/decodes instances of {@code type} using {@code schema}.
   *
   * <p>Determinism analysis runs eagerly here; {@link #verifyDeterministic} later just consults
   * the precomputed {@code nonDeterministicReasons} list.
   *
   * @param type the element class; {@code GenericRecord.class} selects the generic datum
   *     reader/writer, any other class the reflective ones
   * @param schema the Avro schema for elements of {@code type}
   */
  protected AvroCoder(Class<T> type, Schema schema) {
    this.type = type;
    // Wrapped so Java serialization round-trips the schema as its String form
    // (see SerializableSchemaSupplier#writeReplace).
    this.schemaSupplier = new SerializableSchemaSupplier(schema);
    typeDescriptor = TypeDescriptor.of(type);
    nonDeterministicReasons = new AvroDeterminismChecker().check(TypeDescriptor.of(type), schema);

    // Decoder and Encoder start off null for each thread. They are allocated and potentially
    // reused inside encode/decode.
    this.decoder = new EmptyOnDeserializationThreadLocal<>();
    this.encoder = new EmptyOnDeserializationThreadLocal<>();

    // Memoized: ReflectData is built at most once per coder instance, and lazily rebuilt
    // after deserialization.
    this.reflectData = Suppliers.memoize(new SerializableReflectDataSupplier(getType()));

    // Reader and writer are allocated once per thread per Coder. The anonymous subclasses
    // pin the enclosing coder in an explicit field (myCoder) rather than relying on the
    // implicit outer-instance reference.
    this.reader =
        new EmptyOnDeserializationThreadLocal<DatumReader<T>>() {
          private final AvroCoder<T> myCoder = AvroCoder.this;

          @Override
          public DatumReader<T> initialValue() {
            // GenericRecord -> generic reader; any other class -> reflective reader.
            return myCoder.getType().equals(GenericRecord.class)
                ? new GenericDatumReader<>(myCoder.getSchema())
                : new ReflectDatumReader<>(
                    myCoder.getSchema(), myCoder.getSchema(), myCoder.reflectData.get());
          }
        };

    this.writer =
        new EmptyOnDeserializationThreadLocal<DatumWriter<T>>() {
          private final AvroCoder<T> myCoder = AvroCoder.this;

          @Override
          public DatumWriter<T> initialValue() {
            // Mirrors the reader selection above.
            return myCoder.getType().equals(GenericRecord.class)
                ? new GenericDatumWriter<>(myCoder.getSchema())
                : new ReflectDatumWriter<>(myCoder.getSchema(), myCoder.reflectData.get());
          }
        };
  }
| |
| /** Returns the type this coder encodes/decodes. */ |
| public Class<T> getType() { |
| return type; |
| } |
| |
| @Override |
| public void encode(T value, OutputStream outStream) throws IOException { |
| // Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it. |
| BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get()); |
| // Save the potentially-new instance for reuse later. |
| encoder.set(encoderInstance); |
| writer.get().write(value, encoderInstance); |
| // Direct binary encoder does not buffer any data and need not be flushed. |
| } |
| |
| @Override |
| public T decode(InputStream inStream) throws IOException { |
| // Get a BinaryDecoder instance from the ThreadLocal cache and attempt to reuse it. |
| BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get()); |
| // Save the potentially-new instance for later. |
| decoder.set(decoderInstance); |
| return reader.get().read(null, decoderInstance); |
| } |
| |
| /** |
| * @throws NonDeterministicException when the type may not be deterministically encoded using the |
| * given {@link Schema}, the {@code directBinaryEncoder}, and the {@link ReflectDatumWriter} |
| * or {@link GenericDatumWriter}. |
| */ |
| @Override |
| public void verifyDeterministic() throws NonDeterministicException { |
| if (!nonDeterministicReasons.isEmpty()) { |
| throw new NonDeterministicException(this, nonDeterministicReasons); |
| } |
| } |
| |
| /** Returns the schema used by this coder. */ |
| public Schema getSchema() { |
| return schemaSupplier.get(); |
| } |
| |
| @Override |
| public TypeDescriptor<T> getEncodedTypeDescriptor() { |
| return typeDescriptor; |
| } |
| |
| /** |
| * Helper class encapsulating the various pieces of state maintained by the recursive walk used |
| * for checking if the encoding will be deterministic. |
| */ |
| private static class AvroDeterminismChecker { |
| |
| // Reasons that the original type are not deterministic. This accumulates |
| // the actual output. |
| private List<String> reasons = new ArrayList<>(); |
| |
| // Types that are currently "open". Used to make sure we don't have any |
| // recursive types. Note that we assume that all occurrences of a given type |
| // are equal, rather than tracking pairs of type + schema. |
| private Set<TypeDescriptor<?>> activeTypes = new HashSet<>(); |
| |
| // Similarly to how we record active types, we record the schemas we visit |
| // to make sure we don't encounter recursive fields. |
| private Set<Schema> activeSchemas = new HashSet<>(); |
| |
| /** Report an error in the current context. */ |
| private void reportError(String context, String fmt, Object... args) { |
| String message = String.format(fmt, args); |
| reasons.add(context + ": " + message); |
| } |
| |
| /** |
| * Classes that are serialized by Avro as a String include |
| * |
| * <ul> |
| * <li>Subtypes of CharSequence (including String, Avro's mutable Utf8, etc.) |
| * <li>Several predefined classes (BigDecimal, BigInteger, URI, URL) |
| * <li>Classes annotated with @Stringable (uses their #toString() and a String constructor) |
| * </ul> |
| * |
| * <p>Rather than determine which of these cases are deterministic, we list some classes that |
| * definitely are, and treat any others as non-deterministic. |
| */ |
| private static final Set<Class<?>> DETERMINISTIC_STRINGABLE_CLASSES = new HashSet<>(); |
| |
| static { |
| // CharSequences: |
| DETERMINISTIC_STRINGABLE_CLASSES.add(String.class); |
| DETERMINISTIC_STRINGABLE_CLASSES.add(Utf8.class); |
| |
| // Explicitly Stringable: |
| DETERMINISTIC_STRINGABLE_CLASSES.add(java.math.BigDecimal.class); |
| DETERMINISTIC_STRINGABLE_CLASSES.add(java.math.BigInteger.class); |
| DETERMINISTIC_STRINGABLE_CLASSES.add(java.net.URI.class); |
| DETERMINISTIC_STRINGABLE_CLASSES.add(java.net.URL.class); |
| |
| // Classes annotated with @Stringable: |
| } |
| |
| /** Return true if the given type token is a subtype of *any* of the listed parents. */ |
| private static boolean isSubtypeOf(TypeDescriptor<?> type, Class<?>... parents) { |
| for (Class<?> parent : parents) { |
| if (type.isSubtypeOf(TypeDescriptor.of(parent))) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| protected AvroDeterminismChecker() {} |
| |
| // The entry point for the check. Should not be recursively called. |
| public List<String> check(TypeDescriptor<?> type, Schema schema) { |
| recurse(type.getRawType().getName(), type, schema); |
| return reasons; |
| } |
| |
| // This is the method that should be recursively called. It sets up the path |
| // and visited types correctly. |
| private void recurse(String context, TypeDescriptor<?> type, Schema schema) { |
| if (type.getRawType().isAnnotationPresent(AvroSchema.class)) { |
| reportError(context, "Custom schemas are not supported -- remove @AvroSchema."); |
| return; |
| } |
| |
| if (!activeTypes.add(type)) { |
| reportError(context, "%s appears recursively", type); |
| return; |
| } |
| |
| // If the the record isn't a true class, but rather a GenericRecord, SpecificRecord, etc. |
| // with a specified schema, then we need to make the decision based on the generated |
| // implementations. |
| if (isSubtypeOf(type, IndexedRecord.class)) { |
| checkIndexedRecord(context, schema, null); |
| } else { |
| doCheck(context, type, schema); |
| } |
| |
| activeTypes.remove(type); |
| } |
| |
    /**
     * Dispatches on the Avro schema type for a reflective (non-IndexedRecord) type: compound
     * schema types delegate to their dedicated checkers, primitive types are accepted as
     * deterministic, and unknown types are conservatively rejected.
     */
    private void doCheck(String context, TypeDescriptor<?> type, Schema schema) {
      switch (schema.getType()) {
        case ARRAY:
          checkArray(context, type, schema);
          break;
        case ENUM:
          // Enums should be deterministic, since they depend only on the ordinal.
          break;
        case FIXED:
          // Depending on the implementation of GenericFixed, we don't know how
          // the given field will be encoded. So, we assume that it isn't
          // deterministic.
          reportError(context, "FIXED encodings are not guaranteed to be deterministic");
          break;
        case MAP:
          checkMap(context, type, schema);
          break;
        case RECORD:
          // Erasure: without a concrete Class we cannot reflect over the record's fields.
          if (!(type.getType() instanceof Class)) {
            reportError(context, "Cannot determine type from generic %s due to erasure", type);
            return;
          }
          checkRecord(type, schema);
          break;
        case UNION:
          checkUnion(context, type, schema);
          break;
        case STRING:
          checkString(context, type);
          break;
        case BOOLEAN:
        case BYTES:
        case DOUBLE:
        case INT:
        case FLOAT:
        case LONG:
        case NULL:
          // For types that Avro encodes using one of the above primitives, we assume they are
          // deterministic.
          break;
        default:
          // In any other case (eg., new types added to Avro) we cautiously return
          // false.
          reportError(context, "Unknown schema type %s may be non-deterministic", schema.getType());
          break;
      }
    }
| |
| private void checkString(String context, TypeDescriptor<?> type) { |
| // For types that are encoded as strings, we need to make sure they're in an approved |
| // whitelist. For other types that are annotated @Stringable, Avro will just use the |
| // #toString() methods, which has no guarantees of determinism. |
| if (!DETERMINISTIC_STRINGABLE_CLASSES.contains(type.getRawType())) { |
| reportError(context, "%s may not have deterministic #toString()", type); |
| } |
| } |
| |
| private static final Schema AVRO_NULL_SCHEMA = Schema.create(Schema.Type.NULL); |
| |
| private void checkUnion(String context, TypeDescriptor<?> type, Schema schema) { |
| final List<Schema> unionTypes = schema.getTypes(); |
| |
| if (!type.getRawType().isAnnotationPresent(Union.class)) { |
| // First check for @Nullable field, which shows up as a union of field type and null. |
| if (unionTypes.size() == 2 && unionTypes.contains(AVRO_NULL_SCHEMA)) { |
| // Find the Schema that is not NULL and recursively check that it is deterministic. |
| Schema nullableFieldSchema = |
| unionTypes.get(0).equals(AVRO_NULL_SCHEMA) ? unionTypes.get(1) : unionTypes.get(0); |
| doCheck(context, type, nullableFieldSchema); |
| return; |
| } |
| |
| // Otherwise report a schema error. |
| reportError(context, "Expected type %s to have @Union annotation", type); |
| return; |
| } |
| |
| // Errors associated with this union will use the base class as their context. |
| String baseClassContext = type.getRawType().getName(); |
| |
| // For a union, we need to make sure that each possible instantiation is deterministic. |
| for (Schema concrete : unionTypes) { |
| @SuppressWarnings("unchecked") |
| TypeDescriptor<?> unionType = TypeDescriptor.of(ReflectData.get().getClass(concrete)); |
| |
| recurse(baseClassContext, unionType, concrete); |
| } |
| } |
| |
    /**
     * Checks every field of a reflective RECORD: fields with custom encoders are rejected,
     * custom schemas are only allowed on IndexedRecord subtypes, and all remaining fields are
     * checked recursively with their generic types resolved against the enclosing type.
     */
    private void checkRecord(TypeDescriptor<?> type, Schema schema) {
      // For a record, we want to make sure that all the fields are deterministic.
      Class<?> clazz = type.getRawType();
      for (Schema.Field fieldSchema : schema.getFields()) {
        // Resolve the Java field backing this schema field (honors @AvroName renames).
        Field field = getField(clazz, fieldSchema.name());
        String fieldContext = field.getDeclaringClass().getName() + "#" + field.getName();

        if (field.isAnnotationPresent(AvroEncode.class)) {
          reportError(
              fieldContext, "Custom encoders may be non-deterministic -- remove @AvroEncode");
          continue;
        }

        if (!IndexedRecord.class.isAssignableFrom(field.getType())
            && field.isAnnotationPresent(AvroSchema.class)) {
          // TODO: We should be able to support custom schemas on POJO fields, but we shouldn't
          // need to, so we just allow it in the case of IndexedRecords.
          reportError(
              fieldContext, "Custom schemas are only supported for subtypes of IndexedRecord.");
          continue;
        }

        // Resolve type variables (e.g. List<T> with T bound by the enclosing class) before
        // recursing, so the element types are concrete where possible.
        TypeDescriptor<?> fieldType = type.resolveType(field.getGenericType());
        recurse(fieldContext, fieldType, fieldSchema.schema());
      }
    }
| |
    /**
     * Checks determinism for a schema backed by Avro's own record machinery (GenericRecord,
     * SpecificRecord, ...), where encoding behavior is dictated by the Avro runtime rather than
     * user POJOs. Tracks visited schemas in {@code activeSchemas} to detect recursive schemas.
     *
     * @param context human-readable path used in error messages
     * @param schema the schema to inspect
     * @param specificClassStr for STRING fields of generated classes, the value of {@code
     *     SpecificData.CLASS_PROP} naming the Java class used for the string; null otherwise
     */
    private void checkIndexedRecord(
        String context, Schema schema, @Nullable String specificClassStr) {

      if (!activeSchemas.add(schema)) {
        reportError(context, "%s appears recursively", schema.getName());
        return;
      }

      switch (schema.getType()) {
        case ARRAY:
          // Generic Records use GenericData.Array to implement arrays, which is
          // essentially an ArrayList, and therefore ordering is deterministic.
          // The array is thus deterministic if the elements are deterministic.
          checkIndexedRecord(context, schema.getElementType(), null);
          break;
        case ENUM:
          // Enums are deterministic because they encode as a single integer.
          break;
        case FIXED:
          // In the case of GenericRecords, FIXED is deterministic because it
          // encodes/decodes as a Byte[].
          break;
        case MAP:
          reportError(
              context,
              "GenericRecord and SpecificRecords use a HashMap to represent MAPs,"
                  + " so it is non-deterministic");
          break;
        case RECORD:
          // Check each field, passing along its CLASS_PROP so STRING fields can be vetted.
          for (Schema.Field field : schema.getFields()) {
            checkIndexedRecord(
                schema.getName() + "." + field.name(),
                field.schema(),
                field.getProp(SpecificData.CLASS_PROP));
          }
          break;
        case STRING:
          // GenericDatumWriter#findStringClass will use a CharSequence or a String
          // for each string, so it is deterministic.

          // SpecificCompiler#getStringType will use java.lang.String, org.apache.avro.util.Utf8,
          // or java.lang.CharSequence, unless SpecificData.CLASS_PROP overrides that.
          if (specificClassStr != null) {
            Class<?> specificClass;
            try {
              specificClass = ClassUtils.forName(specificClassStr);
              if (!DETERMINISTIC_STRINGABLE_CLASSES.contains(specificClass)) {
                reportError(
                    context,
                    "Specific class %s is not known to be deterministic",
                    specificClassStr);
              }
            } catch (ClassNotFoundException e) {
              // Unresolvable class: conservatively treat it as non-deterministic.
              reportError(
                  context, "Specific class %s is not known to be deterministic", specificClassStr);
            }
          }
          break;
        case UNION:
          // Every branch of the union must be deterministic.
          for (Schema subschema : schema.getTypes()) {
            checkIndexedRecord(subschema.getName(), subschema, null);
          }
          break;
        case BOOLEAN:
        case BYTES:
        case DOUBLE:
        case INT:
        case FLOAT:
        case LONG:
        case NULL:
          // For types that Avro encodes using one of the above primitives, we assume they are
          // deterministic.
          break;
        default:
          // New schema types added to Avro are cautiously treated as non-deterministic.
          reportError(context, "Unknown schema type %s may be non-deterministic", schema.getType());
          break;
      }

      activeSchemas.remove(schema);
    }
| |
| private void checkMap(String context, TypeDescriptor<?> type, Schema schema) { |
| if (!isSubtypeOf(type, SortedMap.class)) { |
| reportError(context, "%s may not be deterministically ordered", type); |
| } |
| |
| // Avro (currently) asserts that all keys are strings. |
| // In case that changes, we double check that the key was a string: |
| Class<?> keyType = type.resolveType(Map.class.getTypeParameters()[0]).getRawType(); |
| if (!String.class.equals(keyType)) { |
| reportError(context, "map keys should be Strings, but was %s", keyType); |
| } |
| |
| recurse(context, type.resolveType(Map.class.getTypeParameters()[1]), schema.getValueType()); |
| } |
| |
| private void checkArray(String context, TypeDescriptor<?> type, Schema schema) { |
| TypeDescriptor<?> elementType = null; |
| if (type.isArray()) { |
| // The type is an array (with ordering)-> deterministic iff the element is deterministic. |
| elementType = type.getComponentType(); |
| } else if (isSubtypeOf(type, Collection.class)) { |
| if (isSubtypeOf(type, List.class, SortedSet.class)) { |
| // Ordered collection -> deterministic iff the element is deterministic |
| elementType = type.resolveType(Collection.class.getTypeParameters()[0]); |
| } else { |
| // Not an ordered collection -> not deterministic |
| reportError(context, "%s may not be deterministically ordered", type); |
| return; |
| } |
| } else { |
| // If it was an unknown type encoded as an array, be conservative and assume |
| // that we don't know anything about the order. |
| reportError(context, "encoding %s as an ARRAY was unexpected", type); |
| return; |
| } |
| |
| // If we get here, it's either a deterministically-ordered Collection, or |
| // an array. Either way, the type is deterministic iff the element type is |
| // deterministic. |
| recurse(context, elementType, schema.getElementType()); |
| } |
| |
| /** |
| * Extract a field from a class. We need to look at the declared fields so that we can see |
| * private fields. We may need to walk up to the parent to get classes from the parent. |
| */ |
| private static Field getField(Class<?> originalClazz, String name) { |
| Class<?> clazz = originalClazz; |
| while (clazz != null) { |
| for (Field field : clazz.getDeclaredFields()) { |
| AvroName avroName = field.getAnnotation(AvroName.class); |
| if (avroName != null && name.equals(avroName.value())) { |
| return field; |
| } else if (avroName == null && name.equals(field.getName())) { |
| return field; |
| } |
| } |
| clazz = clazz.getSuperclass(); |
| } |
| |
| throw new IllegalArgumentException("Unable to get field " + name + " from " + originalClazz); |
| } |
| } |
| |
| @Override |
| public boolean equals(Object other) { |
| if (other == this) { |
| return true; |
| } |
| if (!(other instanceof AvroCoder)) { |
| return false; |
| } |
| AvroCoder<?> that = (AvroCoder<?>) other; |
| return Objects.equals(this.schemaSupplier.get(), that.schemaSupplier.get()) |
| && Objects.equals(this.typeDescriptor, that.typeDescriptor); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(schemaSupplier.get(), typeDescriptor); |
| } |
| } |