| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.schemas; |
| |
| import java.util.ArrayDeque; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.ServiceLoader; |
| import java.util.Set; |
| import java.util.function.Function; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.annotations.Experimental; |
| import org.apache.beam.sdk.annotations.Experimental.Kind; |
| import org.apache.beam.sdk.schemas.annotations.DefaultSchema; |
| import org.apache.beam.sdk.transforms.SerializableFunction; |
| import org.apache.beam.sdk.util.common.ReflectHelpers; |
| import org.apache.beam.sdk.util.common.ReflectHelpers.ObjectsClassComparator; |
| import org.apache.beam.sdk.values.Row; |
| import org.apache.beam.sdk.values.TypeDescriptor; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets; |
| |
| /** |
| * A {@link SchemaRegistry} allows registering {@link Schema}s for a given Java {@link Class} or a |
| * {@link TypeDescriptor}. |
| * |
| * <p>Types registered in a pipeline's schema registry will automatically be discovered by any |
| * {@link org.apache.beam.sdk.values.PCollection} that uses {@link SchemaCoder}. This allows users |
| * to write pipelines in terms of their own Java types, yet still register schemas for these types. |
| * |
| * <p>TODO: Provide support for schemas registered via a ServiceLoader interface. This will allow |
| * optional modules to register schemas as well. |
| */ |
| @Experimental(Kind.SCHEMAS) |
| public class SchemaRegistry { |
| private static final List<SchemaProvider> REGISTERED_SCHEMA_PROVIDERS; |
| |
| private static class SchemaEntry<T> { |
| private final Schema schema; |
| private final SerializableFunction<T, Row> toRow; |
| private final SerializableFunction<Row, T> fromRow; |
| |
| SchemaEntry( |
| Schema schema, SerializableFunction<T, Row> toRow, SerializableFunction<Row, T> fromRow) { |
| this.schema = schema; |
| this.toRow = toRow; |
| this.fromRow = fromRow; |
| } |
| } |
| |
| private final Map<TypeDescriptor, SchemaEntry> entries = Maps.newHashMap(); |
| private final ArrayDeque<SchemaProvider> providers; |
| |
| private static class PerTypeRegisteredProvider implements SchemaProvider { |
| private final Map<TypeDescriptor, SchemaProvider> providers = Maps.newHashMap(); |
| |
| void registerProvider(TypeDescriptor typeDescriptor, SchemaProvider schemaProvider) { |
| providers.put(typeDescriptor, schemaProvider); |
| } |
| |
| @Nullable |
| @Override |
| public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) { |
| TypeDescriptor<?> type = typeDescriptor; |
| do { |
| SchemaProvider schemaProvider = providers.get(type); |
| if (schemaProvider != null) { |
| return schemaProvider.schemaFor(type); |
| } |
| Class<?> superClass = type.getRawType().getSuperclass(); |
| if (superClass == null || superClass.equals(Object.class)) { |
| return null; |
| } |
| type = TypeDescriptor.of(superClass); |
| } while (true); |
| } |
| |
| @Nullable |
| @Override |
| public <T> SerializableFunction<T, Row> toRowFunction(TypeDescriptor<T> typeDescriptor) { |
| TypeDescriptor<?> type = typeDescriptor; |
| do { |
| SchemaProvider schemaProvider = providers.get(type); |
| if (schemaProvider != null) { |
| return (SerializableFunction<T, Row>) schemaProvider.toRowFunction(type); |
| } |
| Class<?> superClass = type.getRawType().getSuperclass(); |
| if (superClass == null || superClass.equals(Object.class)) { |
| return null; |
| } |
| type = TypeDescriptor.of(superClass); |
| } while (true); |
| } |
| |
| @Nullable |
| @Override |
| public <T> SerializableFunction<Row, T> fromRowFunction(TypeDescriptor<T> typeDescriptor) { |
| TypeDescriptor<?> type = typeDescriptor; |
| do { |
| SchemaProvider schemaProvider = providers.get(type); |
| if (schemaProvider != null) { |
| return (SerializableFunction<Row, T>) schemaProvider.fromRowFunction(type); |
| } |
| Class<?> superClass = type.getRawType().getSuperclass(); |
| if (superClass == null || superClass.equals(Object.class)) { |
| return null; |
| } |
| type = TypeDescriptor.of(superClass); |
| } while (true); |
| } |
| } |
| |
| private final PerTypeRegisteredProvider perTypeRegisteredProviders = |
| new PerTypeRegisteredProvider(); |
| |
| private SchemaRegistry() { |
| providers = new ArrayDeque<>(REGISTERED_SCHEMA_PROVIDERS); |
| providers.addFirst(perTypeRegisteredProviders); |
| } |
| |
| public static SchemaRegistry createDefault() { |
| return new SchemaRegistry(); |
| } |
| |
| /** Register a schema for a specific {@link Class} type. */ |
| public <T> void registerSchemaForClass( |
| Class<T> clazz, |
| Schema schema, |
| SerializableFunction<T, Row> toRow, |
| SerializableFunction<Row, T> fromRow) { |
| registerSchemaForType(TypeDescriptor.of(clazz), schema, toRow, fromRow); |
| } |
| |
| /** Register a schema for a specific {@link TypeDescriptor} type. */ |
| public <T> void registerSchemaForType( |
| TypeDescriptor<T> type, |
| Schema schema, |
| SerializableFunction<T, Row> toRow, |
| SerializableFunction<Row, T> fromRow) { |
| entries.put(type, new SchemaEntry<>(schema, toRow, fromRow)); |
| } |
| |
| /** |
| * Register a {@link SchemaProvider}. |
| * |
| * <p>A {@link SchemaProvider} allows for deferred lookups of per-type schemas. This can be used |
| * when schemas are registered in an external service. The SchemaProvider will lookup the type in |
| * the external service and return the correct {@link Schema}. |
| */ |
| public void registerSchemaProvider(SchemaProvider schemaProvider) { |
| providers.addFirst(schemaProvider); |
| } |
| |
| /** Register a {@link SchemaProvider} to be used for a specific type. * */ |
| public <T> void registerSchemaProvider(Class<T> clazz, SchemaProvider schemaProvider) { |
| registerSchemaProvider(TypeDescriptor.of(clazz), schemaProvider); |
| } |
| |
| /** Register a {@link SchemaProvider} to be used for a specific type. * */ |
| public <T> void registerSchemaProvider( |
| TypeDescriptor<T> typeDescriptor, SchemaProvider schemaProvider) { |
| perTypeRegisteredProviders.registerProvider(typeDescriptor, schemaProvider); |
| } |
| |
| /** |
| * Register a POJO type for automatic schema inference. |
| * |
| * <p>Currently schema field names will match field names in the POJO, and all fields must be |
| * mutable (i.e. no final fields). |
| */ |
| public <T> void registerPOJO(Class<T> clazz) { |
| registerPOJO(TypeDescriptor.of(clazz)); |
| } |
| |
| /** |
| * Register a POJO type for automatic schema inference. |
| * |
| * <p>Currently schema field names will match field names in the POJO, and all fields must be |
| * mutable (i.e. no final fields). The Java object is expected to have implemented a correct |
| * .equals() and .hashCode methods The equals method must be completely determined by the schema |
| * fields. i.e. if the object has hidden fields that are not reflected in the schema but are |
| * compared in equals, then results will be incorrect. |
| */ |
| public <T> void registerPOJO(TypeDescriptor<T> typeDescriptor) { |
| registerSchemaProvider(typeDescriptor, new JavaFieldSchema()); |
| } |
| |
| /** |
| * Register a JavaBean type for automatic schema inference. |
| * |
| * <p>Currently schema field names will match getter names in the bean, and all getters must have |
| * matching setters. The Java object is expected to have implemented a correct .equals() and |
| * .hashCode methods The equals method must be completely determined by the schema fields. i.e. if |
| * the object has hidden fields that are not reflected in the schema but are compared in equals, |
| * then results will be incorrect. |
| */ |
| public <T> void registerJavaBean(Class<T> clazz) { |
| registerJavaBean(TypeDescriptor.of(clazz)); |
| } |
| |
| /** |
| * Register a JavaBean type for automatic schema inference. |
| * |
| * <p>Currently schema field names will match getter names in the bean, and all getters must have |
| * matching setters. |
| */ |
| public <T> void registerJavaBean(TypeDescriptor<T> typeDescriptor) { |
| registerSchemaProvider(typeDescriptor, new JavaBeanSchema()); |
| } |
| |
| /** |
| * Get a schema for a given {@link Class} type. If no schema exists, throws {@link |
| * NoSuchSchemaException}. |
| */ |
| public <T> Schema getSchema(Class<T> clazz) throws NoSuchSchemaException { |
| return getSchema(TypeDescriptor.of(clazz)); |
| } |
| |
| private <ReturnT> ReturnT getProviderResult(Function<SchemaProvider, ReturnT> f) |
| throws NoSuchSchemaException { |
| for (SchemaProvider provider : providers) { |
| ReturnT result = f.apply(provider); |
| if (result != null) { |
| return result; |
| } |
| } |
| throw new NoSuchSchemaException(); |
| } |
| |
| /** |
| * Retrieve a schema for a given {@link TypeDescriptor} type. If no schema exists, throws {@link |
| * NoSuchSchemaException}. |
| */ |
| public <T> Schema getSchema(TypeDescriptor<T> typeDescriptor) throws NoSuchSchemaException { |
| SchemaEntry entry = entries.get(typeDescriptor); |
| if (entry != null) { |
| return entry.schema; |
| } |
| return getProviderResult((SchemaProvider p) -> p.schemaFor(typeDescriptor)); |
| } |
| |
| /** Rerieve the function that converts an object of the specified type to a {@link Row} object. */ |
| public <T> SerializableFunction<T, Row> getToRowFunction(Class<T> clazz) |
| throws NoSuchSchemaException { |
| return getToRowFunction(TypeDescriptor.of(clazz)); |
| } |
| |
| /** Rerieve the function that converts an object of the specified type to a {@link Row} object. */ |
| public <T> SerializableFunction<T, Row> getToRowFunction(TypeDescriptor<T> typeDescriptor) |
| throws NoSuchSchemaException { |
| SchemaEntry entry = entries.get(typeDescriptor); |
| if (entry != null) { |
| return entry.toRow; |
| } |
| return getProviderResult((SchemaProvider p) -> p.toRowFunction(typeDescriptor)); |
| } |
| |
| /** Retrieve the function that converts a {@link Row} object to the specified type. */ |
| public <T> SerializableFunction<Row, T> getFromRowFunction(Class<T> clazz) |
| throws NoSuchSchemaException { |
| return getFromRowFunction(TypeDescriptor.of(clazz)); |
| } |
| |
| /** Retrieve the function that converts a {@link Row} object to the specified type. */ |
| public <T> SerializableFunction<Row, T> getFromRowFunction(TypeDescriptor<T> typeDescriptor) |
| throws NoSuchSchemaException { |
| SchemaEntry entry = entries.get(typeDescriptor); |
| if (entry != null) { |
| return entry.fromRow; |
| } |
| return getProviderResult((SchemaProvider p) -> p.fromRowFunction(typeDescriptor)); |
| } |
| |
| static { |
| // find all statically-registered SchemaProviders. |
| List<SchemaProvider> providersToRegister = Lists.newArrayList(); |
| Set<SchemaProviderRegistrar> registrars = Sets.newTreeSet(ObjectsClassComparator.INSTANCE); |
| // Find all SchemaProviderRegistrar classes that are registered as service loaders |
| // (usually using the @AutoService annotation). |
| registrars.addAll( |
| Lists.newArrayList( |
| ServiceLoader.load(SchemaProviderRegistrar.class, ReflectHelpers.findClassLoader()))); |
| // Load all SchemaProviders that are registered using the @DefaultSchema annotation. |
| providersToRegister.addAll( |
| new DefaultSchema.DefaultSchemaProviderRegistrar().getSchemaProviders()); |
| for (SchemaProviderRegistrar registrar : registrars) { |
| providersToRegister.addAll(registrar.getSchemaProviders()); |
| } |
| REGISTERED_SCHEMA_PROVIDERS = ImmutableList.copyOf(providersToRegister); |
| } |
| } |