blob: d3b7d10b34d74be4370540326d0d396336439012 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas.annotations;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import java.io.Serializable;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.SchemaProviderRegistrar;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
/**
* The {@link DefaultSchema} annotation specifies a {@link SchemaProvider} class to handle obtaining
* a schema and row for the specified class.
*
* <p>For example, if your class is a JavaBean, the {@code JavaBeanSchema} provider class knows how
* to vend schemas for it. You can annotate the class as follows:
*
* <pre>{@code @DefaultSchema(JavaBeanSchema.class)
* class MyClass {
* public String getFoo();
* void setFoo(String foo);
* ....
* }
* }</pre>
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@SuppressWarnings("rawtypes")
@Experimental(Kind.SCHEMAS)
public @interface DefaultSchema {
/**
 * The {@link SchemaProvider} implementation that knows how to vend schemas for the annotated
 * class.
 *
 * <p>{@link DefaultSchemaProvider} creates an instance of this class reflectively through its
 * no-argument constructor, so the provider class must declare one that is accessible.
 */
@CheckForNull
Class<? extends SchemaProvider> value();
/**
 * {@link SchemaProvider} for default schemas. Looks up the provider named by the {@link
 * DefaultSchema} annotation on a type, or on its nearest annotated superclass, and delegates to
 * that provider.
 *
 * <p>Resolved providers are cached per {@link TypeDescriptor}. A lookup that finds no annotation
 * yields {@code null} from the mapping function, so nothing is cached for that type and the class
 * hierarchy is walked again on the next call.
 */
class DefaultSchemaProvider implements SchemaProvider {
  final Map<TypeDescriptor, ProviderAndDescriptor> cachedProviders = Maps.newConcurrentMap();

  /**
   * Pairs an instantiated provider with the descriptor of the class that actually carries the
   * {@link DefaultSchema} annotation (which may be a superclass of the type that was queried).
   */
  private static final class ProviderAndDescriptor implements Serializable {
    final SchemaProvider schemaProvider;
    final TypeDescriptor<?> typeDescriptor;

    public ProviderAndDescriptor(
        SchemaProvider schemaProvider, TypeDescriptor<?> typeDescriptor) {
      this.schemaProvider = schemaProvider;
      this.typeDescriptor = typeDescriptor;
    }
  }

  /**
   * Returns the cached provider for the given type, resolving it from the {@link DefaultSchema}
   * annotation on first use.
   *
   * @param typeDescriptor the type whose default provider is requested
   * @return the provider together with the descriptor of the annotated class, or {@code null} if
   *     neither the type nor any of its superclasses is annotated
   */
  @Nullable
  private ProviderAndDescriptor getSchemaProvider(TypeDescriptor<?> typeDescriptor) {
    return cachedProviders.computeIfAbsent(typeDescriptor, type -> findAnnotatedProvider(type));
  }

  /**
   * Walks from the raw class of {@code type} up through its superclasses (stopping before {@link
   * Object}) and instantiates the provider named by the first {@link DefaultSchema} annotation
   * found.
   *
   * @param type the type whose class hierarchy is searched
   * @return the instantiated provider and the descriptor of the annotated class, or {@code null}
   *     if no class in the hierarchy is annotated
   * @throws IllegalArgumentException if the annotation's value is {@code null}
   * @throws IllegalStateException if the provider class cannot be instantiated through its
   *     no-argument constructor
   */
  @Nullable
  private static ProviderAndDescriptor findAnnotatedProvider(TypeDescriptor<?> type) {
    for (Class<?> clazz = type.getRawType();
        clazz != null && !clazz.equals(Object.class);
        clazz = clazz.getSuperclass()) {
      DefaultSchema annotation = clazz.getAnnotation(DefaultSchema.class);
      if (annotation == null) {
        continue;
      }
      Class<? extends SchemaProvider> providerClass = annotation.value();
      checkArgument(
          providerClass != null,
          "Type %s has a @DefaultSchema annotation with a null argument.",
          type);
      try {
        // The descriptor recorded is that of the annotated class, not necessarily of `type`.
        return new ProviderAndDescriptor(
            providerClass.getDeclaredConstructor().newInstance(), TypeDescriptor.of(clazz));
      } catch (NoSuchMethodException
          | InstantiationException
          | IllegalAccessException
          | InvocationTargetException e) {
        throw new IllegalStateException(
            "Failed to create SchemaProvider "
                + providerClass.getSimpleName()
                + " which was specified as the default SchemaProvider for type "
                + type
                + ". Make sure that this class has a public default constructor.",
            e);
      }
    }
    return null;
  }

  /**
   * Given a type, returns the schema vended by its default provider. If no {@link DefaultSchema}
   * annotation is found for the type, returns null.
   */
  @Override
  @Nullable
  public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
    ProviderAndDescriptor resolved = getSchemaProvider(typeDescriptor);
    if (resolved == null) {
      return null;
    }
    return resolved.schemaProvider.schemaFor(resolved.typeDescriptor);
  }

  /**
   * Given a type, returns a function that converts that type to a {@link Row} object. If no
   * schema exists, returns null.
   */
  @Override
  @Nullable
  public <T> SerializableFunction<T, Row> toRowFunction(TypeDescriptor<T> typeDescriptor) {
    ProviderAndDescriptor resolved = getSchemaProvider(typeDescriptor);
    if (resolved == null) {
      return null;
    }
    // NOTE(review): the stored descriptor may name an annotated superclass of T, so this cast is
    // unchecked by design.
    return resolved.schemaProvider.toRowFunction((TypeDescriptor<T>) resolved.typeDescriptor);
  }

  /**
   * Given a type, returns a function that converts from a {@link Row} object to that type. If no
   * schema exists, returns null.
   */
  @Override
  @Nullable
  public <T> SerializableFunction<Row, T> fromRowFunction(TypeDescriptor<T> typeDescriptor) {
    ProviderAndDescriptor resolved = getSchemaProvider(typeDescriptor);
    if (resolved == null) {
      return null;
    }
    // NOTE(review): the stored descriptor may name an annotated superclass of T, so this cast is
    // unchecked by design.
    return resolved.schemaProvider.fromRowFunction((TypeDescriptor<T>) resolved.typeDescriptor);
  }
}
/**
 * {@link SchemaProviderRegistrar} that contributes a single, freshly created {@link
 * DefaultSchemaProvider}.
 */
class DefaultSchemaProviderRegistrar implements SchemaProviderRegistrar {
  /** Returns an immutable one-element list holding a new {@link DefaultSchemaProvider}. */
  @Override
  public List<SchemaProvider> getSchemaProviders() {
    SchemaProvider annotationDrivenProvider = new DefaultSchemaProvider();
    return ImmutableList.of(annotationDrivenProvider);
  }
}
}