blob: f74a6948463214f9df45c77a0559e7a42ac3f9e7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.cdap;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.SubmitterLifecycle;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.streaming.StreamingSource;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.cdap.context.BatchContextImpl;
import org.apache.beam.sdk.io.cdap.context.BatchSinkContextImpl;
import org.apache.beam.sdk.io.cdap.context.BatchSourceContextImpl;
import org.apache.beam.sdk.io.cdap.context.StreamingSourceContextImpl;
import org.apache.beam.sdk.io.sparkreceiver.ReceiverBuilder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Class wrapper for a CDAP plugin. */
@AutoValue
@SuppressWarnings({"rawtypes", "unchecked"})
public abstract class Plugin<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(Plugin.class);
private static final String PREPARE_RUN_METHOD_NAME = "prepareRun";
private static final String GET_STREAM_METHOD_NAME = "getStream";
protected @Nullable PluginConfig pluginConfig;
protected @Nullable Configuration hadoopConfiguration;
protected @Nullable SubmitterLifecycle cdapPluginObj;
/** Gets the context of a plugin. */
public abstract BatchContextImpl getContext();
/** Gets the main class of a plugin. */
public abstract Class<?> getPluginClass();
/** Gets InputFormat or OutputFormat class for a plugin. */
public @Nullable abstract Class<?> getFormatClass();
/** Gets InputFormatProvider or OutputFormatProvider class for a plugin. */
public @Nullable abstract Class<?> getFormatProviderClass();
/** Gets Spark {@link Receiver} class for a CDAP plugin. */
public @Nullable abstract Class<? extends Receiver<V>> getReceiverClass();
/**
* Gets a {@link SerializableFunction} that defines how to get record offset for CDAP {@link
* Plugin} class.
*/
public @Nullable abstract SerializableFunction<V, Long> getGetOffsetFn();
/**
* Gets a {@link SerializableFunction} that defines how to get constructor arguments for {@link
* Receiver} using {@link PluginConfig}.
*/
public @Nullable abstract SerializableFunction<PluginConfig, Object[]>
getGetReceiverArgsFromConfigFn();
/** Sets a plugin config. */
public Plugin<K, V> withConfig(PluginConfig pluginConfig) {
this.pluginConfig = pluginConfig;
return this;
}
/** Gets a plugin config. */
public @Nullable PluginConfig getPluginConfig() {
return pluginConfig;
}
/**
* Calls {@link SubmitterLifecycle#prepareRun(Object)} method on the {@link #cdapPluginObj}
* passing needed {@param config} configuration object as a parameter. This method is needed for
* validating connection to the CDAP sink/source and performing initial tuning.
*/
public void prepareRun() {
if (isUnbounded()) {
// Not needed for unbounded plugins
return;
}
if (cdapPluginObj == null) {
instantiateCdapPluginObj();
}
checkStateNotNull(cdapPluginObj, "Cdap Plugin object can't be null!");
try {
cdapPluginObj.prepareRun(getContext());
} catch (Exception e) {
LOG.error("Error while prepareRun", e);
throw new IllegalStateException("Error while prepareRun", e);
}
if (getPluginType().equals(PluginConstants.PluginType.SOURCE)) {
for (Map.Entry<String, String> entry :
getContext().getInputFormatProvider().getInputFormatConfiguration().entrySet()) {
getHadoopConfiguration().set(entry.getKey(), entry.getValue());
}
} else {
for (Map.Entry<String, String> entry :
getContext().getOutputFormatProvider().getOutputFormatConfiguration().entrySet()) {
getHadoopConfiguration().set(entry.getKey(), entry.getValue());
}
getHadoopConfiguration().set(MRJobConfig.ID, String.valueOf(1));
}
}
/** Creates an instance of {@link #cdapPluginObj} using {@link #pluginConfig}. */
private void instantiateCdapPluginObj() {
PluginConfig pluginConfig = getPluginConfig();
checkStateNotNull(pluginConfig, "PluginConfig should be not null!");
try {
Constructor<?> constructor = getPluginClass().getDeclaredConstructor(pluginConfig.getClass());
constructor.setAccessible(true);
cdapPluginObj = (SubmitterLifecycle) constructor.newInstance(pluginConfig);
} catch (Exception e) {
LOG.error("Can not instantiate CDAP plugin class", e);
throw new IllegalStateException("Can not call prepareRun");
}
}
/** Sets a plugin Hadoop configuration. */
public Plugin<K, V> withHadoopConfiguration(Class<K> formatKeyClass, Class<V> formatValueClass) {
Class<?> formatClass = getFormatClass();
checkStateNotNull(formatClass, "Format class can't be null!");
PluginConstants.Format formatType = getFormatType();
PluginConstants.Hadoop hadoopType = getHadoopType();
getHadoopConfiguration()
.setClass(hadoopType.getFormatClass(), formatClass, formatType.getFormatClass());
getHadoopConfiguration().setClass(hadoopType.getKeyClass(), formatKeyClass, Object.class);
getHadoopConfiguration().setClass(hadoopType.getValueClass(), formatValueClass, Object.class);
return this;
}
/** Sets a plugin Hadoop configuration. */
public Plugin<K, V> withHadoopConfiguration(Configuration hadoopConfiguration) {
this.hadoopConfiguration = hadoopConfiguration;
return this;
}
/** Gets a plugin Hadoop configuration. */
public Configuration getHadoopConfiguration() {
if (hadoopConfiguration == null) {
hadoopConfiguration = new Configuration(false);
}
return hadoopConfiguration;
}
/** Gets a plugin type. */
public abstract PluginConstants.PluginType getPluginType();
/** Gets a format type. */
private PluginConstants.Format getFormatType() {
return getPluginType() == PluginConstants.PluginType.SOURCE
? PluginConstants.Format.INPUT
: PluginConstants.Format.OUTPUT;
}
/** Gets a Hadoop type. */
private PluginConstants.Hadoop getHadoopType() {
return getPluginType() == PluginConstants.PluginType.SOURCE
? PluginConstants.Hadoop.SOURCE
: PluginConstants.Hadoop.SINK;
}
/** Gets value of a plugin type. */
public static PluginConstants.PluginType initPluginType(Class<?> pluginClass)
throws IllegalArgumentException {
if (StreamingSource.class.isAssignableFrom(pluginClass)
|| BatchSource.class.isAssignableFrom(pluginClass)) {
return PluginConstants.PluginType.SOURCE;
} else if (BatchSink.class.isAssignableFrom(pluginClass)) {
return PluginConstants.PluginType.SINK;
} else {
throw new IllegalArgumentException("Provided class should be source or sink plugin");
}
}
/** Initializes {@link BatchContextImpl} for CDAP plugin. */
public static BatchContextImpl initContext(Class<?> cdapPluginClass) {
// Init context and determine input or output
Class<?> contextClass;
List<Method> methods = new ArrayList<>(Arrays.asList(cdapPluginClass.getDeclaredMethods()));
Class<?> cdapPluginSuperclass = cdapPluginClass.getSuperclass();
if (cdapPluginSuperclass != null) {
methods.addAll(Arrays.asList(cdapPluginSuperclass.getDeclaredMethods()));
}
for (Method method : methods) {
if (method.getName().equals(PREPARE_RUN_METHOD_NAME)) {
contextClass = method.getParameterTypes()[0];
if (contextClass.equals(BatchSourceContext.class)) {
return new BatchSourceContextImpl();
} else if (contextClass.equals(BatchSinkContext.class)) {
return new BatchSinkContextImpl();
}
} else if (method.getName().equals(GET_STREAM_METHOD_NAME)) {
return new StreamingSourceContextImpl();
}
}
throw new IllegalStateException("Cannot determine context class");
}
/** Gets value of a plugin type. */
public Boolean isUnbounded() {
Boolean isUnbounded = null;
for (Annotation annotation : getPluginClass().getDeclaredAnnotations()) {
if (annotation.annotationType().equals(io.cdap.cdap.api.annotation.Plugin.class)) {
String pluginType = ((io.cdap.cdap.api.annotation.Plugin) annotation).type();
isUnbounded = pluginType != null && pluginType.startsWith("streaming");
}
}
if (isUnbounded == null) {
throw new IllegalArgumentException("CDAP plugin class must have Plugin annotation!");
}
return isUnbounded;
}
/** Gets a {@link ReceiverBuilder}. */
public ReceiverBuilder<V, ? extends Receiver<V>> getReceiverBuilder() {
checkState(isUnbounded(), "Receiver Builder is supported only for unbounded plugins");
Class<?> pluginClass = getPluginClass();
Class<? extends Receiver<V>> receiverClass = getReceiverClass();
SerializableFunction<PluginConfig, Object[]> getReceiverArgsFromConfigFn =
getGetReceiverArgsFromConfigFn();
PluginConfig pluginConfig = getPluginConfig();
checkStateNotNull(pluginConfig, "Plugin config can not be null!");
checkStateNotNull(pluginClass, "Plugin class can not be null!");
checkStateNotNull(receiverClass, "Receiver class can not be null!");
checkStateNotNull(
getReceiverArgsFromConfigFn, "Get receiver args from config function can not be null!");
return new ReceiverBuilder<>(receiverClass)
.withConstructorArgs(getReceiverArgsFromConfigFn.apply(pluginConfig));
}
/**
* Creates a batch plugin instance.
*
* @param newPluginClass class of the CDAP plugin {@link io.cdap.cdap.api.annotation.Plugin}.
* @param newFormatClass Hadoop Input or Output format class.
* @param newFormatProviderClass Hadoop Input or Output format provider class.
*/
public static <K, V> Plugin<K, V> createBatch(
Class<?> newPluginClass, Class<?> newFormatClass, Class<?> newFormatProviderClass) {
return Plugin.<K, V>builder()
.setPluginClass(newPluginClass)
.setFormatClass(newFormatClass)
.setFormatProviderClass(newFormatProviderClass)
.setPluginType(Plugin.initPluginType(newPluginClass))
.setContext(Plugin.initContext(newPluginClass))
.build();
}
/**
* Creates a streaming plugin instance.
*
* @param newPluginClass class of the CDAP plugin {@link io.cdap.cdap.api.plugin.Plugin}.
* @param getOffsetFn {@link SerializableFunction} that defines how to get record offset for CDAP
* {@link io.cdap.cdap.api.annotation.Plugin} class.
* @param receiverClass Spark {@link Receiver} class for a CDAP plugin.
* @param getReceiverArgsFromConfigFn {@link SerializableFunction} that defines how to get
* constructor arguments for {@link Receiver} using {@link PluginConfig}.
*/
public static <K, V> Plugin<K, V> createStreaming(
Class<?> newPluginClass,
SerializableFunction<V, Long> getOffsetFn,
Class<? extends Receiver<V>> receiverClass,
SerializableFunction<PluginConfig, Object[]> getReceiverArgsFromConfigFn) {
return Plugin.<K, V>builder()
.setPluginClass(newPluginClass)
.setPluginType(Plugin.initPluginType(newPluginClass))
.setContext(Plugin.initContext(newPluginClass))
.setGetOffsetFn(getOffsetFn)
.setReceiverClass(receiverClass)
.setGetReceiverArgsFromConfigFn(getReceiverArgsFromConfigFn)
.build();
}
/**
* Creates a streaming plugin instance with default function for getting args for {@link
* Receiver}.
*
* @param newPluginClass class of the CDAP plugin {@link io.cdap.cdap.api.plugin.Plugin}.
* @param getOffsetFn {@link SerializableFunction} that defines how to get record offset for CDAP
* {@link io.cdap.cdap.api.annotation.Plugin} class.
* @param receiverClass Spark {@link Receiver} class for a CDAP plugin.
*/
public static <K, V> Plugin<K, V> createStreaming(
Class<?> newPluginClass,
SerializableFunction<V, Long> getOffsetFn,
Class<? extends Receiver<V>> receiverClass) {
return createStreaming(
newPluginClass, getOffsetFn, receiverClass, config -> new Object[] {config});
}
/** Creates a plugin builder instance. */
public static <K, V> Builder<K, V> builder() {
return new AutoValue_Plugin.Builder<>();
}
/** Builder class for a {@link Plugin}. */
@AutoValue.Builder
public abstract static class Builder<K, V> {
public abstract Builder<K, V> setPluginClass(Class<?> newPluginClass);
public abstract Builder<K, V> setFormatClass(Class<?> newFormatClass);
public abstract Builder<K, V> setFormatProviderClass(Class<?> newFormatProviderClass);
public abstract Builder<K, V> setGetOffsetFn(SerializableFunction<V, Long> getOffsetFn);
public abstract Builder<K, V> setGetReceiverArgsFromConfigFn(
SerializableFunction<PluginConfig, Object[]> getReceiverArgsFromConfigFn);
public abstract Builder<K, V> setReceiverClass(Class<? extends Receiver<V>> receiverClass);
public abstract Builder<K, V> setPluginType(PluginConstants.PluginType newPluginType);
public abstract Builder<K, V> setContext(BatchContextImpl context);
public abstract Plugin<K, V> build();
}
}