blob: 30954216a480ecd831263aa18258eb9c4502eab1 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import static java.util.Objects.requireNonNull;
import static;
import static;
import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* A {@link HadoopFormatIO} is a Transform for reading data from any source or writing data to any
* sink which implements Hadoop {@link InputFormat} or {@link OutputFormat}. For example: Cassandra,
* Elasticsearch, HBase, Redis, Postgres etc. {@link HadoopFormatIO} has to make several performance
* trade-offs in connecting to {@link InputFormat} or {@link OutputFormat}, so if there is another
* Beam IO Transform specifically for connecting to your data source of choice, we would recommend
* using that one, but this IO Transform allows you to connect to many data sources/sinks that do
* not yet have a Beam IO Transform.
* <h3>Reading using Hadoop {@link HadoopFormatIO}</h3>
* <p>You will need to pass a Hadoop {@link Configuration} with parameters specifying how the read
* will occur. Many properties of the Configuration are optional, and some are required for certain
* {@link InputFormat} classes, but the following properties must be set for all InputFormats:
* <ul>
* <li>{@code mapreduce.job.inputformat.class}: The {@link InputFormat} class used to connect to
* your data source of choice.
* <li>{@code key.class}: The key class returned by the {@link InputFormat} in {@code
* mapreduce.job.inputformat.class}.
* <li>{@code value.class}: The value class returned by the {@link InputFormat} in {@code
* mapreduce.job.inputformat.class}.
* </ul>
* For example:
* <pre>
* {
* Configuration myHadoopConfiguration = new Configuration(false);
* // Set Hadoop InputFormat, key and value class in configuration
* myHadoopConfiguration.setClass(&quot;mapreduce.job.inputformat.class&quot;,
* MyDbInputFormatClass, InputFormat.class);
* myHadoopConfiguration.setClass(&quot;key.class&quot;, MyDbInputFormatKeyClass, Object.class);
* myHadoopConfiguration.setClass(&quot;value.class&quot;,
* MyDbInputFormatValueClass, Object.class);
* }
* </pre>
* <p>You will need to check to see if the key and value classes output by the {@link InputFormat}
* have a Beam {@link Coder} available. If not, you can use withKeyTranslation/withValueTranslation
* to specify a method transforming instances of those classes into another class that is supported
* by a Beam {@link Coder}. These settings are optional and you don't need to specify translation
* for both key and value. If you specify a translation, you will need to make sure the K or V of
* the read transform match the output type of the translation.
* <p>You will need to set appropriate InputFormat key and value class (i.e. "key.class" and
* "value.class") in Hadoop {@link Configuration}. If you set different InputFormat key or value
* class than InputFormat's actual key or value class then, it may result in an error like
* "unexpected extra bytes after decoding" while the decoding process of key/value object happens.
* Hence, it is important to set appropriate InputFormat key and value class.
* <pre>{@code
* Pipeline p = ...; // Create pipeline.
* // Read data only with Hadoop configuration.
* p.apply("read",
* HadoopFormatIO.<InputFormatKeyClass, InputFormatKeyClass>read()
* .withConfiguration(myHadoopConfiguration);
* }
* // Read data with configuration and key translation (Example scenario: Beam Coder is not
* available for key class hence key translation is required.).
* SimpleFunction&lt;InputFormatKeyClass, MyKeyClass&gt; myOutputKeyType =
* new SimpleFunction&lt;InputFormatKeyClass, MyKeyClass&gt;() {
* public MyKeyClass apply(InputFormatKeyClass input) {
* // ...logic to transform InputFormatKeyClass to MyKeyClass
* }
* };
* </pre>
* <pre>{@code
* p.apply("read",
* HadoopFormatIO.<MyKeyClass, InputFormatKeyClass>read()
* .withConfiguration(myHadoopConfiguration)
* .withKeyTranslation(myOutputKeyType);
* }</pre>
* <p>// Read data with configuration and value translation (Example scenario: Beam Coder is not
* available for value class hence value translation is required.).
* <pre>{@code
* SimpleFunction&lt;InputFormatValueClass, MyValueClass&gt; myOutputValueType =
* new SimpleFunction&lt;InputFormatValueClass, MyValueClass&gt;() {
* public MyValueClass apply(InputFormatValueClass input) {
* // ...logic to transform InputFormatValueClass to MyValueClass
* }
* };
* }</pre>
* <pre>{@code
* p.apply("read",
* HadoopFormatIO.<InputFormatKeyClass, MyValueClass>read()
* .withConfiguration(myHadoopConfiguration)
* .withValueTranslation(myOutputValueType);
* }</pre>
* <p>IMPORTANT! In case of using {@code DBInputFormat} to read data from RDBMS, Beam parallelizes
* the process by using LIMIT and OFFSET clauses of SQL query to fetch different ranges of records
* (as a split) by different workers. To guarantee the same order and proper split of results you
* need to order them by one or more keys (either PRIMARY or UNIQUE). It can be done during
* configuration step, for example:
* <pre>{@code
* Configuration conf = new Configuration();
* conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
* conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, "id", "name");
* conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, "id ASC");
* }</pre>
* <h3>Writing using Hadoop {@link HadoopFormatIO}</h3>
* <p>You will need to pass a Hadoop {@link Configuration} with parameters specifying how the write
* will occur. Many properties of the Configuration are optional, and some are required for certain
* {@link OutputFormat} classes, but the following properties must be set for all OutputFormats:
* <ul>
* <li>{@code}: The identifier of the write job. E.g.: end timestamp of window.
* <li>{@code mapreduce.job.outputformat.class}: The {@link OutputFormat} class used to connect to
* your data sink of choice.
* <li>{@code mapreduce.job.output.key.class}: The key class passed to the {@link OutputFormat} in
* {@code mapreduce.job.outputformat.class}.
* <li>{@code mapreduce.job.output.value.class}: The value class passed to the {@link
* OutputFormat} in {@code mapreduce.job.outputformat.class}.
* <li>{@code mapreduce.job.reduces}: Number of reduce tasks. Value is equal to number of write
* tasks which will be generated. This property is not required for {@link
* Write.PartitionedWriterBuilder#withoutPartitioning()} write.
* <li>{@code mapreduce.job.partitioner.class}: Hadoop partitioner class which will be used for
* distributing of records among partitions. This property is not required for {@link
* Write.PartitionedWriterBuilder#withoutPartitioning()} write.
* </ul>
* <b>Note:</b> All mentioned values have appropriate constants. E.g.: {@link
* <p>For example:
* <pre>{@code
* Configuration myHadoopConfiguration = new Configuration(false);
* // Set Hadoop OutputFormat, key and value class in configuration
* myHadoopConfiguration.setClass(&quot;mapreduce.job.outputformat.class&quot;,
* MyDbOutputFormatClass, OutputFormat.class);
* myHadoopConfiguration.setClass(&quot;mapreduce.job.output.key.class&quot;,
* MyDbOutputFormatKeyClass, Object.class);
* myHadoopConfiguration.setClass(&quot;mapreduce.job.output.value.class&quot;,
* MyDbOutputFormatValueClass, Object.class);
* myHadoopConfiguration.setClass(&quot;mapreduce.job.partitioner.class&quot;,
* MyPartitionerClass, Object.class);
* myHadoopConfiguration.setInt(&quot;mapreduce.job.reduces&quot;, 2);
* }</pre>
* <p>You will need to set OutputFormat key and value class (i.e. "mapreduce.job.output.key.class"
* and "mapreduce.job.output.value.class") in Hadoop {@link Configuration} which are equal to {@code
* KeyT} and {@code ValueT}. If you set different OutputFormat key or value class than
* OutputFormat's actual key or value class then, it will throw {@link IllegalArgumentException}
* <h4>Batch writing</h4>
* <pre>{@code
* //Data which will we want to write
* PCollection<KV<Text, LongWritable>> boundedWordsCount = ...
* //Hadoop configuration for write
* //We have partitioned write, so Partitioner and reducers count have to be set - see withPartitioning() javadoc
* Configuration myHadoopConfiguration = ...
* //path to directory with locks
* String locksDirPath = ...;
* boundedWordsCount.apply(
* "writeBatch",
* HadoopFormatIO.<Text, LongWritable>write()
* .withConfiguration(myHadoopConfiguration)
* .withPartitioning()
* .withExternalSynchronization(new HDFSSynchronization(locksDirPath)));
* }</pre>
* <h4>Stream writing</h4>
* <pre>{@code
* // Data which will we want to write
* PCollection<KV<Text, LongWritable>> unboundedWordsCount = ...;
* // Transformation which transforms data of one window into one hadoop configuration
* PTransform<PCollection<? extends KV<Text, LongWritable>>, PCollectionView<Configuration>>
* configTransform = ...;
* unboundedWordsCount.apply(
* "writeStream",
* HadoopFormatIO.<Text, LongWritable>write()
* .withConfigurationTransform(configTransform)
* .withExternalSynchronization(new HDFSSynchronization(locksDirPath)));
* }
* }</pre>
public class HadoopFormatIO {
private static final Logger LOG = LoggerFactory.getLogger(HadoopFormatIO.class);
/** {@link MRJobConfig#OUTPUT_FORMAT_CLASS_ATTR}. */
/** {@link MRJobConfig#OUTPUT_KEY_CLASS}. */
public static final String OUTPUT_KEY_CLASS = MRJobConfig.OUTPUT_KEY_CLASS;
/** {@link MRJobConfig#OUTPUT_VALUE_CLASS}. */
public static final String OUTPUT_VALUE_CLASS = MRJobConfig.OUTPUT_VALUE_CLASS;
/** {@link MRJobConfig#NUM_REDUCES}. */
public static final String NUM_REDUCES = MRJobConfig.NUM_REDUCES;
/** {@link MRJobConfig#PARTITIONER_CLASS_ATTR}. */
/** {@link MRJobConfig#ID}. */
public static final String JOB_ID = MRJobConfig.ID;
/** {@link MRJobConfig#MAPREDUCE_JOB_DIR}. */
public static final String OUTPUT_DIR = FileOutputFormat.OUTDIR;
* Creates an uninitialized {@link HadoopFormatIO.Read}. Before use, the {@code Read} must be
* initialized with a HadoopFormatIO.Read#withConfiguration(HadoopConfiguration) that specifies
* the source. A key/value translation may also optionally be specified using {@link
* HadoopFormatIO.Read#withKeyTranslation}/ {@link HadoopFormatIO.Read#withValueTranslation}.
public static <K, V> Read<K, V> read() {
return new AutoValue_HadoopFormatIO_Read.Builder<K, V>().build();
* Creates an {@link Write.Builder} for creation of Write Transformation. Before creation of the
* transformation, chain of builders must be set.
* @param <KeyT> Type of keys to be written.
* @param <ValueT> Type of values to be written.
* @return Write builder
public static <KeyT, ValueT> Write.WriteBuilder<KeyT, ValueT> write() {
return new Write.Builder<>();
* A {@link PTransform} that reads from any data source which implements Hadoop InputFormat. For
* e.g. Cassandra, Elasticsearch, HBase, Redis, Postgres, etc. See the class-level Javadoc on
* {@link HadoopFormatIO} for more information.
* @param <K> Type of keys to be read.
* @param <V> Type of values to be read.
* @see HadoopFormatIO
public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
// Returns the Hadoop Configuration which contains specification of source.
public abstract SerializableConfiguration getConfiguration();
public abstract SimpleFunction<?, K> getKeyTranslationFunction();
public abstract SimpleFunction<?, V> getValueTranslationFunction();
public abstract TypeDescriptor<K> getKeyTypeDescriptor();
public abstract TypeDescriptor<V> getValueTypeDescriptor();
public abstract TypeDescriptor<?> getinputFormatClass();
public abstract TypeDescriptor<?> getinputFormatKeyClass();
public abstract TypeDescriptor<?> getinputFormatValueClass();
public abstract Builder<K, V> toBuilder();
abstract static class Builder<K, V> {
abstract Builder<K, V> setConfiguration(SerializableConfiguration configuration);
abstract Builder<K, V> setKeyTranslationFunction(SimpleFunction<?, K> function);
abstract Builder<K, V> setValueTranslationFunction(SimpleFunction<?, V> function);
abstract Builder<K, V> setKeyTypeDescriptor(TypeDescriptor<K> keyTypeDescriptor);
abstract Builder<K, V> setValueTypeDescriptor(TypeDescriptor<V> valueTypeDescriptor);
abstract Builder<K, V> setInputFormatClass(TypeDescriptor<?> inputFormatClass);
abstract Builder<K, V> setInputFormatKeyClass(TypeDescriptor<?> inputFormatKeyClass);
abstract Builder<K, V> setInputFormatValueClass(TypeDescriptor<?> inputFormatValueClass);
abstract Read<K, V> build();
/** Reads from the source using the options provided by the given configuration. */
public Read<K, V> withConfiguration(Configuration configuration) {
TypeDescriptor<?> inputFormatClass =
TypeDescriptor.of(configuration.getClass("mapreduce.job.inputformat.class", null));
TypeDescriptor<?> inputFormatKeyClass =
TypeDescriptor.of(configuration.getClass("key.class", null));
TypeDescriptor<?> inputFormatValueClass =
TypeDescriptor.of(configuration.getClass("value.class", null));
Builder<K, V> builder =
toBuilder().setConfiguration(new SerializableConfiguration(configuration));
* Sets the output key class to InputFormat key class if withKeyTranslation() is not called
* yet.
if (getKeyTranslationFunction() == null) {
builder.setKeyTypeDescriptor((TypeDescriptor<K>) inputFormatKeyClass);
* Sets the output value class to InputFormat value class if withValueTranslation() is not
* called yet.
if (getValueTranslationFunction() == null) {
builder.setValueTypeDescriptor((TypeDescriptor<V>) inputFormatValueClass);
/** Transforms the keys read from the source using the given key translation function. */
public Read<K, V> withKeyTranslation(SimpleFunction<?, K> function) {
checkArgument(function != null, "function can not be null");
// Sets key class to key translation function's output class type.
return toBuilder()
/** Transforms the values read from the source using the given value translation function. */
public Read<K, V> withValueTranslation(SimpleFunction<?, V> function) {
checkArgument(function != null, "function can not be null");
// Sets value class to value translation function's output class type.
return toBuilder()
public PCollection<KV<K, V>> expand(PBegin input) {
// Get the key and value coders based on the key and value classes.
CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry();
Coder<K> keyCoder = getDefaultCoder(getKeyTypeDescriptor(), coderRegistry);
Coder<V> valueCoder = getDefaultCoder(getValueTypeDescriptor(), coderRegistry);
HadoopInputFormatBoundedSource<K, V> source =
new HadoopInputFormatBoundedSource<>(
return input.getPipeline().apply(;
* Validates that the mandatory configuration properties such as InputFormat class, InputFormat
* key and value classes are provided in the Hadoop configuration. In case of using {@code
* DBInputFormat} you need to order results by one or more keys. It can be done by setting
* configuration option "mapreduce.jdbc.input.orderby".
private void validateConfiguration(Configuration configuration) {
checkArgument(configuration != null, "configuration can not be null");
configuration.get("mapreduce.job.inputformat.class") != null,
"Configuration must contain \"mapreduce.job.inputformat.class\"");
configuration.get("key.class") != null, "configuration must contain \"key.class\"");
configuration.get("value.class") != null, "configuration must contain \"value.class\"");
if (configuration.get("mapreduce.job.inputformat.class").endsWith("DBInputFormat")) {
configuration.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY) != null,
"Configuration must contain \""
+ "\" when using DBInputFormat");
/** Validates construction of this transform. */
// @VisibleForTesting
public void validateTransform() {
checkArgument(getConfiguration() != null, "withConfiguration() is required");
// Validate that the key translation input type must be same as key class of InputFormat.
"Key translation's input type is not same as hadoop InputFormat : %s key class : %s");
// Validate that the value translation input type must be same as value class of InputFormat.
"Value translation's input type is not same as hadoop InputFormat : "
+ "%s value class : %s");
/** Validates translation function given for key/value translation. */
private void validateTranslationFunction(
TypeDescriptor<?> inputType, SimpleFunction<?, ?> simpleFunction, String errorMsg) {
if (simpleFunction != null && !simpleFunction.getInputTypeDescriptor().equals(inputType)) {
throw new IllegalArgumentException(
String.format(errorMsg, getinputFormatClass().getRawType(), inputType.getRawType()));
* Returns the default coder for a given type descriptor. Coder Registry is queried for correct
* coder, if not found in Coder Registry, then check if the type descriptor provided is of type
* Writable, then WritableCoder is returned, else exception is thrown "Cannot find coder".
@SuppressWarnings({"unchecked", "WeakerAccess"})
public <T> Coder<T> getDefaultCoder(TypeDescriptor<?> typeDesc, CoderRegistry coderRegistry) {
Class classType = typeDesc.getRawType();
try {
return (Coder<T>) coderRegistry.getCoder(typeDesc);
} catch (CannotProvideCoderException e) {
if (Writable.class.isAssignableFrom(classType)) {
return (Coder<T>) WritableCoder.of(classType);
throw new IllegalStateException(
String.format("Cannot find coder for %s : ", typeDesc) + e.getMessage(), e);
* Bounded source implementation for {@link HadoopFormatIO}.
* @param <K> Type of keys to be read.
* @param <V> Type of values to be read.
public static class HadoopInputFormatBoundedSource<K, V> extends BoundedSource<KV<K, V>>
implements Serializable {
private final SerializableConfiguration conf;
private final Coder<K> keyCoder;
private final Coder<V> valueCoder;
@Nullable private final SimpleFunction<?, K> keyTranslationFunction;
@Nullable private final SimpleFunction<?, V> valueTranslationFunction;
private final SerializableSplit inputSplit;
private transient List<SerializableSplit> inputSplits;
private long boundedSourceEstimatedSize = 0;
private transient InputFormat<?, ?> inputFormatObj;
private transient TaskAttemptContext taskAttemptContext;
private static final Set<Class<?>> immutableTypes =
new HashSet<>(
SerializableConfiguration conf,
Coder<K> keyCoder,
Coder<V> valueCoder,
@Nullable SimpleFunction<?, K> keyTranslationFunction,
@Nullable SimpleFunction<?, V> valueTranslationFunction) {
this(conf, keyCoder, valueCoder, keyTranslationFunction, valueTranslationFunction, null);
protected HadoopInputFormatBoundedSource(
SerializableConfiguration conf,
Coder<K> keyCoder,
Coder<V> valueCoder,
@Nullable SimpleFunction<?, K> keyTranslationFunction,
@Nullable SimpleFunction<?, V> valueTranslationFunction,
SerializableSplit inputSplit) {
this.conf = conf;
this.inputSplit = inputSplit;
this.keyCoder = keyCoder;
this.valueCoder = valueCoder;
this.keyTranslationFunction = keyTranslationFunction;
this.valueTranslationFunction = valueTranslationFunction;
public SerializableConfiguration getConfiguration() {
return conf;
public void validate() {
checkArgument(conf != null, "conf can not be null");
checkArgument(keyCoder != null, "keyCoder can not be null");
checkArgument(valueCoder != null, "valueCoder can not be null");
public void populateDisplayData(DisplayData.Builder builder) {
Configuration hadoopConfig = getConfiguration().get();
if (hadoopConfig != null) {
.withLabel("InputFormat Class"));
DisplayData.item("key.class", hadoopConfig.get("key.class")).withLabel("Key Class"));
DisplayData.item("value.class", hadoopConfig.get("value.class"))
.withLabel("Value Class"));
public List<BoundedSource<KV<K, V>>> split(long desiredBundleSizeBytes, PipelineOptions options)
throws Exception {
// desiredBundleSizeBytes is not being considered as splitting based on this
// value is not supported by inputFormat getSplits() method.
if (inputSplit != null) {"Not splitting source {} because source is already split.", this);
return ImmutableList.of(this);
"Generated {} splits. Size of first split is {} ",
serializableInputSplit ->
new HadoopInputFormatBoundedSource<>(
public long getEstimatedSizeBytes(PipelineOptions po) throws Exception {
if (inputSplit == null) {
// If there are no splits computed yet, then retrieve the splits.
return boundedSourceEstimatedSize;
return inputSplit.getSplit().getLength();
* This is a helper function to compute splits. This method will also calculate size of the data
* being read. Note: This method is executed exactly once and the splits are retrieved and
* cached in this. These splits are further used by split() and getEstimatedSizeBytes().
void computeSplitsIfNecessary() throws IOException, InterruptedException {
if (inputSplits != null) {
List<InputSplit> splits = inputFormatObj.getSplits(Job.getInstance(conf.get()));
if (splits == null) {
throw new IOException("Error in computing splits, getSplits() returns null.");
if (splits.isEmpty()) {
throw new IOException("Error in computing splits, getSplits() returns a empty list");
boundedSourceEstimatedSize = 0;
inputSplits = new ArrayList<>();
for (InputSplit inputSplit : splits) {
if (inputSplit == null) {
throw new IOException(
"Error in computing splits, split is null in InputSplits list "
+ "populated by getSplits() : ");
boundedSourceEstimatedSize += inputSplit.getLength();
inputSplits.add(new SerializableSplit(inputSplit));
* Creates instance of InputFormat class. The InputFormat class name is specified in the Hadoop
* configuration.
protected void createInputFormatInstance() throws IOException {
if (inputFormatObj == null) {
try {
taskAttemptContext = new TaskAttemptContextImpl(conf.get(), new TaskAttemptID());
inputFormatObj =
(InputFormat<?, ?>)
* If InputFormat explicitly implements interface {@link Configurable}, then setConf()
* method of {@link Configurable} needs to be explicitly called to set all the
* configuration parameters. For example: InputFormat classes which implement Configurable
* are {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat DBInputFormat}, {@link
* org.apache.hadoop.hbase.mapreduce.TableInputFormat TableInputFormat}, etc.
if (Configurable.class.isAssignableFrom(inputFormatObj.getClass())) {
((Configurable) inputFormatObj).setConf(conf.get());
} catch (InstantiationException
| IllegalAccessException
| ClassNotFoundException
| NoSuchMethodException
| InvocationTargetException e) {
throw new IOException("Unable to create InputFormat object: ", e);
InputFormat<?, ?> getInputFormat() {
return inputFormatObj;
void setInputFormatObj(InputFormat<?, ?> inputFormatObj) {
this.inputFormatObj = inputFormatObj;
public Coder<KV<K, V>> getOutputCoder() {
return KvCoder.of(keyCoder, valueCoder);
public BoundedReader<KV<K, V>> createReader(PipelineOptions options) throws IOException {
if (inputSplit == null) {
throw new IOException("Cannot create reader as source is not split yet.");
} else {
return new HadoopInputFormatReader<>(
* BoundedReader for Hadoop InputFormat source.
* @param <T1> Type of keys RecordReader emits.
* @param <T2> Type of values RecordReader emits.
class HadoopInputFormatReader<T1, T2> extends BoundedSource.BoundedReader<KV<K, V>> {
private final HadoopInputFormatBoundedSource<K, V> source;
@Nullable private final SimpleFunction<T1, K> keyTranslationFunction;
@Nullable private final SimpleFunction<T2, V> valueTranslationFunction;
private final SerializableSplit split;
private RecordReader<T1, T2> recordReader;
private volatile boolean doneReading = false;
private final AtomicLong recordsReturned = new AtomicLong();
// Tracks the progress of the RecordReader.
private final AtomicDouble progressValue = new AtomicDouble();
private final transient InputFormat<T1, T2> inputFormatObj;
private final transient TaskAttemptContext taskAttemptContext;
private HadoopInputFormatReader(
HadoopInputFormatBoundedSource<K, V> source,
@Nullable SimpleFunction keyTranslationFunction,
@Nullable SimpleFunction valueTranslationFunction,
SerializableSplit split,
InputFormat inputFormatObj,
TaskAttemptContext taskAttemptContext) {
this.source = source;
this.keyTranslationFunction = keyTranslationFunction;
this.valueTranslationFunction = valueTranslationFunction;
this.split = split;
this.inputFormatObj = inputFormatObj;
this.taskAttemptContext = taskAttemptContext;
public HadoopInputFormatBoundedSource<K, V> getCurrentSource() {
return source;
public boolean start() throws IOException {
try {
recordReader = inputFormatObj.createRecordReader(split.getSplit(), taskAttemptContext);
if (recordReader != null) {
recordReader.initialize(split.getSplit(), taskAttemptContext);
if (recordReader.nextKeyValue()) {
doneReading = false;
return true;
} else {
throw new IOException(
"Null RecordReader object returned by %s", inputFormatObj.getClass()));
recordReader = null;
} catch (InterruptedException e) {
throw new IOException(
"Could not read because the thread got interrupted while "
+ "reading the records with an exception: ",
doneReading = true;
return false;
public boolean advance() throws IOException {
try {
if (recordReader.nextKeyValue()) {
return true;
doneReading = true;
} catch (InterruptedException e) {
throw new IOException("Unable to read data: ", e);
return false;
public KV<K, V> getCurrent() {
K key;
V value;
try {
// Transform key if translation function is provided.
key = transformKeyOrValue(recordReader.getCurrentKey(), keyTranslationFunction, keyCoder);
// Transform value if translation function is provided.
value =
recordReader.getCurrentValue(), valueTranslationFunction, valueCoder);
} catch (IOException | InterruptedException e) {
LOG.error("Unable to read data: ", e);
throw new IllegalStateException("Unable to read data: " + "{}", e);
return KV.of(key, value);
/** Returns the serialized output of transformed key or value object. */
private <T, T3> T3 transformKeyOrValue(
T input, @Nullable SimpleFunction<T, T3> simpleFunction, Coder<T3> coder)
throws CoderException, ClassCastException {
T3 output;
if (null != simpleFunction) {
output = simpleFunction.apply(input);
} else {
output = (T3) input;
return cloneIfPossiblyMutable(output, coder);
* Beam expects immutable objects, but the Hadoop InputFormats tend to re-use the same object
* when returning them. Hence, mutable objects returned by Hadoop InputFormats are cloned.
private <T> T cloneIfPossiblyMutable(T input, Coder<T> coder)
throws CoderException, ClassCastException {
// If the input object is not of known immutable type, clone the object.
if (!isKnownImmutable(input)) {
input = CoderUtils.clone(coder, input);
return input;
/** Utility method to check if the passed object is of a known immutable type. */
private boolean isKnownImmutable(Object o) {
return immutableTypes.contains(o.getClass());
public void close() throws IOException {"Closing reader after reading {} records.", recordsReturned);
if (recordReader != null) {
recordReader = null;
public Double getFractionConsumed() {
if (doneReading) {
return 1.0;
} else if (recordReader == null || recordsReturned.get() == 0L) {
return 0.0;
if (progressValue.get() == 0.0) {
return null;
return progressValue.doubleValue();
/** Returns RecordReader's progress. */
private Double getProgress() throws IOException, InterruptedException {
try {
float progress = recordReader.getProgress();
return (double) progress < 0 || progress > 1 ? 0.0 : progress;
} catch (IOException e) {
"Error in computing the fractions consumed as RecordReader.getProgress() throws an "
+ "exception : ",
throw new IOException(
"Error in computing the fractions consumed as RecordReader.getProgress() throws an "
+ "exception : "
+ e.getMessage(),
public final long getSplitPointsRemaining() {
if (doneReading) {
return 0;
This source does not currently support dynamic work rebalancing, so remaining parallelism
is always 1.
return 1;
* A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit} to be serialized using
* Java's standard serialization mechanisms.
public static class SerializableSplit implements Serializable {
InputSplit inputSplit;
public SerializableSplit() {}
public SerializableSplit(InputSplit split) {
split instanceof Writable, String.format("Split is not of type Writable: %s", split));
this.inputSplit = split;
public InputSplit getSplit() {
return inputSplit;
private void readObject(ObjectInputStream in) throws IOException {
ObjectWritable ow = new ObjectWritable();
ow.setConf(new Configuration(false));
this.inputSplit = (InputSplit) ow.get();
private void writeObject(ObjectOutputStream out) throws IOException {
new ObjectWritable(inputSplit).write(out);
* Generates tasks for output pairs and groups them by this key.
* <p>This transformation is used when is configured write with partitioning.
* @param <KeyT> type of key
* @param <ValueT> type of value
private static class GroupDataByPartition<KeyT, ValueT>
extends PTransform<
PCollection<KV<KeyT, ValueT>>, PCollection<KV<Integer, KV<KeyT, ValueT>>>> {
private final PCollectionView<Configuration> configView;
private GroupDataByPartition(PCollectionView<Configuration> configView) {
this.configView = configView;
public PCollection<KV<Integer, KV<KeyT, ValueT>>> expand(PCollection<KV<KeyT, ValueT>> input) {
return input
ParDo.of(new AssignTaskFn<KeyT, ValueT>(configView)).withSideInputs(configView))
TypeDescriptors.kvs(TypeDescriptors.integers(), input.getTypeDescriptor()))
.apply("GroupByTaskId", GroupByKey.create())
.apply("FlattenGroupedTasks", ParDo.of(new FlattenGroupedTasks<>()));
* Flattens grouped iterable {@link KV} pairs into triplets of TaskID/Key/Value.
* @param <KeyT> Type of keys to be written.
* @param <ValueT> Type of values to be written.
private static class FlattenGroupedTasks<KeyT, ValueT>
extends DoFn<KV<Integer, Iterable<KV<KeyT, ValueT>>>, KV<Integer, KV<KeyT, ValueT>>> {
public void processElement(
@Element KV<Integer, Iterable<KV<KeyT, ValueT>>> input,
OutputReceiver<KV<Integer, KV<KeyT, ValueT>>> outputReceiver) {
final Integer key = input.getKey();
for (KV<KeyT, ValueT> element :
requireNonNull(input.getValue(), "Iterable can not be null.")) {
outputReceiver.output(KV.of(key, element));
* A {@link PTransform} that writes to any data sink which implements Hadoop OutputFormat. For
* e.g. Cassandra, Elasticsearch, HBase, Redis, Postgres, etc. See the class-level Javadoc on
* {@link HadoopFormatIO} for more information.
* @param <KeyT> Type of keys to be written.
* @param <ValueT> Type of values to be written.
* @see HadoopFormatIO
public static class Write<KeyT, ValueT> extends PTransform<PCollection<KV<KeyT, ValueT>>, PDone> {
@Nullable private final transient Configuration configuration;
private final PTransform<
PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>>
private final ExternalSynchronization externalSynchronization;
private final boolean withPartitioning;
@Nullable Configuration configuration,
PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>>
ExternalSynchronization externalSynchronization,
boolean withPartitioning) {
this.configuration = configuration;
this.configTransform = configTransform;
this.externalSynchronization = externalSynchronization;
this.withPartitioning = withPartitioning;
* Builder for partitioning determining.
* @param <KeyT> Key type to write
* @param <ValueT> Value type to write
public interface PartitionedWriterBuilder<KeyT, ValueT> {
* Writes to the sink with partitioning by Task Id.
* <p>Following Hadoop configuration properties are required with this option:
* <ul>
* <li>{@code mapreduce.job.reduces}: Number of reduce tasks. Value is equal to number of
* write tasks which will be generated.
* <li>{@code mapreduce.job.partitioner.class}: Hadoop partitioner class which will be used
* for distributing of records among partitions.
* </ul>
* @return WriteBuilder for write transformation
ExternalSynchronizationBuilder<KeyT, ValueT> withPartitioning();
* Writes to the sink without need to partition output into specified number of partitions.
* <p>This write operation doesn't do shuffle by the partition so it saves transfer time
* before write operation itself. As a consequence it generates random number of partitions.
* <p><b>Note:</b> Works only for {@link
* org.apache.beam.sdk.values.PCollection.IsBounded#BOUNDED} {@link PCollection} with global
* {@link WindowingStrategy}.
* @return WriteBuilder for write transformation
ExternalSynchronizationBuilder<KeyT, ValueT> withoutPartitioning();
* Builder for External Synchronization defining.
* @param <KeyT> Key type to write
* @param <ValueT> Value type to write
public interface ExternalSynchronizationBuilder<KeyT, ValueT> {
* Specifies class which will provide external synchronization required for hadoop write
* operation.
* @param externalSynchronization provider of external synchronization
* @return Write transformation
Write<KeyT, ValueT> withExternalSynchronization(
ExternalSynchronization externalSynchronization);
* Main builder of Write transformation.
* @param <KeyT> Key type to write
* @param <ValueT> Value type to write
public interface WriteBuilder<KeyT, ValueT> {
* Writes to the sink using the options provided by the given hadoop configuration.
* <p><b>Note:</b> Works only for {@link
* org.apache.beam.sdk.values.PCollection.IsBounded#BOUNDED} {@link PCollection} with global
* {@link WindowingStrategy}.
* @param config hadoop configuration.
* @return WriteBuilder with set configuration
* @throws NullPointerException when the configuration is null
* @see HadoopFormatIO for required hadoop {@link Configuration} properties
PartitionedWriterBuilder<KeyT, ValueT> withConfiguration(Configuration config);
* Writes to the sink using configuration created by provided {@code
* configurationTransformation}.
* <p>This type is useful especially for processing unbounded windowed data but can be used
* also for batch processing.
* <p>Supports only {@link PCollection} with {@link DefaultTrigger}ing and without allowed
* lateness
* @param configTransform configuration transformation interface
* @return WriteBuilder with set configuration transformation
* @throws NullPointerException when {@code configurationTransformation} is {@code null}
* @see HadoopFormatIO for required hadoop {@link Configuration} properties
ExternalSynchronizationBuilder<KeyT, ValueT> withConfigurationTransform(
PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>>
* Implementation of all builders.
* @param <KeyT> Key type to write
* @param <ValueT> Value type to write
static class Builder<KeyT, ValueT>
implements WriteBuilder<KeyT, ValueT>,
PartitionedWriterBuilder<KeyT, ValueT>,
ExternalSynchronizationBuilder<KeyT, ValueT> {
private Configuration configuration;
private PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>>
private boolean isWithPartitioning;
public PartitionedWriterBuilder<KeyT, ValueT> withConfiguration(Configuration config) {
checkNotNull(config, "Hadoop configuration cannot be null");
this.configuration = config;
return this;
public ExternalSynchronizationBuilder<KeyT, ValueT> withConfigurationTransform(
PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>>
configTransform) {
checkNotNull(configTransform, "Configuration transformation cannot be null");
this.isWithPartitioning = true;
this.configTransform = configTransform;
return this;
public ExternalSynchronizationBuilder<KeyT, ValueT> withPartitioning() {
this.isWithPartitioning = true;
return this;
public ExternalSynchronizationBuilder<KeyT, ValueT> withoutPartitioning() {
this.isWithPartitioning = false;
return this;
public Write<KeyT, ValueT> withExternalSynchronization(
ExternalSynchronization externalSynchronization) {
checkNotNull(externalSynchronization, "External synchronization cannot be null");
return new Write<>(
public void validate(PipelineOptions pipelineOptions) {}
public void populateDisplayData(DisplayData.Builder builder) {
Configuration hadoopConfig = configuration;
if (hadoopConfig != null) {
.withLabel("OutputFormat Class"));
DisplayData.item(OUTPUT_KEY_CLASS, hadoopConfig.get(OUTPUT_KEY_CLASS))
.withLabel("OutputFormat Key Class"));
DisplayData.item(OUTPUT_VALUE_CLASS, hadoopConfig.get(OUTPUT_VALUE_CLASS))
.withLabel("OutputFormat Value Class"));
.withLabel("Partitioner Class"));
public PDone expand(PCollection<KV<KeyT, ValueT>> input) {
// streamed pipeline must have defined configuration transformation
if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)
|| !input.getWindowingStrategy().equals(WindowingStrategy.globalDefault())) {
configTransform != null,
"Writing of unbounded data can be processed only with configuration transformation provider. See %s.withConfigurationTransform()",
TypeDescriptor<Configuration> configType = new TypeDescriptor<Configuration>() {};
.registerCoderForType(configType, new ConfigurationCoder());
PCollectionView<Configuration> configView = createConfigurationView(input);
return processJob(input, configView);
* Processes write job. Write job is composed from following partial steps:
* <ul>
* <li>When partitioning is enabled:
* <ul>
* <li>Assigning of the {@link TaskID} (represented as {@link Integer}) to the {@link
* KV}s in {@link AssignTaskFn}
* <li>Grouping {@link KV}s by the {@link TaskID}
* </ul>
* <li>Otherwise creation of TaskId via {@link PrepareNonPartitionedTasksFn} where locks are
* created for each task id
* <li>Writing of {@link KV} records via {@link WriteFn}
* <li>Committing of whole job via {@link CommitJobFn}
* </ul>
* @param input Collection with output data to write
* @param configView configuration view
* @return Successfully processed write
private PDone processJob(
PCollection<KV<KeyT, ValueT>> input, PCollectionView<Configuration> configView) {
TypeDescriptor<Iterable<Integer>> iterableIntType =
PCollection<KV<KeyT, ValueT>> validatedInput =
new SetupJobFn<>(
externalSynchronization, configView, input.getTypeDescriptor()))
PCollection<KV<Integer, KV<KeyT, ValueT>>> writeData =
? validatedInput.apply("GroupDataByPartition", new GroupDataByPartition<>(configView))
: validatedInput.apply(
new PrepareNonPartitionedTasksFn<KeyT, ValueT>(
configView, externalSynchronization))
PCollection<Iterable<Integer>> collectedFinishedWrites =
ParDo.of(new WriteFn<KeyT, ValueT>(configView, externalSynchronization))
Combine.globally(new IterableCombinerFn<>(TypeDescriptors.integers()))
ParDo.of(new CommitJobFn<Integer>(configView, externalSynchronization))
private void verifyInputWindowing(PCollection<KV<KeyT, ValueT>> input) {
if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
"Cannot work with %s and GLOBAL %s",
"Cannot work with %s trigger. Write works correctly only with %s",
"Write does not allow late data.");
* Creates {@link PCollectionView} with one {@link Configuration} based on the set source of the
* configuration.
* @param input input data
* @return PCollectionView with single {@link Configuration}
* @see Builder#withConfiguration(Configuration)
* @see Builder#withConfigurationTransform(PTransform)
private PCollectionView<Configuration> createConfigurationView(
PCollection<KV<KeyT, ValueT>> input) {
PCollectionView<Configuration> config;
if (configuration != null) {
config =
.apply("CreateOutputConfig", Create.<Configuration>of(configuration))
} else {
config = input.apply("TransformDataIntoConfig", configTransform);
return config;
* Represents context of one hadoop write task.
* @param <KeyT> Key type to write
* @param <ValueT> Value type to write
private static class TaskContext<KeyT, ValueT> {
private final RecordWriter<KeyT, ValueT> recordWriter;
private final OutputCommitter outputCommitter;
private final TaskAttemptContext taskAttemptContext;
TaskContext(TaskAttemptID taskAttempt, Configuration conf) {
taskAttemptContext = HadoopFormats.createTaskAttemptContext(conf, taskAttempt);
OutputFormat<KeyT, ValueT> outputFormatObj = HadoopFormats.createOutputFormatFromConfig(conf);
outputCommitter = initOutputCommitter(outputFormatObj, conf, taskAttemptContext);
recordWriter = initRecordWriter(outputFormatObj, taskAttemptContext);
RecordWriter<KeyT, ValueT> getRecordWriter() {
return recordWriter;
OutputCommitter getOutputCommitter() {
return outputCommitter;
TaskAttemptContext getTaskAttemptContext() {
return taskAttemptContext;
int getTaskId() {
return taskAttemptContext.getTaskAttemptID().getTaskID().getId();
String getJobId() {
return taskAttemptContext.getJobID().getJtIdentifier();
void abortTask() {
try {
} catch (IOException e) {
throw new IllegalStateException(
String.format("Unable to abort task %s of job %s", getTaskId(), getJobId()));
private RecordWriter<KeyT, ValueT> initRecordWriter(
OutputFormat<KeyT, ValueT> outputFormatObj, TaskAttemptContext taskAttemptContext)
throws IllegalStateException {
try {
"Creating new RecordWriter for task {} of Job with id {}.",
return outputFormatObj.getRecordWriter(taskAttemptContext);
} catch (InterruptedException | IOException e) {
throw new IllegalStateException("Unable to create RecordWriter object: ", e);
private static OutputCommitter initOutputCommitter(
OutputFormat<?, ?> outputFormatObj,
Configuration conf,
TaskAttemptContext taskAttemptContext)
throws IllegalStateException {
OutputCommitter outputCommitter;
try {
outputCommitter = outputFormatObj.getOutputCommitter(taskAttemptContext);
if (outputCommitter != null) {
outputCommitter.setupJob(new JobContextImpl(conf, taskAttemptContext.getJobID()));
} catch (Exception e) {
throw new IllegalStateException("Unable to create OutputCommitter object: ", e);
return outputCommitter;
public String toString() {
return "TaskContext{"
+ "jobId="
+ getJobId()
+ ", taskId="
+ getTaskId()
+ ", attemptId="
+ taskAttemptContext.getTaskAttemptID().getId()
+ '}';
/** Coder of configuration instances. */
private static class ConfigurationCoder extends AtomicCoder<Configuration> {
public void encode(Configuration value, OutputStream outStream) throws IOException {
DataOutputStream dataOutputStream = new DataOutputStream(outStream);
public Configuration decode(InputStream inStream) throws IOException {
DataInputStream dataInputStream = new DataInputStream(inStream);
Configuration config = new Configuration(false);
return config;
* DoFn with following responsibilities:
* <ul>
* <li>Validates configuration - checks whether all required properties are set.
* <li>Validates types of input PCollection elements.
* <li>Setups start of the {@link OutputFormat} job for given window.
* </ul>
* <p>Logic of the setup job is called only for the very first element of the instance. All other
* elements are directly sent to next processing.
private static class SetupJobFn<KeyT, ValueT> extends DoFn<KV<KeyT, ValueT>, KV<KeyT, ValueT>> {
private final ExternalSynchronization externalSynchronization;
private final PCollectionView<Configuration> configView;
private final TypeDescriptor<KV<KeyT, ValueT>> inputTypeDescriptor;
private boolean isSetupJobAttempted;
ExternalSynchronization externalSynchronization,
PCollectionView<Configuration> configView,
TypeDescriptor<KV<KeyT, ValueT>> inputTypeDescriptor) {
this.externalSynchronization = externalSynchronization;
this.configView = configView;
this.inputTypeDescriptor = inputTypeDescriptor;
public void setup() {
isSetupJobAttempted = false;
public void processElement(
@DoFn.Element KV<KeyT, ValueT> element,
OutputReceiver<KV<KeyT, ValueT>> receiver,
BoundedWindow window,
ProcessContext c) {
if (isSetupJobAttempted) {
// setup of job was already attempted
Configuration conf = c.sideInput(configView);
// validate configuration and input
// must be done first, because in all later operations are required assumptions from
// validation
boolean isJobLockAcquired = externalSynchronization.tryAcquireJobLock(conf);
isSetupJobAttempted = true;
if (!isJobLockAcquired) {
// some parallel execution acquired task
try {
// setup job
JobID jobId = HadoopFormats.getJobId(conf);
trySetupJob(jobId, conf, window);
} catch (Exception e) {
throw new IllegalStateException(e);
* Validates that the mandatory configuration properties such as OutputFormat class,
* OutputFormat key and value classes are provided in the Hadoop configuration.
private void validateConfiguration(Configuration conf) {
checkArgument(conf != null, "Configuration can not be null");
conf.get(OUTPUT_FORMAT_CLASS_ATTR) != null,
"Configuration must contain \"" + OUTPUT_FORMAT_CLASS_ATTR + "\"");
conf.get(OUTPUT_KEY_CLASS) != null,
"Configuration must contain \"" + OUTPUT_KEY_CLASS + "\"");
conf.get(OUTPUT_VALUE_CLASS) != null,
"Configuration must contain \"" + OUTPUT_VALUE_CLASS + "\"");
checkArgument(conf.get(JOB_ID) != null, "Configuration must contain \"" + JOB_ID + "\"");
* Validates input data whether have correctly specified {@link TypeDescriptor}s of input data
* and if the {@link TypeDescriptor}s match with output types set in the hadoop {@link
* Configuration}.
* @param conf hadoop config
private void validateInputData(Configuration conf) {
TypeDescriptor<KeyT> outputFormatKeyClass =
(TypeDescriptor<KeyT>) TypeDescriptor.of(conf.getClass(OUTPUT_KEY_CLASS, null));
TypeDescriptor<ValueT> outputFormatValueClass =
(TypeDescriptor<ValueT>) TypeDescriptor.of(conf.getClass(OUTPUT_VALUE_CLASS, null));
inputTypeDescriptor != null,
"Input %s must be set!",
"%s expects %s as input type.",
TypeDescriptors.kvs(outputFormatKeyClass, outputFormatValueClass)),
"%s expects following %ss: KV(Key: %s, Value: %s) but following %ss are set: KV(Key: %s, Value: %s)",
* Setups the hadoop write job as running. There is possibility that some parallel worker
* already did setup. in this case {@link FileAlreadyExistsException} is catch.
* @param jobId jobId
* @param conf hadoop configuration
* @param window window
private void trySetupJob(JobID jobId, Configuration conf, BoundedWindow window) {
try {
TaskAttemptContext setupTaskContext = HadoopFormats.createSetupTaskContext(conf, jobId);
OutputFormat<?, ?> jobOutputFormat = HadoopFormats.createOutputFormatFromConfig(conf);
"Job with id {} successfully configured from window with max timestamp {}.",
} catch (FileAlreadyExistsException e) {"Job was already set by other worker. Skipping rest of the setup.");
} catch (Exception e) {
throw new RuntimeException("Unable to setup job.", e);
* Commits whole write job.
* @param <T> type of TaskId identifier
private static class CommitJobFn<T> extends DoFn<Iterable<T>, Void> {
private final PCollectionView<Configuration> configView;
private final ExternalSynchronization externalSynchronization;
PCollectionView<Configuration> configView,
ExternalSynchronization externalSynchronization) {
this.configView = configView;
this.externalSynchronization = externalSynchronization;
public void processElement(ProcessContext c) {
Configuration config = c.sideInput(configView);
* Commits whole write job.
* @param config hadoop config
private void cleanupJob(Configuration config) {
JobID jobID = HadoopFormats.getJobId(config);
TaskAttemptContext cleanupTaskContext = HadoopFormats.createCleanupTaskContext(config, jobID);
OutputFormat<?, ?> outputFormat = HadoopFormats.createOutputFormatFromConfig(config);
try {
OutputCommitter outputCommitter = outputFormat.getOutputCommitter(cleanupTaskContext);
} catch (Exception e) {
throw new RuntimeException("Unable to commit job.", e);
* Assigns {@link TaskID#getId()} to the given pair of key and value. {@link TaskID} is later used
* for writing the pair to hadoop file.
* @param <KeyT> Type of key
* @param <ValueT> Type of value
private static class AssignTaskFn<KeyT, ValueT>
extends DoFn<KV<KeyT, ValueT>, KV<Integer, KV<KeyT, ValueT>>> {
private final PCollectionView<Configuration> configView;
// Transient properties because they are used only for one bundle
/** Cache of created TaskIDs for given bundle. */
private transient Map<Integer, TaskID> partitionToTaskContext;
private transient Partitioner<KeyT, ValueT> partitioner;
private transient Integer reducersCount;
private transient JobID jobId;
* Needs configuration view of given window.
* @param configView configuration view
AssignTaskFn(PCollectionView<Configuration> configView) {
this.configView = configView;
/** Deletes cached fields used in previous bundle. */
public void startBundle() {
partitionToTaskContext = new HashMap<>();
partitioner = null;
jobId = null;
reducersCount = null;
public void processElement(
@Element KV<KeyT, ValueT> element,
OutputReceiver<KV<Integer, KV<KeyT, ValueT>>> receiver,
ProcessContext c) {
Configuration config = c.sideInput(configView);
TaskID taskID = createTaskIDForKV(element, config);
int taskId = taskID.getId();
receiver.output(KV.of(taskId, element));
* Creates or reuses existing {@link TaskID} for given record.
* <p>The {@link TaskID} creation is based on the calculation hash function of {@code KeyT} of
* the pair via {@link Partitioner} (stored in configuration)
* @param kv keyvalue pair which should be written
* @param config hadoop configuration
* @return TaskID assigned to given record
private TaskID createTaskIDForKV(KV<KeyT, ValueT> kv, Configuration config) {
int taskContextKey =
getPartitioner(config).getPartition(kv.getKey(), kv.getValue(), getReducersCount(config));
return partitionToTaskContext.computeIfAbsent(
taskContextKey, (key) -> HadoopFormats.createTaskID(getJobId(config), key));
private JobID getJobId(Configuration config) {
if (jobId == null) {
jobId = HadoopFormats.getJobId(config);
return jobId;
private int getReducersCount(Configuration config) {
if (reducersCount == null) {
reducersCount = HadoopFormats.getReducersCount(config);
return reducersCount;
private Partitioner<KeyT, ValueT> getPartitioner(Configuration config) {
if (partitioner == null) {
partitioner = HadoopFormats.getPartitioner(config);
return partitioner;
* Writes all {@link KV}s pair for given {@link TaskID} (Task Id determines partition of writing).
* <p>For every {@link TaskID} are executed following steps:
* <ul>
* <li>Creation of {@link TaskContext} on start of bundle
* <li>Writing every single {@link KV} pair via {@link RecordWriter}.
* <li>Committing of task on bundle finish
* </ul>
* @param <KeyT> Type of key
* @param <ValueT> Type of value
private static class WriteFn<KeyT, ValueT> extends DoFn<KV<Integer, KV<KeyT, ValueT>>, Integer> {
private final PCollectionView<Configuration> configView;
private final ExternalSynchronization externalSynchronization;
// Transient property because they are used only for one bundle
/** Key by combination of Window and TaskId because different windows can use same TaskId. */
private transient Map<KV<BoundedWindow, Integer>, TaskContext<KeyT, ValueT>>
PCollectionView<Configuration> configView,
ExternalSynchronization externalSynchronization) {
this.configView = configView;
this.externalSynchronization = externalSynchronization;
/** Deletes cached map from previous bundle. */
public void startBundle() {
bundleTaskContextMap = new HashMap<>();
public void processElement(
@Element KV<Integer, KV<KeyT, ValueT>> element, ProcessContext c, BoundedWindow b) {
Integer taskID = element.getKey();
KV<BoundedWindow, Integer> win2TaskId = KV.of(b, taskID);
TaskContext<KeyT, ValueT> taskContext =
bundleTaskContextMap.computeIfAbsent(win2TaskId, w2t -> setupTask(w2t.getValue(), c));
write(element.getValue(), taskContext);
public void finishBundle(FinishBundleContext c) {
if (bundleTaskContextMap == null) {
for (Map.Entry<KV<BoundedWindow, Integer>, TaskContext<KeyT, ValueT>> entry :
bundleTaskContextMap.entrySet()) {
TaskContext<KeyT, ValueT> taskContext = entry.getValue();
try {
taskContext.getOutputCommitter().commitTask(taskContext.getTaskAttemptContext());"Write task for {} was successfully committed!", taskContext);
} catch (Exception e) {
processTaskException(taskContext, e);
BoundedWindow window = entry.getKey().getKey();
c.output(taskContext.getTaskId(), Objects.requireNonNull(window).maxTimestamp(), window);
private void processTaskException(TaskContext<KeyT, ValueT> taskContext, Exception e) {
LOG.warn("Write task for {} failed. Will abort task.", taskContext);
throw new IllegalArgumentException(e);
* Writes one {@link KV} pair for given {@link TaskID}.
* @param kv Iterable of pairs to write
* @param taskContext taskContext
private void write(KV<KeyT, ValueT> kv, TaskContext<KeyT, ValueT> taskContext) {
try {
RecordWriter<KeyT, ValueT> recordWriter = taskContext.getRecordWriter();
recordWriter.write(kv.getKey(), kv.getValue());
} catch (Exception e) {
processTaskException(taskContext, e);
* Creates {@link TaskContext} and setups write for given {@code taskId}.
* @param taskId id of the write Task
* @param c process context
* @return created TaskContext
* @throws IllegalStateException if the setup of the write task failed
private TaskContext<KeyT, ValueT> setupTask(Integer taskId, ProcessContext c)
throws IllegalStateException {
final Configuration conf = c.sideInput(configView);
TaskAttemptID taskAttemptID = externalSynchronization.acquireTaskAttemptIdLock(conf, taskId);
TaskContext<KeyT, ValueT> taskContext = new TaskContext<>(taskAttemptID, conf);
try {
} catch (Exception e) {
processTaskException(taskContext, e);
"Task with id {} of job {} was successfully setup!",
return taskContext;
* Registers task ID for each bundle without need to group data by taskId. Every bundle reserves
* its own taskId via particular implementation of {@link ExternalSynchronization} class.
* @param <KeyT> Type of keys to be written.
* @param <ValueT> Type of values to be written.
private static class PrepareNonPartitionedTasksFn<KeyT, ValueT>
extends DoFn<KV<KeyT, ValueT>, KV<Integer, KV<KeyT, ValueT>>> {
private transient TaskID taskId;
private final PCollectionView<Configuration> configView;
private final ExternalSynchronization externalSynchronization;
private PrepareNonPartitionedTasksFn(
PCollectionView<Configuration> configView,
ExternalSynchronization externalSynchronization) {
this.configView = configView;
this.externalSynchronization = externalSynchronization;
public void startBundle() {
taskId = null;
public void processElement(
@DoFn.Element KV<KeyT, ValueT> element,
OutputReceiver<KV<Integer, KV<KeyT, ValueT>>> output,
ProcessContext c) {
if (taskId == null) {
Configuration conf = c.sideInput(configView);
taskId = externalSynchronization.acquireTaskIdLock(conf);
output.output(KV.of(taskId.getId(), element));