blob: 88ed2f321b0c6f6a8a6fc47514a0953c3179890d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.cdap;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import com.google.auto.value.AutoValue;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.SubmitterLifecycle;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.cdap.context.BatchContextImpl;
import org.apache.beam.sdk.io.cdap.context.BatchSinkContextImpl;
import org.apache.beam.sdk.io.cdap.context.BatchSourceContextImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Class wrapper for a CDAP plugin.
 *
 * <p>The immutable part of the wrapper (plugin class, format classes, plugin type and context) is
 * generated by AutoValue. The plugin config, the Hadoop configuration and the instantiated CDAP
 * plugin object are held in mutable fields that are populated through {@link
 * #withConfig(PluginConfig)}, the {@code withHadoopConfiguration} overloads and {@link
 * #prepareRun()}.
 */
@AutoValue
@SuppressWarnings({"rawtypes", "unchecked"})
public abstract class Plugin {

  private static final Logger LOG = LoggerFactory.getLogger(Plugin.class);
  private static final String PREPARE_RUN_METHOD_NAME = "prepareRun";

  protected @Nullable PluginConfig pluginConfig;
  protected @Nullable Configuration hadoopConfiguration;
  protected @Nullable SubmitterLifecycle cdapPluginObj;

  /** Gets the context of a plugin. */
  public abstract BatchContextImpl getContext();

  /** Gets the main class of a plugin. */
  public abstract Class<?> getPluginClass();

  /** Gets InputFormat or OutputFormat class for a plugin. */
  public abstract Class<?> getFormatClass();

  /** Gets InputFormatProvider or OutputFormatProvider class for a plugin. */
  public abstract Class<?> getFormatProviderClass();

  /**
   * Sets a plugin config.
   *
   * @param pluginConfig config that {@link #prepareRun()} passes to the plugin constructor
   * @return this instance, to allow call chaining
   */
  public Plugin withConfig(PluginConfig pluginConfig) {
    this.pluginConfig = pluginConfig;
    return this;
  }

  /** Gets a plugin config, or {@code null} if none has been set yet. */
  public @Nullable PluginConfig getPluginConfig() {
    return pluginConfig;
  }

  /**
   * Calls {@link SubmitterLifecycle#prepareRun(Object)} on the wrapped CDAP plugin object, passing
   * the plugin context as a parameter. This method is needed for validating connection to the CDAP
   * sink/source and performing initial tuning.
   *
   * <p>A plugin config must have been set with {@link #withConfig(PluginConfig)} beforehand. If
   * the CDAP plugin object does not exist yet, it is created reflectively through the constructor
   * of {@link #getPluginClass()} that is declared with the exact runtime class of the config.
   *
   * <p>After {@code prepareRun} returns, the input format configuration (for a source) or the
   * output format configuration (otherwise) exposed by the context is copied into the Hadoop
   * configuration. For non-source plugins {@link MRJobConfig#ID} is additionally set to {@code
   * "1"}.
   *
   * @throws IllegalStateException if the CDAP plugin can not be instantiated, or if {@code
   *     prepareRun} or the configuration copy fails; the original exception is attached as the
   *     cause
   */
  public void prepareRun() {
    PluginConfig config = getPluginConfig();
    checkStateNotNull(config, "PluginConfig should be not null!");

    SubmitterLifecycle plugin = cdapPluginObj;
    if (plugin == null) {
      try {
        Constructor<?> constructor = getPluginClass().getDeclaredConstructor(config.getClass());
        // The plugin constructor is not required to be public.
        constructor.setAccessible(true);
        plugin = (SubmitterLifecycle) constructor.newInstance(config);
      } catch (Exception e) {
        LOG.error("Can not instantiate CDAP plugin class", e);
        throw new IllegalStateException("Can not call prepareRun", e);
      }
      cdapPluginObj = plugin;
    }

    try {
      plugin.prepareRun(getContext());
      if (getPluginType() == PluginConstants.PluginType.SOURCE) {
        copyToHadoopConfiguration(
            getContext().getInputFormatProvider().getInputFormatConfiguration());
      } else {
        copyToHadoopConfiguration(
            getContext().getOutputFormatProvider().getOutputFormatConfiguration());
        // NOTE(review): presumably the output format needs some job id to be present; confirm.
        getHadoopConfiguration().set(MRJobConfig.ID, String.valueOf(1));
      }
    } catch (Exception e) {
      LOG.error("Error while prepareRun", e);
      throw new IllegalStateException("Error while prepareRun", e);
    }
  }

  /** Copies every entry of the given format configuration into the Hadoop configuration. */
  private void copyToHadoopConfiguration(Map<String, String> formatConfiguration) {
    Configuration configuration = getHadoopConfiguration();
    for (Map.Entry<String, String> entry : formatConfiguration.entrySet()) {
      configuration.set(entry.getKey(), entry.getValue());
    }
  }

  /**
   * Registers the format, key and value classes in the plugin Hadoop configuration.
   *
   * <p>The configuration property names are taken from the {@link PluginConstants.Hadoop} constant
   * that corresponds to the plugin type; the format class is the one returned by {@link
   * #getFormatClass()}.
   *
   * @param formatKeyClass class of the keys handled by the format
   * @param formatValueClass class of the values handled by the format
   * @return this instance, to allow call chaining
   */
  public Plugin withHadoopConfiguration(Class<?> formatKeyClass, Class<?> formatValueClass) {
    PluginConstants.Format formatType = getFormatType();
    PluginConstants.Hadoop hadoopType = getHadoopType();
    Configuration configuration = getHadoopConfiguration();

    configuration.setClass(
        hadoopType.getFormatClass(), getFormatClass(), formatType.getFormatClass());
    configuration.setClass(hadoopType.getKeyClass(), formatKeyClass, Object.class);
    configuration.setClass(hadoopType.getValueClass(), formatValueClass, Object.class);
    return this;
  }

  /**
   * Replaces the plugin Hadoop configuration with the given one.
   *
   * @param hadoopConfiguration configuration to use from now on
   * @return this instance, to allow call chaining
   */
  public Plugin withHadoopConfiguration(Configuration hadoopConfiguration) {
    this.hadoopConfiguration = hadoopConfiguration;
    return this;
  }

  /**
   * Gets a plugin Hadoop configuration. If none has been set yet, a new {@code
   * Configuration(false)} is created, stored and returned.
   */
  public Configuration getHadoopConfiguration() {
    if (hadoopConfiguration == null) {
      hadoopConfiguration = new Configuration(false);
    }
    return hadoopConfiguration;
  }

  /** Gets a plugin type. */
  public abstract PluginConstants.PluginType getPluginType();

  /** Gets a format type: input for a source plugin, output otherwise. */
  private PluginConstants.Format getFormatType() {
    return getPluginType() == PluginConstants.PluginType.SOURCE
        ? PluginConstants.Format.INPUT
        : PluginConstants.Format.OUTPUT;
  }

  /** Gets a Hadoop type: source for a source plugin, sink otherwise. */
  private PluginConstants.Hadoop getHadoopType() {
    return getPluginType() == PluginConstants.PluginType.SOURCE
        ? PluginConstants.Hadoop.SOURCE
        : PluginConstants.Hadoop.SINK;
  }

  /**
   * Gets value of a plugin type.
   *
   * @param pluginClass CDAP plugin class to inspect
   * @return {@code SOURCE} if the class is assignable to {@link BatchSource}, {@code SINK} if it
   *     is assignable to {@link BatchSink}
   * @throws IllegalArgumentException if the class is neither a batch source nor a batch sink
   */
  public static PluginConstants.PluginType initPluginType(Class<?> pluginClass)
      throws IllegalArgumentException {
    if (BatchSource.class.isAssignableFrom(pluginClass)) {
      return PluginConstants.PluginType.SOURCE;
    } else if (BatchSink.class.isAssignableFrom(pluginClass)) {
      return PluginConstants.PluginType.SINK;
    } else {
      throw new IllegalArgumentException("Provided class should be source or sink plugin");
    }
  }

  /**
   * Creates the context that matches the context type accepted by the plugin's {@code prepareRun}
   * method.
   *
   * <p>The methods declared by the plugin class are inspected first, followed by the methods
   * declared by each of its superclasses, nearest first. The first {@code prepareRun} method whose
   * first parameter is exactly {@link BatchSourceContext} or {@link BatchSinkContext} decides the
   * result; {@code prepareRun} methods without parameters or with another first parameter type are
   * skipped.
   *
   * @param cdapPluginClass CDAP plugin class to inspect
   * @return a new {@link BatchSourceContextImpl} or {@link BatchSinkContextImpl}
   * @throws IllegalStateException if no suitable {@code prepareRun} method is found
   */
  public static BatchContextImpl initContext(Class<?> cdapPluginClass) {
    // Collect candidates from the whole class hierarchy, most specific class first.
    List<Method> methods = new ArrayList<>();
    for (Class<?> current = cdapPluginClass; current != null; current = current.getSuperclass()) {
      methods.addAll(Arrays.asList(current.getDeclaredMethods()));
    }

    for (Method method : methods) {
      if (!method.getName().equals(PREPARE_RUN_METHOD_NAME)) {
        continue;
      }
      Class<?>[] parameterTypes = method.getParameterTypes();
      if (parameterTypes.length == 0) {
        // A parameterless overload carries no context type information.
        continue;
      }
      Class<?> contextClass = parameterTypes[0];
      if (contextClass.equals(BatchSourceContext.class)) {
        return new BatchSourceContextImpl();
      } else if (contextClass.equals(BatchSinkContext.class)) {
        return new BatchSinkContextImpl();
      }
    }
    throw new IllegalStateException("Cannot determine context class");
  }

  /**
   * Tells whether the plugin is unbounded, i.e. whether the type declared in the CDAP {@code
   * Plugin} annotation of the plugin class starts with {@code "streaming"}.
   *
   * @return {@code true} if the annotated plugin type starts with {@code "streaming"}, {@code
   *     false} otherwise
   * @throws IllegalArgumentException if the plugin class does not declare the CDAP {@code Plugin}
   *     annotation
   */
  public Boolean isUnbounded() {
    for (Annotation annotation : getPluginClass().getDeclaredAnnotations()) {
      if (annotation.annotationType().equals(io.cdap.cdap.api.annotation.Plugin.class)) {
        String pluginType = ((io.cdap.cdap.api.annotation.Plugin) annotation).type();
        return pluginType != null && pluginType.startsWith("streaming");
      }
    }
    throw new IllegalArgumentException("CDAP plugin class must have Plugin annotation!");
  }

  /**
   * Creates a plugin instance.
   *
   * <p>The plugin type and the context are derived from the plugin class with {@link
   * #initPluginType(Class)} and {@link #initContext(Class)}.
   *
   * @param newPluginClass main class of the CDAP plugin
   * @param newFormatClass InputFormat or OutputFormat class of the plugin
   * @param newFormatProviderClass InputFormatProvider or OutputFormatProvider class of the plugin
   */
  public static Plugin create(
      Class<?> newPluginClass, Class<?> newFormatClass, Class<?> newFormatProviderClass) {
    return builder()
        .setPluginClass(newPluginClass)
        .setFormatClass(newFormatClass)
        .setFormatProviderClass(newFormatProviderClass)
        .setPluginType(Plugin.initPluginType(newPluginClass))
        .setContext(Plugin.initContext(newPluginClass))
        .build();
  }

  /** Creates a plugin builder instance. */
  public static Builder builder() {
    return new AutoValue_Plugin.Builder();
  }

  /** Builder class for a {@link Plugin}. */
  @AutoValue.Builder
  public abstract static class Builder {

    /** Sets the main class of the plugin. */
    public abstract Builder setPluginClass(Class<?> newPluginClass);

    /** Sets the InputFormat or OutputFormat class of the plugin. */
    public abstract Builder setFormatClass(Class<?> newFormatClass);

    /** Sets the InputFormatProvider or OutputFormatProvider class of the plugin. */
    public abstract Builder setFormatProviderClass(Class<?> newFormatProviderClass);

    /** Sets the plugin type. */
    public abstract Builder setPluginType(PluginConstants.PluginType newPluginType);

    /** Sets the plugin context. */
    public abstract Builder setContext(BatchContextImpl context);

    /** Builds the {@link Plugin}. */
    public abstract Plugin build();
  }
}