/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.cdap;
import static org.apache.beam.sdk.io.cdap.MappingUtils.getPluginByClass;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import com.google.auto.value.AutoValue;
import io.cdap.cdap.api.plugin.PluginConfig;
import java.util.Map;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.io.sparkreceiver.ReceiverBuilder;
import org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.spark.streaming.receiver.Receiver;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
 * A {@link CdapIO} is a transform for reading data from a source or writing data to a sink of a
 * Cdap Plugin. It uses {@link HadoopFormatIO} for batch plugins and {@link SparkReceiverIO} for
 * streaming plugins.
*
* <h2>Read from Cdap Plugin Bounded Source</h2>
*
 * <p>To configure a {@link CdapIO} source, you must specify a Cdap {@link Plugin}, a Cdap {@link
 * PluginConfig}, and the key and value classes.
*
 * <p>{@link Plugin} is the wrapper class for a Cdap Plugin. It contains the main information about
 * the Plugin. An object of the {@link Plugin} class can be created with the {@link
 * Plugin#createBatch(Class, Class, Class)} method, which requires the following parameters:
*
* <ul>
* <li>{@link io.cdap.cdap.etl.api.batch.BatchSource} class
* <li>{@link InputFormat} class
* <li>{@link io.cdap.cdap.api.data.batch.InputFormatProvider} class
* </ul>
*
* <p>For more information about the InputFormat and InputFormatProvider, see {@link
* HadoopFormatIO}.
*
 * <p>Every Cdap Plugin has its own {@link PluginConfig} class with the fields necessary to
 * configure the Plugin. You can set a {@link Map} of your parameters with the {@link
 * ConfigWrapper#withParams(Map)} method, where the key is the field name.
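 *
 * <p>For illustration only, a parameters map for a hypothetical {@code EmployeeConfig} might be
 * built as follows (the field names and values here are placeholders, not a real plugin contract):
 *
 * <pre>{@code
 * Map<String, Object> employeeParamsMap = new HashMap<>();
 * employeeParamsMap.put("objectType", "employee");        // hypothetical EmployeeConfig field
 * employeeParamsMap.put("referenceName", "employee-ref"); // hypothetical EmployeeConfig field
 *
 * EmployeeConfig pluginConfig =
 *     new ConfigWrapper<>(EmployeeConfig.class).withParams(employeeParamsMap).build();
 * }</pre>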
*
* <p>For example, to create a basic {@link CdapIO#read()} transform:
*
* <pre>{@code
* Pipeline p = ...; // Create pipeline.
*
* // Create PluginConfig for specific plugin
* EmployeeConfig pluginConfig =
* new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();
*
* // Read using CDAP batch plugin
* p.apply("ReadBatch",
* CdapIO.<String, String>read()
* .withCdapPlugin(
* Plugin.createBatch(
* EmployeeBatchSource.class,
* EmployeeInputFormat.class,
* EmployeeInputFormatProvider.class))
* .withPluginConfig(pluginConfig)
* .withKeyClass(String.class)
* .withValueClass(String.class));
* }</pre>
*
* <h2>Write to Cdap Plugin Bounded Sink</h2>
*
 * <p>To configure a {@link CdapIO} sink, just as for {@link CdapIO#read()}, the Cdap {@link
 * Plugin}, the Cdap {@link PluginConfig}, and the key and value classes must be specified. In
 * addition, a locks directory path must be set with {@link CdapIO.Write#withLocksDirPath(String)}.
 * It is used for the {@link HDFSSynchronization} configuration of {@link HadoopFormatIO}. More
 * info can be found in the {@link HadoopFormatIO} documentation.
*
 * <p>To create an object of the {@link Plugin} class with the {@link Plugin#createBatch(Class,
 * Class, Class)} method, you need to specify the following parameters:
*
* <ul>
* <li>{@link io.cdap.cdap.etl.api.batch.BatchSink} class
* <li>{@link OutputFormat} class
* <li>{@link io.cdap.cdap.api.data.batch.OutputFormatProvider} class
* </ul>
*
* <p>For more information about the OutputFormat and OutputFormatProvider, see {@link
* HadoopFormatIO}.
*
* <p>Example of {@link CdapIO#write()} usage:
*
* <pre>{@code
* Pipeline p = ...; // Create pipeline.
*
* // Get or create data to write
* PCollection<KV<String, String>> input = p.apply(Create.of(data));
*
* // Create PluginConfig for specific plugin
* EmployeeConfig pluginConfig =
* new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();
*
* // Write using CDAP batch plugin
* input.apply(
* "WriteBatch",
* CdapIO.<String, String>write()
* .withCdapPlugin(
* Plugin.createBatch(
* EmployeeBatchSink.class,
* EmployeeOutputFormat.class,
* EmployeeOutputFormatProvider.class))
* .withPluginConfig(pluginConfig)
* .withKeyClass(String.class)
* .withValueClass(String.class)
* .withLocksDirPath(tmpFolder.getRoot().getAbsolutePath()));
* p.run();
* }</pre>
*
* <h2>Read from Cdap Plugin Streaming Source</h2>
*
 * <p>To configure a {@link CdapIO} source, you must specify a Cdap {@link Plugin}, a Cdap {@link
 * PluginConfig}, and the key and value classes.
*
 * <p>Optionally, you can pass {@code pullFrequencySec}, which is the delay in seconds between
 * polls for new records; {@code startOffset}, which is the inclusive start offset from which
 * reading should start; and {@code startPollTimeoutSec}, which is the delay in seconds before
 * polling starts.
*
 * <p>{@link Plugin} is the wrapper class for a Cdap Plugin. It contains the main information about
 * the Plugin. An object of the {@link Plugin} class can be created with the {@link
 * Plugin#createStreaming(Class, SerializableFunction, Class)} method, which requires the following
 * parameters: a {@link io.cdap.cdap.etl.api.streaming.StreamingSource} class, a {@code
 * getOffsetFn}, which is a {@link SerializableFunction} that defines how to get a {@code Long}
 * offset from a {@code V} record, and a Spark {@link Receiver} class.
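 *
 * <p>As a sketch, if the value type carried its own offset, a hypothetical offset function could
 * look like this ({@code EmployeeRecord} and {@code getOffset()} are assumptions for illustration):
 *
 * <pre>{@code
 * SerializableFunction<EmployeeRecord, Long> getOffsetFn = record -> record.getOffset();
 * }</pre>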
*
 * <p>Every Cdap Plugin has its own {@link PluginConfig} class with the fields necessary to
 * configure the Plugin. You can set a {@link Map} of your parameters with the {@link
 * ConfigWrapper#withParams(Map)} method, where the key is the field name.
*
* <p>For example, to create a basic {@link CdapIO#read()} transform:
*
* <pre>{@code
* Pipeline p = ...; // Create pipeline.
*
* // Create PluginConfig for specific plugin
* EmployeeConfig pluginConfig =
* new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();
*
* // Read using CDAP streaming plugin
* p.apply("ReadStreaming",
* CdapIO.<String, String>read()
* .withCdapPlugin(
* Plugin.createStreaming(
* EmployeeStreamingSource.class,
* Long::valueOf,
* EmployeeReceiver.class))
* .withPluginConfig(pluginConfig)
* .withKeyClass(String.class)
* .withValueClass(String.class)
* .withPullFrequencySec(1L)
* .withStartPollTimeoutSec(2L)
* .withStartOffset(10L);
* }</pre>
*/
public class CdapIO {
public static <K, V> Read<K, V> read() {
return new AutoValue_CdapIO_Read.Builder<K, V>().build();
}
public static <K, V> Write<K, V> write() {
return new AutoValue_CdapIO_Write.Builder<K, V>().build();
}
/** A {@link PTransform} to read from CDAP source. */
@AutoValue
@AutoValue.CopyAnnotations
public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
abstract @Nullable PluginConfig getPluginConfig();
abstract @Nullable Plugin<K, V> getCdapPlugin();
/**
     * Depending on the selected {@link HadoopFormatIO} type ({@link InputFormat} or {@link
     * OutputFormat}), the appropriate key class ("key.class") must be provided in the Hadoop
     * {@link Configuration}. If you set a key class different from the Format's actual key class,
     * it may result in an error. More info can be found in the {@link HadoopFormatIO}
     * documentation.
*/
abstract @Nullable Class<K> getKeyClass();
/**
     * Depending on the selected {@link HadoopFormatIO} type ({@link InputFormat} or {@link
     * OutputFormat}), the appropriate value class ("value.class") must be provided in the Hadoop
     * {@link Configuration}. If you set a value class different from the Format's actual value
     * class, it may result in an error. More info can be found in the {@link HadoopFormatIO}
     * documentation.
*/
abstract @Nullable Class<V> getValueClass();
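    /** Delay in seconds between polls for new records. Applicable only for streaming plugins. */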
abstract @Nullable Long getPullFrequencySec();
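    /** Delay in seconds before polling starts. Applicable only for streaming plugins. */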
abstract @Nullable Long getStartPollTimeoutSec();
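    /** Inclusive start offset from which reading starts. Applicable only for streaming plugins. */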
abstract @Nullable Long getStartOffset();
abstract Builder<K, V> toBuilder();
@AutoValue.Builder
abstract static class Builder<K, V> {
abstract Builder<K, V> setPluginConfig(PluginConfig config);
abstract Builder<K, V> setCdapPlugin(Plugin<K, V> plugin);
abstract Builder<K, V> setKeyClass(Class<K> keyClass);
abstract Builder<K, V> setValueClass(Class<V> valueClass);
abstract Builder<K, V> setPullFrequencySec(Long pullFrequencySec);
abstract Builder<K, V> setStartPollTimeoutSec(Long startPollTimeoutSec);
abstract Builder<K, V> setStartOffset(Long startOffset);
abstract Read<K, V> build();
}
/** Sets a CDAP {@link Plugin}. */
public Read<K, V> withCdapPlugin(Plugin<K, V> plugin) {
checkArgument(plugin != null, "Cdap plugin can not be null");
return toBuilder().setCdapPlugin(plugin).build();
}
/** Sets a CDAP Plugin class. */
public Read<K, V> withCdapPluginClass(Class<?> cdapPluginClass) {
checkArgument(cdapPluginClass != null, "Cdap plugin class can not be null");
Plugin<K, V> plugin = MappingUtils.getPluginByClass(cdapPluginClass);
return toBuilder().setCdapPlugin(plugin).build();
}
/** Sets a {@link PluginConfig}. */
public Read<K, V> withPluginConfig(PluginConfig pluginConfig) {
checkArgument(pluginConfig != null, "Plugin config can not be null");
return toBuilder().setPluginConfig(pluginConfig).build();
}
/** Sets a key class. */
public Read<K, V> withKeyClass(Class<K> keyClass) {
checkArgument(keyClass != null, "Key class can not be null");
return toBuilder().setKeyClass(keyClass).build();
}
/** Sets a value class. */
public Read<K, V> withValueClass(Class<V> valueClass) {
checkArgument(valueClass != null, "Value class can not be null");
return toBuilder().setValueClass(valueClass).build();
}
/**
     * Delay in seconds between polls for new records. Applicable only for streaming Cdap Plugins.
*/
public Read<K, V> withPullFrequencySec(Long pullFrequencySec) {
checkArgument(pullFrequencySec != null, "Pull frequency can not be null");
return toBuilder().setPullFrequencySec(pullFrequencySec).build();
}
    /** Delay in seconds before polling starts. Applicable only for streaming Cdap Plugins. */
public Read<K, V> withStartPollTimeoutSec(Long startPollTimeoutSec) {
checkArgument(startPollTimeoutSec != null, "Start poll timeout can not be null");
return toBuilder().setStartPollTimeoutSec(startPollTimeoutSec).build();
}
/**
     * Inclusive start offset from which reading should start. Applicable only for streaming Cdap
     * Plugins.
*/
public Read<K, V> withStartOffset(Long startOffset) {
checkArgument(startOffset != null, "Start offset can not be null");
return toBuilder().setStartOffset(startOffset).build();
}
@Override
public PCollection<KV<K, V>> expand(PBegin input) {
Plugin<K, V> cdapPlugin = getCdapPlugin();
checkStateNotNull(cdapPlugin, "withCdapPluginClass() is required");
PluginConfig pluginConfig = getPluginConfig();
checkStateNotNull(pluginConfig, "withPluginConfig() is required");
Class<V> valueClass = getValueClass();
checkStateNotNull(valueClass, "withValueClass() is required");
Class<K> keyClass = getKeyClass();
checkStateNotNull(keyClass, "withKeyClass() is required");
cdapPlugin.withConfig(pluginConfig);
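      // Streaming (unbounded) plugins are read via SparkReceiverIO; batch plugins via HadoopFormatIO.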
if (cdapPlugin.isUnbounded()) {
SerializableFunction<V, Long> getOffsetFn = cdapPlugin.getGetOffsetFn();
checkStateNotNull(getOffsetFn, "Plugin get offset function can't be null!");
ReceiverBuilder<V, ? extends Receiver<V>> receiverBuilder = cdapPlugin.getReceiverBuilder();
checkStateNotNull(receiverBuilder, "Plugin Receiver builder can't be null!");
SparkReceiverIO.Read<V> reader =
SparkReceiverIO.<V>read()
.withGetOffsetFn(getOffsetFn)
.withSparkReceiverBuilder(receiverBuilder);
Long pullFrequencySec = getPullFrequencySec();
if (pullFrequencySec != null) {
reader = reader.withPullFrequencySec(pullFrequencySec);
}
Long startPollTimeoutSec = getStartPollTimeoutSec();
if (startPollTimeoutSec != null) {
reader = reader.withStartPollTimeoutSec(startPollTimeoutSec);
}
Long startOffset = getStartOffset();
if (startOffset != null) {
reader = reader.withStartOffset(startOffset);
}
try {
Coder<V> coder = input.getPipeline().getCoderRegistry().getCoder(valueClass);
PCollection<V> values = input.apply(reader).setCoder(coder);
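        // Streaming reads produce values only, so each value is wrapped into a KV with a null key.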
SerializableFunction<V, KV<K, V>> fn = input1 -> KV.of(null, input1);
return values.apply(MapElements.into(new TypeDescriptor<KV<K, V>>() {}).via(fn));
} catch (CannotProvideCoderException e) {
throw new IllegalStateException("Could not get value Coder", e);
}
} else {
cdapPlugin.withHadoopConfiguration(keyClass, valueClass).prepareRun();
Configuration hConf = cdapPlugin.getHadoopConfiguration();
HadoopFormatIO.Read<K, V> readFromHadoop =
HadoopFormatIO.<K, V>read().withConfiguration(hConf);
return input.apply(readFromHadoop);
}
}
}
/** A {@link PTransform} to write to CDAP sink. */
@AutoValue
@AutoValue.CopyAnnotations
public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
abstract @Nullable PluginConfig getPluginConfig();
abstract @Nullable Plugin<K, V> getCdapPlugin();
/**
     * Depending on the selected {@link HadoopFormatIO} type ({@link InputFormat} or {@link
     * OutputFormat}), the appropriate key class ("key.class") must be provided in the Hadoop
     * {@link Configuration}. If you set a key class different from the Format's actual key class,
     * it may result in an error. More info can be found in the {@link HadoopFormatIO}
     * documentation.
*/
abstract @Nullable Class<K> getKeyClass();
/**
     * Depending on the selected {@link HadoopFormatIO} type ({@link InputFormat} or {@link
     * OutputFormat}), the appropriate value class ("value.class") must be provided in the Hadoop
     * {@link Configuration}. If you set a value class different from the Format's actual value
     * class, it may result in an error. More info can be found in the {@link HadoopFormatIO}
     * documentation.
*/
abstract @Nullable Class<V> getValueClass();
/**
     * Directory where locks will be stored. This directory MUST be different from the directory
     * that may be stored under the FileOutputFormat.outputDir key. Used for the {@link
     * HDFSSynchronization} configuration of {@link HadoopFormatIO}. More info can be found in the
     * {@link HadoopFormatIO} documentation.
*/
abstract @Nullable String getLocksDirPath();
abstract Builder<K, V> toBuilder();
@AutoValue.Builder
abstract static class Builder<K, V> {
abstract Builder<K, V> setPluginConfig(PluginConfig config);
abstract Builder<K, V> setCdapPlugin(Plugin<K, V> plugin);
abstract Builder<K, V> setKeyClass(Class<K> keyClass);
abstract Builder<K, V> setValueClass(Class<V> valueClass);
abstract Builder<K, V> setLocksDirPath(String path);
abstract Write<K, V> build();
}
/** Sets a CDAP {@link Plugin}. */
public Write<K, V> withCdapPlugin(Plugin<K, V> plugin) {
checkArgument(plugin != null, "Cdap plugin can not be null");
return toBuilder().setCdapPlugin(plugin).build();
}
/** Sets a CDAP Plugin class. */
public Write<K, V> withCdapPluginClass(Class<?> cdapPluginClass) {
checkArgument(cdapPluginClass != null, "Cdap plugin class can not be null");
Plugin<K, V> plugin = getPluginByClass(cdapPluginClass);
return toBuilder().setCdapPlugin(plugin).build();
}
/** Sets a {@link PluginConfig}. */
public Write<K, V> withPluginConfig(PluginConfig pluginConfig) {
checkArgument(pluginConfig != null, "Plugin config can not be null");
return toBuilder().setPluginConfig(pluginConfig).build();
}
/** Sets a key class. */
public Write<K, V> withKeyClass(Class<K> keyClass) {
checkArgument(keyClass != null, "Key class can not be null");
return toBuilder().setKeyClass(keyClass).build();
}
/** Sets path to directory where locks will be stored. */
public Write<K, V> withLocksDirPath(String locksDirPath) {
checkArgument(locksDirPath != null, "Locks dir path can not be null");
return toBuilder().setLocksDirPath(locksDirPath).build();
}
/** Sets a value class. */
public Write<K, V> withValueClass(Class<V> valueClass) {
checkArgument(valueClass != null, "Value class can not be null");
return toBuilder().setValueClass(valueClass).build();
}
@Override
public PDone expand(PCollection<KV<K, V>> input) {
Plugin<K, V> cdapPlugin = getCdapPlugin();
checkStateNotNull(cdapPlugin, "withCdapPluginClass() is required");
PluginConfig pluginConfig = getPluginConfig();
checkStateNotNull(pluginConfig, "withPluginConfig() is required");
Class<K> keyClass = getKeyClass();
checkStateNotNull(keyClass, "withKeyClass() is required");
Class<V> valueClass = getValueClass();
checkStateNotNull(valueClass, "withValueClass() is required");
String locksDirPath = getLocksDirPath();
checkStateNotNull(locksDirPath, "withLocksDirPath() is required");
cdapPlugin
.withConfig(pluginConfig)
.withHadoopConfiguration(keyClass, valueClass)
.prepareRun();
if (cdapPlugin.isUnbounded()) {
// TODO: implement SparkReceiverIO.<~>write()
throw new NotImplementedException("Support for unbounded plugins is not implemented!");
} else {
Configuration hConf = cdapPlugin.getHadoopConfiguration();
HadoopFormatIO.Write<K, V> writeHadoop;
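        // For unbounded or non-globally-windowed input, the Hadoop Configuration is supplied via a
        // configuration transform; otherwise it is passed directly and partitioning is enabled.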
if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)
|| !input.getWindowingStrategy().equals(WindowingStrategy.globalDefault())) {
ConfigTransform<K, V> configTransform = new ConfigTransform<>(hConf);
writeHadoop =
HadoopFormatIO.<K, V>write()
.withConfigurationTransform(configTransform)
.withExternalSynchronization(new HDFSSynchronization(locksDirPath));
} else {
writeHadoop =
HadoopFormatIO.<K, V>write()
.withConfiguration(hConf)
.withPartitioning()
.withExternalSynchronization(new HDFSSynchronization(locksDirPath));
}
return input.apply(writeHadoop);
}
}
/** Simple transform for providing Hadoop {@link Configuration} into {@link HadoopFormatIO}. */
private static class ConfigTransform<KeyT, ValueT>
extends PTransform<
PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>> {
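      // Kept transient so the Configuration is not serialized with the transform; it is provided
      // to HadoopFormatIO as a singleton PCollectionView instead.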
private final transient Configuration hConf;
private ConfigTransform(Configuration hConf) {
this.hConf = hConf;
}
@Override
public PCollectionView<Configuration> expand(PCollection<? extends KV<KeyT, ValueT>> input) {
return input
.getPipeline()
.apply(Create.<Configuration>of(hConf))
.apply(View.<Configuration>asSingleton().withDefaultValue(hConf));
}
}
}
}