blob: 1e218ffc7bf0b6bb8e81dd06b62af0e69bd2a6d6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.coders;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.lang.reflect.WildcardType;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.ReadableFileCoder;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.MetadataCoder;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.fs.ResourceIdCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.util.common.ReflectHelpers.ObjectsClassComparator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSetMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.SetMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link CoderRegistry} allows creating a {@link Coder} for a given Java {@link Class class} or
* {@link TypeDescriptor type descriptor}.
*
* <p>Creation of the {@link Coder} is delegated to one of the many registered {@link CoderProvider
* coder providers} based upon the registration order.
*
* <p>By default, the {@link CoderProvider coder provider} precedence order is as follows:
*
* <ul>
* <li>Coder providers registered programmatically with {@link
* CoderRegistry#registerCoderProvider(CoderProvider)}.
* <li>A default coder provider for common Java (Byte, Double, List, ...) and Apache Beam (KV,
* ...) types.
* <li>Coder providers registered automatically through a {@link CoderProviderRegistrar} using a
* {@link ServiceLoader}. Note that the {@link ServiceLoader} registration order is consistent
* but may change due to the addition or removal of libraries exposed to the application. This
* can impact the coder returned if multiple coder providers are capable of supplying a coder
* for the specified type.
* </ul>
*
* <p>Note that if multiple {@link CoderProvider coder providers} can provide a {@link Coder} for a
* given type, the precedence order above defines which {@link CoderProvider} is chosen.
*/
public class CoderRegistry {
private static final Logger LOG = LoggerFactory.getLogger(CoderRegistry.class);
private static final List<CoderProvider> REGISTERED_CODER_FACTORIES;
/** A {@link CoderProvider} for common Java SDK and Apache Beam SDK types. */
private static class CommonTypes extends CoderProvider {
private final Map<Class<?>, CoderProvider> commonTypesToCoderProviders;
private CommonTypes() {
ImmutableMap.Builder<Class<?>, CoderProvider> builder = ImmutableMap.builder();
builder.put(
Boolean.class, CoderProviders.fromStaticMethods(Boolean.class, BooleanCoder.class));
builder.put(Byte.class, CoderProviders.fromStaticMethods(Byte.class, ByteCoder.class));
builder.put(BitSet.class, CoderProviders.fromStaticMethods(BitSet.class, BitSetCoder.class));
builder.put(Float.class, CoderProviders.fromStaticMethods(Float.class, FloatCoder.class));
builder.put(Double.class, CoderProviders.fromStaticMethods(Double.class, DoubleCoder.class));
builder.put(
Instant.class, CoderProviders.fromStaticMethods(Instant.class, InstantCoder.class));
builder.put(
Integer.class, CoderProviders.fromStaticMethods(Integer.class, VarIntCoder.class));
builder.put(
Iterable.class, CoderProviders.fromStaticMethods(Iterable.class, IterableCoder.class));
builder.put(KV.class, CoderProviders.fromStaticMethods(KV.class, KvCoder.class));
builder.put(List.class, CoderProviders.fromStaticMethods(List.class, ListCoder.class));
builder.put(Long.class, CoderProviders.fromStaticMethods(Long.class, VarLongCoder.class));
builder.put(Map.class, CoderProviders.fromStaticMethods(Map.class, MapCoder.class));
builder.put(
Metadata.class, CoderProviders.fromStaticMethods(Metadata.class, MetadataCoder.class));
builder.put(
ResourceId.class,
CoderProviders.fromStaticMethods(ResourceId.class, ResourceIdCoder.class));
builder.put(
FileIO.ReadableFile.class,
CoderProviders.fromStaticMethods(FileIO.ReadableFile.class, ReadableFileCoder.class));
builder.put(Set.class, CoderProviders.fromStaticMethods(Set.class, SetCoder.class));
builder.put(
String.class, CoderProviders.fromStaticMethods(String.class, StringUtf8Coder.class));
builder.put(
TimestampedValue.class,
CoderProviders.fromStaticMethods(
TimestampedValue.class, TimestampedValue.TimestampedValueCoder.class));
builder.put(Void.class, CoderProviders.fromStaticMethods(Void.class, VoidCoder.class));
builder.put(
byte[].class, CoderProviders.fromStaticMethods(byte[].class, ByteArrayCoder.class));
builder.put(
IntervalWindow.class,
CoderProviders.forCoder(
TypeDescriptor.of(IntervalWindow.class), IntervalWindow.getCoder()));
commonTypesToCoderProviders = builder.build();
}
@Override
public <T> Coder<T> coderFor(
TypeDescriptor<T> typeDescriptor, List<? extends Coder<?>> componentCoders)
throws CannotProvideCoderException {
CoderProvider factory = commonTypesToCoderProviders.get(typeDescriptor.getRawType());
if (factory == null) {
throw new CannotProvideCoderException(
String.format("%s is not one of the common types.", typeDescriptor));
}
return factory.coderFor(typeDescriptor, componentCoders);
}
}
static {
// Register the standard coders first so they are chosen over ServiceLoader ones
List<CoderProvider> codersToRegister = new ArrayList<>();
codersToRegister.add(new CommonTypes());
// Enumerate all the CoderRegistrars in a deterministic order, adding all coders to register
Set<CoderProviderRegistrar> registrars = Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
registrars.addAll(
Lists.newArrayList(
ServiceLoader.load(CoderProviderRegistrar.class, ReflectHelpers.findClassLoader())));
// DefaultCoder should have the highest precedence and SerializableCoder the lowest
codersToRegister.addAll(new DefaultCoder.DefaultCoderProviderRegistrar().getCoderProviders());
for (CoderProviderRegistrar registrar : registrars) {
codersToRegister.addAll(registrar.getCoderProviders());
}
codersToRegister.add(SerializableCoder.getCoderProvider());
REGISTERED_CODER_FACTORIES = ImmutableList.copyOf(codersToRegister);
}
/**
* Creates a CoderRegistry containing registrations for all standard coders part of the core Java
* Apache Beam SDK and also any registrations provided by {@link CoderProviderRegistrar coder
* registrars}.
*
* <p>Multiple registrations which can produce a coder for a given type result in a Coder created
* by the (in order of precedence):
*
* <ul>
* <li>{@link CoderProvider coder providers} registered programmatically through {@link
* CoderRegistry#registerCoderProvider}.
* <li>{@link CoderProvider coder providers} for core types found within the Apache Beam Java
* SDK being used.
* <li>The {@link CoderProvider coder providers} from the {@link CoderProviderRegistrar} with
* the lexicographically smallest {@link Class#getName() class name} being used.
* </ul>
*/
public static CoderRegistry createDefault() {
return new CoderRegistry();
}
private CoderRegistry() {
coderProviders = new ArrayDeque<>(REGISTERED_CODER_FACTORIES);
}
/**
* Registers {@code coderProvider} as a potential {@link CoderProvider} which can produce {@code
* Coder} instances.
*
* <p>This method prioritizes this {@link CoderProvider} over all prior registered coders.
*
* <p>See {@link CoderProviders} for common {@link CoderProvider} patterns.
*/
public void registerCoderProvider(CoderProvider coderProvider) {
coderProviders.addFirst(coderProvider);
}
/**
* Registers the provided {@link Coder} for the given class.
*
* <p>Note that this is equivalent to {@code registerCoderForType(TypeDescriptor.of(clazz))}. See
* {@link #registerCoderForType(TypeDescriptor, Coder)} for further details.
*/
public void registerCoderForClass(Class<?> clazz, Coder<?> coder) {
registerCoderForType(TypeDescriptor.of(clazz), coder);
}
/**
* Registers the provided {@link Coder} for the given type.
*
* <p>Note that this is equivalent to {@code registerCoderProvider(CoderProviders.forCoder(type,
* coder))}. See {@link #registerCoderProvider} and {@link CoderProviders#forCoder} for further
* details.
*/
public void registerCoderForType(TypeDescriptor<?> type, Coder<?> coder) {
registerCoderProvider(CoderProviders.forCoder(type, coder));
}
/**
* Returns the {@link Coder} to use for values of the given class.
*
* @throws CannotProvideCoderException if a {@link Coder} cannot be provided
*/
public <T> Coder<T> getCoder(Class<T> clazz) throws CannotProvideCoderException {
return getCoder(TypeDescriptor.of(clazz));
}
/**
* Returns the {@link Coder} to use for values of the given type.
*
* @throws CannotProvideCoderException if a {@link Coder} cannot be provided
*/
public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
return getCoderFromTypeDescriptor(type, HashMultimap.create());
}
/**
* Returns the {@link Coder} for values of the given type, where the given input type uses the
* given {@link Coder}.
*
* @throws CannotProvideCoderException if a {@link Coder} cannot be provided
* @deprecated This method is to change in an unknown backwards incompatible way once support for
* this functionality is refined.
*/
@Deprecated
@Internal
public <InputT, OutputT> Coder<OutputT> getCoder(
TypeDescriptor<OutputT> typeDescriptor,
TypeDescriptor<InputT> inputTypeDescriptor,
Coder<InputT> inputCoder)
throws CannotProvideCoderException {
checkArgument(typeDescriptor != null);
checkArgument(inputTypeDescriptor != null);
checkArgument(inputCoder != null);
return getCoderFromTypeDescriptor(
typeDescriptor, getTypeToCoderBindings(inputTypeDescriptor.getType(), inputCoder));
}
/**
* Returns the {@link Coder} to use on elements produced by this function, given the {@link Coder}
* used for its input elements.
*
* @throws CannotProvideCoderException if a {@link Coder} cannot be provided
* @deprecated This method is to change in an unknown backwards incompatible way once support for
* this functionality is refined.
*/
@Deprecated
@Internal
public <InputT, OutputT> Coder<OutputT> getOutputCoder(
SerializableFunction<InputT, OutputT> fn, Coder<InputT> inputCoder)
throws CannotProvideCoderException {
ParameterizedType fnType =
(ParameterizedType)
TypeDescriptor.of(fn.getClass()).getSupertype(SerializableFunction.class).getType();
return getCoder(
fn.getClass(),
SerializableFunction.class,
ImmutableMap.of(fnType.getActualTypeArguments()[0], inputCoder),
SerializableFunction.class.getTypeParameters()[1]);
}
/**
* Returns the {@link Coder} to use for the specified type parameter specialization of the
* subclass, given {@link Coder Coders} to use for all other type parameters (if any).
*
* @throws CannotProvideCoderException if a {@link Coder} cannot be provided
* @deprecated This method is to change in an unknown backwards incompatible way once support for
* this functionality is refined.
*/
@Deprecated
@Internal
public <T, OutputT> Coder<OutputT> getCoder(
Class<? extends T> subClass,
Class<T> baseClass,
Map<Type, ? extends Coder<?>> knownCoders,
TypeVariable<?> param)
throws CannotProvideCoderException {
Map<Type, Coder<?>> inferredCoders = getDefaultCoders(subClass, baseClass, knownCoders);
@SuppressWarnings("unchecked")
Coder<OutputT> paramCoderOrNull = (Coder<OutputT>) inferredCoders.get(param);
if (paramCoderOrNull != null) {
return paramCoderOrNull;
} else {
throw new CannotProvideCoderException("Cannot infer coder for type parameter " + param);
}
}
/////////////////////////////////////////////////////////////////////////////
/**
* Returns a {@code Map} from each of {@code baseClass}'s type parameters to the {@link Coder} to
* use for it, in the context of {@code subClass}'s specialization of {@code baseClass}.
*
* <p>If no {@link Coder} can be inferred for a particular type parameter, then that type variable
* will be absent from the returned {@code Map}.
*
* <p>For example, if {@code baseClass} is {@code Map.class}, where {@code Map<K, V>} has type
* parameters {@code K} and {@code V}, and {@code subClass} extends {@code Map<String, Integer>}
* then the result will map the type variable {@code K} to a {@code Coder<String>} and the type
* variable {@code V} to a {@code Coder<Integer>}.
*
* <p>The {@code knownCoders} parameter can be used to provide known {@link Coder Coders} for any
* of the parameters; these will be used to infer the others.
*
* <p>Note that inference is attempted for every type variable. For a type {@code MyType<One, Two,
* Three>} inference will be attempted for all of {@code One}, {@code Two}, {@code Three}, even if
* the requester only wants a {@link Coder} for {@code Two}.
*
* <p>For this reason {@code getDefaultCoders} (plural) does not throw an exception if a {@link
* Coder} for a particular type variable cannot be inferred, but merely omits the entry from the
* returned {@code Map}. It is the responsibility of the caller (usually {@link
* #getCoderFromTypeDescriptor} to extract the desired coder or throw a {@link
* CannotProvideCoderException} when appropriate.
*
* @param subClass the concrete type whose specializations are being inferred
* @param baseClass the base type, a parameterized class
* @param knownCoders a map corresponding to the set of known {@link Coder Coders} indexed by
* parameter name
*/
private <T> Map<Type, Coder<?>> getDefaultCoders(
Class<? extends T> subClass, Class<T> baseClass, Map<Type, ? extends Coder<?>> knownCoders) {
TypeVariable<Class<T>>[] typeParams = baseClass.getTypeParameters();
Coder<?>[] knownCodersArray = new Coder<?>[typeParams.length];
for (int i = 0; i < typeParams.length; i++) {
knownCodersArray[i] = knownCoders.get(typeParams[i]);
}
Coder<?>[] resultArray = getDefaultCoders(subClass, baseClass, knownCodersArray);
Map<Type, Coder<?>> result = new HashMap<>();
for (int i = 0; i < typeParams.length; i++) {
if (resultArray[i] != null) {
result.put(typeParams[i], resultArray[i]);
}
}
return result;
}
/**
* Returns an array listing, for each of {@code baseClass}'s type parameters, the {@link Coder} to
* use for it, in the context of {@code subClass}'s specialization of {@code baseClass}.
*
* <p>If a {@link Coder} cannot be inferred for a type variable, its slot in the resulting array
* will be {@code null}.
*
* <p>For example, if {@code baseClass} is {@code Map.class}, where {@code Map<K, V>} has type
* parameters {@code K} and {@code V} in that order, and {@code subClass} extends {@code
* Map<String, Integer>} then the result will contain a {@code Coder<String>} and a {@code
* Coder<Integer>}, in that order.
*
* <p>The {@code knownCoders} parameter can be used to provide known {@link Coder Coders} for any
* of the type parameters. These will be used to infer the others. If non-null, the length of this
* array must match the number of type parameters of {@code baseClass}, and simply be filled with
* {@code null} values for each type parameters without a known {@link Coder}.
*
* <p>Note that inference is attempted for every type variable. For a type {@code MyType<One, Two,
* Three>} inference will will be attempted for all of {@code One}, {@code Two}, {@code Three},
* even if the requester only wants a {@link Coder} for {@code Two}.
*
* <p>For this reason {@code getDefaultCoders} (plural) does not throw an exception if a {@link
* Coder} for a particular type variable cannot be inferred. Instead, it results in a {@code null}
* in the array. It is the responsibility of the caller (usually {@link
* #getCoderFromTypeDescriptor} to extract the desired coder or throw a {@link
* CannotProvideCoderException} when appropriate.
*
* @param subClass the concrete type whose specializations are being inferred
* @param baseClass the base type, a parameterized class
* @param knownCoders an array corresponding to the set of base class type parameters. Each entry
* can be either a {@link Coder} (in which case it will be used for inference) or {@code null}
* (in which case it will be inferred). May be {@code null} to indicate the entire set of
* parameters should be inferred.
* @throws IllegalArgumentException if baseClass doesn't have type parameters or if the length of
* {@code knownCoders} is not equal to the number of type parameters of {@code baseClass}.
*/
private <T> Coder<?>[] getDefaultCoders(
Class<? extends T> subClass, Class<T> baseClass, @Nullable Coder<?>[] knownCoders) {
Type type = TypeDescriptor.of(subClass).getSupertype(baseClass).getType();
if (!(type instanceof ParameterizedType)) {
throw new IllegalArgumentException(type + " is not a ParameterizedType");
}
ParameterizedType parameterizedType = (ParameterizedType) type;
Type[] typeArgs = parameterizedType.getActualTypeArguments();
if (knownCoders == null) {
knownCoders = new Coder<?>[typeArgs.length];
} else if (typeArgs.length != knownCoders.length) {
throw new IllegalArgumentException(
String.format(
"Class %s has %d parameters, but %d coders are requested.",
baseClass.getCanonicalName(), typeArgs.length, knownCoders.length));
}
SetMultimap<Type, Coder<?>> context = HashMultimap.create();
for (int i = 0; i < knownCoders.length; i++) {
if (knownCoders[i] != null) {
try {
verifyCompatible(knownCoders[i], typeArgs[i]);
} catch (IncompatibleCoderException exn) {
throw new IllegalArgumentException(
String.format(
"Provided coders for type arguments of %s contain incompatibilities:"
+ " Cannot encode elements of type %s with coder %s",
baseClass, typeArgs[i], knownCoders[i]),
exn);
}
context.putAll(getTypeToCoderBindings(typeArgs[i], knownCoders[i]));
}
}
Coder<?>[] result = new Coder<?>[typeArgs.length];
for (int i = 0; i < knownCoders.length; i++) {
if (knownCoders[i] != null) {
result[i] = knownCoders[i];
} else {
try {
result[i] = getCoderFromTypeDescriptor(TypeDescriptor.of(typeArgs[i]), context);
} catch (CannotProvideCoderException exc) {
result[i] = null;
}
}
}
return result;
}
/**
* Thrown when a {@link Coder} cannot possibly encode a type, yet has been proposed as a {@link
* Coder} for that type.
*/
@VisibleForTesting
static class IncompatibleCoderException extends RuntimeException {
private Coder<?> coder;
private Type type;
IncompatibleCoderException(String message, Coder<?> coder, Type type) {
super(message);
this.coder = coder;
this.type = type;
}
IncompatibleCoderException(String message, Coder<?> coder, Type type, Throwable cause) {
super(message, cause);
this.coder = coder;
this.type = type;
}
public Coder<?> getCoder() {
return coder;
}
public Type getType() {
return type;
}
}
/**
* Returns {@code true} if the given {@link Coder} can possibly encode elements of the given type.
*/
@VisibleForTesting
static <T, CoderT extends Coder<T>, CandidateT> void verifyCompatible(
CoderT coder, Type candidateType) throws IncompatibleCoderException {
// Various representations of the coder's class
@SuppressWarnings("unchecked")
Class<CoderT> coderClass = (Class<CoderT>) coder.getClass();
TypeDescriptor<CoderT> coderDescriptor = TypeDescriptor.of(coderClass);
// Various representations of the actual coded type
@SuppressWarnings("unchecked")
TypeDescriptor<T> codedDescriptor = CoderUtils.getCodedType(coderDescriptor);
@SuppressWarnings("unchecked")
Class<T> codedClass = (Class<T>) codedDescriptor.getRawType();
Type codedType = codedDescriptor.getType();
// Various representations of the candidate type
@SuppressWarnings("unchecked")
TypeDescriptor<CandidateT> candidateDescriptor =
(TypeDescriptor<CandidateT>) TypeDescriptor.of(candidateType);
@SuppressWarnings("unchecked")
Class<CandidateT> candidateClass = (Class<CandidateT>) candidateDescriptor.getRawType();
// If coder has type Coder<T> where the actual value of T is lost
// to erasure, then we cannot rule it out.
if (candidateType instanceof TypeVariable) {
return;
}
// If the raw types are not compatible, we can certainly rule out
// coder compatibility
if (!codedClass.isAssignableFrom(candidateClass)) {
throw new IncompatibleCoderException(
String.format(
"Cannot encode elements of type %s with coder %s because the"
+ " coded type %s is not assignable from %s",
candidateType, coder, codedClass, candidateType),
coder,
candidateType);
}
// we have established that this is a covariant upcast... though
// coders are invariant, we are just checking one direction
@SuppressWarnings("unchecked")
TypeDescriptor<T> candidateOkDescriptor = (TypeDescriptor<T>) candidateDescriptor;
// If the coded type is a parameterized type where any of the actual
// type parameters are not compatible, then the whole thing is certainly not
// compatible.
if ((codedType instanceof ParameterizedType) && !isNullOrEmpty(coder.getCoderArguments())) {
ParameterizedType parameterizedSupertype =
(ParameterizedType) candidateOkDescriptor.getSupertype(codedClass).getType();
Type[] typeArguments = parameterizedSupertype.getActualTypeArguments();
List<? extends Coder<?>> typeArgumentCoders = coder.getCoderArguments();
if (typeArguments.length < typeArgumentCoders.size()) {
throw new IncompatibleCoderException(
String.format(
"Cannot encode elements of type %s with coder %s:"
+ " the generic supertype %s has %s type parameters, which is less than the"
+ " number of coder arguments %s has (%s).",
candidateOkDescriptor,
coder,
parameterizedSupertype,
typeArguments.length,
coder,
typeArgumentCoders.size()),
coder,
candidateOkDescriptor.getType());
}
for (int i = 0; i < typeArgumentCoders.size(); i++) {
try {
Coder<?> typeArgumentCoder = typeArgumentCoders.get(i);
verifyCompatible(
typeArgumentCoder, candidateDescriptor.resolveType(typeArguments[i]).getType());
} catch (IncompatibleCoderException exn) {
throw new IncompatibleCoderException(
String.format(
"Cannot encode elements of type %s with coder %s"
+ " because some component coder is incompatible",
candidateType, coder),
coder,
candidateType,
exn);
}
}
}
}
private static boolean isNullOrEmpty(Collection<?> c) {
return c == null || c.isEmpty();
}
/** The list of {@link CoderProvider coder providers} to use to provide Coders. */
private ArrayDeque<CoderProvider> coderProviders;
/**
* Returns a {@link Coder} to use for values of the given type, in a context where the given types
* use the given coders.
*
* @throws CannotProvideCoderException if a coder cannot be provided
*/
private <T> Coder<T> getCoderFromTypeDescriptor(
TypeDescriptor<T> typeDescriptor, SetMultimap<Type, Coder<?>> typeCoderBindings)
throws CannotProvideCoderException {
Type type = typeDescriptor.getType();
Coder<?> coder;
if (typeCoderBindings.containsKey(type)) {
Set<Coder<?>> coders = typeCoderBindings.get(type);
if (coders.size() == 1) {
coder = Iterables.getOnlyElement(coders);
} else {
throw new CannotProvideCoderException(
String.format(
"Cannot provide a coder for type variable %s"
+ " because the actual type is over specified by multiple"
+ " incompatible coders %s.",
type, coders),
ReasonCode.OVER_SPECIFIED);
}
} else if (type instanceof Class<?>) {
coder = getCoderFromFactories(typeDescriptor, Collections.emptyList());
} else if (type instanceof ParameterizedType) {
coder = getCoderFromParameterizedType((ParameterizedType) type, typeCoderBindings);
} else if (type instanceof TypeVariable) {
coder = getCoderFromFactories(typeDescriptor, Collections.emptyList());
} else if (type instanceof WildcardType) {
// No coder for an unknown generic type.
throw new CannotProvideCoderException(
String.format("Cannot provide a coder for wildcard type %s.", type), ReasonCode.UNKNOWN);
} else {
throw new RuntimeException("Internal error: unexpected kind of Type: " + type);
}
LOG.debug("Coder for {}: {}", typeDescriptor, coder);
@SuppressWarnings("unchecked")
Coder<T> result = (Coder<T>) coder;
return result;
}
/**
* Returns a {@link Coder} to use for values of the given parameterized type, in a context where
* the given types use the given {@link Coder Coders}.
*
* @throws CannotProvideCoderException if no coder can be provided
*/
private Coder<?> getCoderFromParameterizedType(
ParameterizedType type, SetMultimap<Type, Coder<?>> typeCoderBindings)
throws CannotProvideCoderException {
List<Coder<?>> typeArgumentCoders = new ArrayList<>();
for (Type typeArgument : type.getActualTypeArguments()) {
try {
Coder<?> typeArgumentCoder =
getCoderFromTypeDescriptor(TypeDescriptor.of(typeArgument), typeCoderBindings);
typeArgumentCoders.add(typeArgumentCoder);
} catch (CannotProvideCoderException exc) {
throw new CannotProvideCoderException(
String.format(
"Cannot provide coder for parameterized type %s: %s", type, exc.getMessage()),
exc);
}
}
return getCoderFromFactories(TypeDescriptor.of(type), typeArgumentCoders);
}
/**
* Attempts to create a {@link Coder} from any registered {@link CoderProvider} returning the
* first successfully created instance.
*/
private Coder<?> getCoderFromFactories(
TypeDescriptor<?> typeDescriptor, List<Coder<?>> typeArgumentCoders)
throws CannotProvideCoderException {
List<CannotProvideCoderException> suppressedExceptions = new ArrayList<>();
for (CoderProvider coderProvider : coderProviders) {
try {
return coderProvider.coderFor(typeDescriptor, typeArgumentCoders);
} catch (CannotProvideCoderException e) {
// Add all failures as suppressed exceptions.
suppressedExceptions.add(e);
}
}
// Build up the error message and list of causes.
StringBuilder messageBuilder =
new StringBuilder()
.append("Unable to provide a Coder for ")
.append(typeDescriptor)
.append(".\n")
.append(" Building a Coder using a registered CoderProvider failed.\n")
.append(" See suppressed exceptions for detailed failures.");
CannotProvideCoderException exceptionOnFailure =
new CannotProvideCoderException(messageBuilder.toString());
for (CannotProvideCoderException suppressedException : suppressedExceptions) {
exceptionOnFailure.addSuppressed(suppressedException);
}
throw exceptionOnFailure;
}
/**
* Returns an immutable {@code SetMultimap} from each of the type variables embedded in the given
* type to the corresponding types in the given {@link Coder}.
*/
private SetMultimap<Type, Coder<?>> getTypeToCoderBindings(Type type, Coder<?> coder) {
checkArgument(type != null);
checkArgument(coder != null);
if (type instanceof TypeVariable || type instanceof Class) {
return ImmutableSetMultimap.of(type, coder);
} else if (type instanceof ParameterizedType) {
return getTypeToCoderBindings((ParameterizedType) type, coder);
} else {
return ImmutableSetMultimap.of();
}
}
/**
* Returns an immutable {@code SetMultimap} from the type arguments of the parameterized type to
* their corresponding {@link Coder Coders}, and so on recursively for their type parameters.
*
* <p>This method is simply a specialization to break out the most elaborate case of {@link
* #getTypeToCoderBindings(Type, Coder)}.
*/
private SetMultimap<Type, Coder<?>> getTypeToCoderBindings(
ParameterizedType type, Coder<?> coder) {
List<Type> typeArguments = Arrays.asList(type.getActualTypeArguments());
List<? extends Coder<?>> coderArguments = coder.getCoderArguments();
if ((coderArguments == null) || (typeArguments.size() != coderArguments.size())) {
return ImmutableSetMultimap.of();
} else {
SetMultimap<Type, Coder<?>> typeToCoder = HashMultimap.create();
typeToCoder.put(type, coder);
for (int i = 0; i < typeArguments.size(); i++) {
Type typeArgument = typeArguments.get(i);
Coder<?> coderArgument = coderArguments.get(i);
if (coderArgument != null) {
typeToCoder.putAll(getTypeToCoderBindings(typeArgument, coderArgument));
}
}
return ImmutableSetMultimap.<Type, Coder<?>>builder().putAll(typeToCoder).build();
}
}
}