| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io.hadoop.format; |
| |
| import static java.util.Objects.requireNonNull; |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; |
| |
| import com.google.auto.value.AutoValue; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.ObjectInputStream; |
| import java.io.ObjectOutputStream; |
| import java.io.OutputStream; |
| import java.io.Serializable; |
| import java.lang.reflect.InvocationTargetException; |
| import java.math.BigDecimal; |
| import java.math.BigInteger; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.stream.Collectors; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.annotations.Experimental; |
| import org.apache.beam.sdk.coders.AtomicCoder; |
| import org.apache.beam.sdk.coders.CannotProvideCoderException; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.coders.CoderException; |
| import org.apache.beam.sdk.coders.CoderRegistry; |
| import org.apache.beam.sdk.coders.KvCoder; |
| import org.apache.beam.sdk.io.BoundedSource; |
| import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; |
| import org.apache.beam.sdk.io.hadoop.WritableCoder; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.transforms.Combine; |
| import org.apache.beam.sdk.transforms.Create; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.GroupByKey; |
| import org.apache.beam.sdk.transforms.PTransform; |
| import org.apache.beam.sdk.transforms.ParDo; |
| import org.apache.beam.sdk.transforms.SimpleFunction; |
| import org.apache.beam.sdk.transforms.View; |
| import org.apache.beam.sdk.transforms.display.DisplayData; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; |
| import org.apache.beam.sdk.util.CoderUtils; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PBegin; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.PCollectionView; |
| import org.apache.beam.sdk.values.PDone; |
| import org.apache.beam.sdk.values.TypeDescriptor; |
| import org.apache.beam.sdk.values.TypeDescriptors; |
| import org.apache.beam.sdk.values.WindowingStrategy; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AtomicDouble; |
| import org.apache.hadoop.conf.Configurable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.ObjectWritable; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.mapred.FileAlreadyExistsException; |
| import org.apache.hadoop.mapreduce.InputFormat; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.JobID; |
| import org.apache.hadoop.mapreduce.MRJobConfig; |
| import org.apache.hadoop.mapreduce.OutputCommitter; |
| import org.apache.hadoop.mapreduce.OutputFormat; |
| import org.apache.hadoop.mapreduce.Partitioner; |
| import org.apache.hadoop.mapreduce.RecordReader; |
| import org.apache.hadoop.mapreduce.RecordWriter; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.TaskAttemptID; |
| import org.apache.hadoop.mapreduce.TaskID; |
| import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; |
| import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
| import org.apache.hadoop.mapreduce.task.JobContextImpl; |
| import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; |
| import org.joda.time.Duration; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A {@link HadoopFormatIO} is a Transform for reading data from any source or writing data to any |
| * sink which implements Hadoop {@link InputFormat} or {@link OutputFormat}. For example: Cassandra, |
| * Elasticsearch, HBase, Redis, Postgres etc. {@link HadoopFormatIO} has to make several performance |
| * trade-offs in connecting to {@link InputFormat} or {@link OutputFormat}, so if there is another |
| * Beam IO Transform specifically for connecting to your data source of choice, we would recommend |
| * using that one, but this IO Transform allows you to connect to many data sources/sinks that do |
| * not yet have a Beam IO Transform. |
| * |
| * <h3>Reading using {@link HadoopFormatIO}</h3> |
| * |
| * <p>You will need to pass a Hadoop {@link Configuration} with parameters specifying how the read |
| * will occur. Many properties of the Configuration are optional, and some are required for certain |
| * {@link InputFormat} classes, but the following properties must be set for all InputFormats: |
| * |
| * <ul> |
| * <li>{@code mapreduce.job.inputformat.class}: The {@link InputFormat} class used to connect to |
| * your data source of choice. |
| * <li>{@code key.class}: The key class returned by the {@link InputFormat} in {@code |
| * mapreduce.job.inputformat.class}. |
| * <li>{@code value.class}: The value class returned by the {@link InputFormat} in {@code |
| * mapreduce.job.inputformat.class}. |
| * </ul> |
| * |
| * For example: |
| * |
| * <pre>{@code |
| * Configuration myHadoopConfiguration = new Configuration(false); |
| * // Set Hadoop InputFormat, key and value class in configuration |
| * myHadoopConfiguration.setClass("mapreduce.job.inputformat.class", |
| * MyDbInputFormatClass, InputFormat.class); |
| * myHadoopConfiguration.setClass("key.class", MyDbInputFormatKeyClass, Object.class); |
| * myHadoopConfiguration.setClass("value.class", |
| * MyDbInputFormatValueClass, Object.class); |
| * }</pre> |
| * |
| * <p>You will need to check whether the key and value classes output by the {@link InputFormat} |
| * have a Beam {@link Coder} available. If not, you can use {@code withKeyTranslation}/{@code |
| * withValueTranslation} to specify a function transforming instances of those classes into |
| * another class that is supported by a Beam {@link Coder}. These settings are optional and you |
| * don't need to specify a translation for both key and value. If you specify a translation, make |
| * sure the K or V of the read transform matches the output type of the translation. |
| * |
| * <p>You will need to set the appropriate InputFormat key and value classes (i.e. "key.class" and |
| * "value.class") in the Hadoop {@link Configuration}. If the configured InputFormat key or value |
| * class differs from the InputFormat's actual key or value class, decoding of the key/value |
| * objects may fail with an error like "unexpected extra bytes after decoding". Hence, it is |
| * important to set the InputFormat key and value classes correctly. |
| * |
| * <pre>{@code |
| * Pipeline p = ...; // Create pipeline. |
| * // Read data only with Hadoop configuration. |
| * p.apply("read", |
| * HadoopFormatIO.<InputFormatKeyClass, InputFormatValueClass>read() |
| * .withConfiguration(myHadoopConfiguration)); |
| * }</pre> |
| * |
| * <p>Read data with configuration and key translation (example scenario: a Beam Coder is not |
| * available for the key class, hence key translation is required): |
| * |
| * <pre>{@code |
| * SimpleFunction<InputFormatKeyClass, MyKeyClass> myOutputKeyType = |
| * new SimpleFunction<InputFormatKeyClass, MyKeyClass>() { |
| * public MyKeyClass apply(InputFormatKeyClass input) { |
| * // ...logic to transform InputFormatKeyClass to MyKeyClass |
| * } |
| * }; |
| * }</pre> |
| * |
| * <pre>{@code |
| * p.apply("read", |
| * HadoopFormatIO.<MyKeyClass, InputFormatValueClass>read() |
| * .withConfiguration(myHadoopConfiguration) |
| * .withKeyTranslation(myOutputKeyType)); |
| * }</pre> |
| * |
| * <p>Read data with configuration and value translation (example scenario: a Beam Coder is not |
| * available for the value class, hence value translation is required): |
| * |
| * <pre>{@code |
| * SimpleFunction<InputFormatValueClass, MyValueClass> myOutputValueType = |
| * new SimpleFunction<InputFormatValueClass, MyValueClass>() { |
| * public MyValueClass apply(InputFormatValueClass input) { |
| * // ...logic to transform InputFormatValueClass to MyValueClass |
| * } |
| * }; |
| * }</pre> |
| * |
| * <pre>{@code |
| * p.apply("read", |
| * HadoopFormatIO.<InputFormatKeyClass, MyValueClass>read() |
| * .withConfiguration(myHadoopConfiguration) |
| * .withValueTranslation(myOutputValueType)); |
| * }</pre> |
| * |
| * <p>IMPORTANT! When using {@code DBInputFormat} to read data from an RDBMS, Beam parallelizes |
| * the process by using the LIMIT and OFFSET clauses of the SQL query to fetch different ranges of |
| * records (as splits) on different workers. To guarantee a deterministic order and a correct |
| * split of results, you need to order them by one or more keys (either PRIMARY or UNIQUE). This |
| * can be done during the configuration step, for example: |
| * |
| * <pre>{@code |
| * Configuration conf = new Configuration(); |
| * conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName); |
| * conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, "id", "name"); |
| * conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, "id ASC"); |
| * }</pre> |
| * |
| * <h3>Writing using {@link HadoopFormatIO}</h3> |
| * |
| * <p>You will need to pass a Hadoop {@link Configuration} with parameters specifying how the write |
| * will occur. Many properties of the Configuration are optional, and some are required for certain |
| * {@link OutputFormat} classes, but the following properties must be set for all OutputFormats: |
| * |
| * <ul> |
| * <li>{@code mapreduce.job.id}: The identifier of the write job. E.g.: end timestamp of window. |
| * <li>{@code mapreduce.job.outputformat.class}: The {@link OutputFormat} class used to connect to |
| * your data sink of choice. |
| * <li>{@code mapreduce.job.output.key.class}: The key class passed to the {@link OutputFormat} in |
| * {@code mapreduce.job.outputformat.class}. |
| * <li>{@code mapreduce.job.output.value.class}: The value class passed to the {@link |
| * OutputFormat} in {@code mapreduce.job.outputformat.class}. |
| * <li>{@code mapreduce.job.reduces}: Number of reduce tasks. Value is equal to number of write |
| * tasks which will be generated. This property is not required for {@link |
| * Write.PartitionedWriterBuilder#withoutPartitioning()} write. |
| * <li>{@code mapreduce.job.partitioner.class}: Hadoop partitioner class which will be used for |
| * distributing of records among partitions. This property is not required for {@link |
| * Write.PartitionedWriterBuilder#withoutPartitioning()} write. |
| * </ul> |
| * |
| * <b>Note:</b> All of the above property names have corresponding constants, e.g. {@link |
| * #OUTPUT_FORMAT_CLASS_ATTR}. An equivalent configuration using the constants is sketched after |
| * the following example. |
| * |
| * <p>For example: |
| * |
| * <pre>{@code |
| * Configuration myHadoopConfiguration = new Configuration(false); |
| * // Set Hadoop OutputFormat, key and value class in configuration |
| * myHadoopConfiguration.setClass("mapreduce.job.outputformat.class", |
| * MyDbOutputFormatClass, OutputFormat.class); |
| * myHadoopConfiguration.setClass("mapreduce.job.output.key.class", |
| * MyDbOutputFormatKeyClass, Object.class); |
| * myHadoopConfiguration.setClass("mapreduce.job.output.value.class", |
| * MyDbOutputFormatValueClass, Object.class); |
| * myHadoopConfiguration.setClass("mapreduce.job.partitioner.class", |
| * MyPartitionerClass, Object.class); |
| * myHadoopConfiguration.setInt("mapreduce.job.reduces", 2); |
| * }</pre> |
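| * |
| * <p>Equivalently, the same configuration can be expressed with the constants defined on this |
| * class (a sketch; the {@code MyDb...} and {@code MyPartitionerClass} names are illustrative, as |
| * above): |
| * |
| * <pre>{@code |
| * Configuration myHadoopConfiguration = new Configuration(false); |
| * myHadoopConfiguration.setClass(HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR, |
| * MyDbOutputFormatClass, OutputFormat.class); |
| * myHadoopConfiguration.setClass(HadoopFormatIO.OUTPUT_KEY_CLASS, |
| * MyDbOutputFormatKeyClass, Object.class); |
| * myHadoopConfiguration.setClass(HadoopFormatIO.OUTPUT_VALUE_CLASS, |
| * MyDbOutputFormatValueClass, Object.class); |
| * myHadoopConfiguration.setClass(HadoopFormatIO.PARTITIONER_CLASS_ATTR, |
| * MyPartitionerClass, Object.class); |
| * myHadoopConfiguration.setInt(HadoopFormatIO.NUM_REDUCES, 2); |
| * }</pre> |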
| * |
| * <p>You will need to set the OutputFormat key and value classes (i.e. |
| * "mapreduce.job.output.key.class" and "mapreduce.job.output.value.class") in the Hadoop {@link |
| * Configuration}; they must be equal to {@code KeyT} and {@code ValueT}. If the configured |
| * OutputFormat key or value class differs from the OutputFormat's actual key or value class, an |
| * {@link IllegalArgumentException} is thrown. |
| * |
| * <h4>Batch writing</h4> |
| * |
| * <pre>{@code |
| * // Data which we want to write |
| * PCollection<KV<Text, LongWritable>> boundedWordsCount = ...; |
| * |
| * // Hadoop configuration for the write |
| * // We use a partitioned write, so the Partitioner and the reducers count must be set - see the |
| * // withPartitioning() javadoc |
| * Configuration myHadoopConfiguration = ...; |
| * // Path to the directory with locks |
| * String locksDirPath = ...; |
| * |
| * boundedWordsCount.apply( |
| * "writeBatch", |
| * HadoopFormatIO.<Text, LongWritable>write() |
| * .withConfiguration(myHadoopConfiguration) |
| * .withPartitioning() |
| * .withExternalSynchronization(new HDFSSynchronization(locksDirPath))); |
| * }</pre> |
| * |
| * <h4>Stream writing</h4> |
| * |
| * <pre>{@code |
| * // Data which we want to write |
| * PCollection<KV<Text, LongWritable>> unboundedWordsCount = ...; |
| * // Transformation which transforms data of one window into one Hadoop configuration |
| * PTransform<PCollection<? extends KV<Text, LongWritable>>, PCollectionView<Configuration>> |
| * configTransform = ...; |
| * |
| * unboundedWordsCount.apply( |
| * "writeStream", |
| * HadoopFormatIO.<Text, LongWritable>write() |
| * .withConfigurationTransform(configTransform) |
| * .withExternalSynchronization(new HDFSSynchronization(locksDirPath))); |
| * }</pre> |
| */ |
| @Experimental(Experimental.Kind.SOURCE_SINK) |
| public class HadoopFormatIO { |
| private static final Logger LOG = LoggerFactory.getLogger(HadoopFormatIO.class); |
| |
| /** {@link MRJobConfig#OUTPUT_FORMAT_CLASS_ATTR}. */ |
| public static final String OUTPUT_FORMAT_CLASS_ATTR = MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR; |
| |
| /** {@link MRJobConfig#OUTPUT_KEY_CLASS}. */ |
| public static final String OUTPUT_KEY_CLASS = MRJobConfig.OUTPUT_KEY_CLASS; |
| |
| /** {@link MRJobConfig#OUTPUT_VALUE_CLASS}. */ |
| public static final String OUTPUT_VALUE_CLASS = MRJobConfig.OUTPUT_VALUE_CLASS; |
| |
| /** {@link MRJobConfig#NUM_REDUCES}. */ |
| public static final String NUM_REDUCES = MRJobConfig.NUM_REDUCES; |
| |
| /** {@link MRJobConfig#PARTITIONER_CLASS_ATTR}. */ |
| public static final String PARTITIONER_CLASS_ATTR = MRJobConfig.PARTITIONER_CLASS_ATTR; |
| |
| /** {@link MRJobConfig#ID}. */ |
| public static final String JOB_ID = MRJobConfig.ID; |
| |
| /** {@link FileOutputFormat#OUTDIR}. */ |
| public static final String OUTPUT_DIR = FileOutputFormat.OUTDIR; |
| |
| /** |
| * Creates an uninitialized {@link HadoopFormatIO.Read}. Before use, the {@code Read} must be |
| * initialized with a {@link HadoopFormatIO.Read#withConfiguration} call specifying the source's |
| * Hadoop {@link Configuration}. A key/value translation may also optionally be specified using |
| * {@link HadoopFormatIO.Read#withKeyTranslation}/{@link HadoopFormatIO.Read#withValueTranslation}. |
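| * |
| * <p>A minimal usage sketch (the type parameters and {@code myHadoopConfiguration} are assumed |
| * to be prepared as described in the class-level Javadoc): |
| * |
| * <pre>{@code |
| * PCollection<KV<InputFormatKeyClass, InputFormatValueClass>> records = |
| * p.apply(HadoopFormatIO.<InputFormatKeyClass, InputFormatValueClass>read() |
| * .withConfiguration(myHadoopConfiguration)); |
| * }</pre> |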
| */ |
| public static <K, V> Read<K, V> read() { |
| return new AutoValue_HadoopFormatIO_Read.Builder<K, V>().build(); |
| } |
| |
| /** |
| * Creates a {@link Write.Builder} for the creation of the Write transformation. The chain of |
| * builders must be completed before the transformation is created, as sketched below. |
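| * |
| * <p>A sketch of the complete builder chain ({@code myHadoopConfiguration} and {@code |
| * locksDirPath} are assumed to exist, as in the class-level Javadoc): |
| * |
| * <pre>{@code |
| * HadoopFormatIO.Write<Text, LongWritable> write = |
| * HadoopFormatIO.<Text, LongWritable>write() |
| * .withConfiguration(myHadoopConfiguration) |
| * .withPartitioning() |
| * .withExternalSynchronization(new HDFSSynchronization(locksDirPath)); |
| * }</pre> |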
| * |
| * @param <KeyT> Type of keys to be written. |
| * @param <ValueT> Type of values to be written. |
| * @return Write builder |
| */ |
| public static <KeyT, ValueT> Write.WriteBuilder<KeyT, ValueT> write() { |
| return new Write.Builder<>(); |
| } |
| |
| /** |
| * A {@link PTransform} that reads from any data source which implements Hadoop InputFormat, e.g. |
| * Cassandra, Elasticsearch, HBase, Redis, Postgres, etc. See the class-level Javadoc on |
| * {@link HadoopFormatIO} for more information. |
| * |
| * @param <K> Type of keys to be read. |
| * @param <V> Type of values to be read. |
| * @see HadoopFormatIO |
| */ |
| @AutoValue |
| public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> { |
| |
| /** Returns the Hadoop {@link Configuration} which contains the specification of the source. */ |
| @Nullable |
| public abstract SerializableConfiguration getConfiguration(); |
| |
| @Nullable |
| public abstract SimpleFunction<?, K> getKeyTranslationFunction(); |
| |
| @Nullable |
| public abstract SimpleFunction<?, V> getValueTranslationFunction(); |
| |
| @Nullable |
| public abstract TypeDescriptor<K> getKeyTypeDescriptor(); |
| |
| @Nullable |
| public abstract TypeDescriptor<V> getValueTypeDescriptor(); |
| |
| @Nullable |
| public abstract TypeDescriptor<?> getinputFormatClass(); |
| |
| @Nullable |
| public abstract TypeDescriptor<?> getinputFormatKeyClass(); |
| |
| @Nullable |
| public abstract TypeDescriptor<?> getinputFormatValueClass(); |
| |
| public abstract Builder<K, V> toBuilder(); |
| |
| @AutoValue.Builder |
| abstract static class Builder<K, V> { |
| abstract Builder<K, V> setConfiguration(SerializableConfiguration configuration); |
| |
| abstract Builder<K, V> setKeyTranslationFunction(SimpleFunction<?, K> function); |
| |
| abstract Builder<K, V> setValueTranslationFunction(SimpleFunction<?, V> function); |
| |
| abstract Builder<K, V> setKeyTypeDescriptor(TypeDescriptor<K> keyTypeDescriptor); |
| |
| abstract Builder<K, V> setValueTypeDescriptor(TypeDescriptor<V> valueTypeDescriptor); |
| |
| abstract Builder<K, V> setInputFormatClass(TypeDescriptor<?> inputFormatClass); |
| |
| abstract Builder<K, V> setInputFormatKeyClass(TypeDescriptor<?> inputFormatKeyClass); |
| |
| abstract Builder<K, V> setInputFormatValueClass(TypeDescriptor<?> inputFormatValueClass); |
| |
| abstract Read<K, V> build(); |
| } |
| |
| /** Reads from the source using the options provided by the given configuration. */ |
| @SuppressWarnings("unchecked") |
| public Read<K, V> withConfiguration(Configuration configuration) { |
| validateConfiguration(configuration); |
| TypeDescriptor<?> inputFormatClass = |
| TypeDescriptor.of(configuration.getClass("mapreduce.job.inputformat.class", null)); |
| TypeDescriptor<?> inputFormatKeyClass = |
| TypeDescriptor.of(configuration.getClass("key.class", null)); |
| TypeDescriptor<?> inputFormatValueClass = |
| TypeDescriptor.of(configuration.getClass("value.class", null)); |
| Builder<K, V> builder = |
| toBuilder().setConfiguration(new SerializableConfiguration(configuration)); |
| builder.setInputFormatClass(inputFormatClass); |
| builder.setInputFormatKeyClass(inputFormatKeyClass); |
| builder.setInputFormatValueClass(inputFormatValueClass); |
| /* |
| * Sets the output key class to InputFormat key class if withKeyTranslation() is not called |
| * yet. |
| */ |
| if (getKeyTranslationFunction() == null) { |
| builder.setKeyTypeDescriptor((TypeDescriptor<K>) inputFormatKeyClass); |
| } |
| /* |
| * Sets the output value class to InputFormat value class if withValueTranslation() is not |
| * called yet. |
| */ |
| if (getValueTranslationFunction() == null) { |
| builder.setValueTypeDescriptor((TypeDescriptor<V>) inputFormatValueClass); |
| } |
| return builder.build(); |
| } |
| |
| /** Transforms the keys read from the source using the given key translation function. */ |
| public Read<K, V> withKeyTranslation(SimpleFunction<?, K> function) { |
| checkArgument(function != null, "function can not be null"); |
| // Sets key class to key translation function's output class type. |
| return toBuilder() |
| .setKeyTranslationFunction(function) |
| .setKeyTypeDescriptor(function.getOutputTypeDescriptor()) |
| .build(); |
| } |
| |
| /** Transforms the values read from the source using the given value translation function. */ |
| public Read<K, V> withValueTranslation(SimpleFunction<?, V> function) { |
| checkArgument(function != null, "function can not be null"); |
| // Sets value class to value translation function's output class type. |
| return toBuilder() |
| .setValueTranslationFunction(function) |
| .setValueTypeDescriptor(function.getOutputTypeDescriptor()) |
| .build(); |
| } |
| |
| @Override |
| public PCollection<KV<K, V>> expand(PBegin input) { |
| validateTransform(); |
| // Get the key and value coders based on the key and value classes. |
| CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry(); |
| Coder<K> keyCoder = getDefaultCoder(getKeyTypeDescriptor(), coderRegistry); |
| Coder<V> valueCoder = getDefaultCoder(getValueTypeDescriptor(), coderRegistry); |
| HadoopInputFormatBoundedSource<K, V> source = |
| new HadoopInputFormatBoundedSource<>( |
| getConfiguration(), |
| keyCoder, |
| valueCoder, |
| getKeyTranslationFunction(), |
| getValueTranslationFunction()); |
| return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source)); |
| } |
| |
| /** |
| * Validates that the mandatory configuration properties, such as the InputFormat class and the |
| * InputFormat key and value classes, are provided in the Hadoop configuration. When using {@code |
| * DBInputFormat}, you additionally need to order results by one or more keys; this can be done |
| * by setting the configuration option "mapreduce.jdbc.input.orderby". |
| */ |
| private void validateConfiguration(Configuration configuration) { |
| checkArgument(configuration != null, "configuration can not be null"); |
| checkArgument( |
| configuration.get("mapreduce.job.inputformat.class") != null, |
| "Configuration must contain \"mapreduce.job.inputformat.class\""); |
| checkArgument( |
| configuration.get("key.class") != null, "configuration must contain \"key.class\""); |
| checkArgument( |
| configuration.get("value.class") != null, "configuration must contain \"value.class\""); |
| if (configuration.get("mapreduce.job.inputformat.class").endsWith("DBInputFormat")) { |
| checkArgument( |
| configuration.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY) != null, |
| "Configuration must contain \"" |
| + DBConfiguration.INPUT_ORDER_BY_PROPERTY |
| + "\" when using DBInputFormat"); |
| } |
| } |
| |
| /** Validates construction of this transform. */ |
| // @VisibleForTesting |
| public void validateTransform() { |
| checkArgument(getConfiguration() != null, "withConfiguration() is required"); |
| // Validate that the key translation input type matches the key class of the InputFormat. |
| validateTranslationFunction( |
| getinputFormatKeyClass(), |
| getKeyTranslationFunction(), |
| "Key translation's input type is not the same as the Hadoop InputFormat %s key class %s"); |
| // Validate that the value translation input type matches the value class of the InputFormat. |
| validateTranslationFunction( |
| getinputFormatValueClass(), |
| getValueTranslationFunction(), |
| "Value translation's input type is not the same as the Hadoop InputFormat %s " |
| + "value class %s"); |
| } |
| |
| /** Validates translation function given for key/value translation. */ |
| private void validateTranslationFunction( |
| TypeDescriptor<?> inputType, SimpleFunction<?, ?> simpleFunction, String errorMsg) { |
| if (simpleFunction != null && !simpleFunction.getInputTypeDescriptor().equals(inputType)) { |
| throw new IllegalArgumentException( |
| String.format(errorMsg, getinputFormatClass().getRawType(), inputType.getRawType())); |
| } |
| } |
| |
| /** |
| * Returns the default coder for a given type descriptor. The Coder Registry is queried for the |
| * correct coder; if none is found there and the provided type descriptor is of type {@link |
| * Writable}, a {@link WritableCoder} is returned, else a "Cannot find coder" exception is thrown. |
| */ |
| @SuppressWarnings({"unchecked", "WeakerAccess"}) |
| public <T> Coder<T> getDefaultCoder(TypeDescriptor<?> typeDesc, CoderRegistry coderRegistry) { |
| Class classType = typeDesc.getRawType(); |
| try { |
| return (Coder<T>) coderRegistry.getCoder(typeDesc); |
| } catch (CannotProvideCoderException e) { |
| if (Writable.class.isAssignableFrom(classType)) { |
| return (Coder<T>) WritableCoder.of(classType); |
| } |
| throw new IllegalStateException( |
| String.format("Cannot find coder for %s : ", typeDesc) + e.getMessage(), e); |
| } |
| } |
| } |
| |
| /** |
| * Bounded source implementation for {@link HadoopFormatIO}. |
| * |
| * @param <K> Type of keys to be read. |
| * @param <V> Type of values to be read. |
| */ |
| public static class HadoopInputFormatBoundedSource<K, V> extends BoundedSource<KV<K, V>> |
| implements Serializable { |
| private final SerializableConfiguration conf; |
| private final Coder<K> keyCoder; |
| private final Coder<V> valueCoder; |
| @Nullable private final SimpleFunction<?, K> keyTranslationFunction; |
| @Nullable private final SimpleFunction<?, V> valueTranslationFunction; |
| private final SerializableSplit inputSplit; |
| private transient List<SerializableSplit> inputSplits; |
| private long boundedSourceEstimatedSize = 0; |
| private transient InputFormat<?, ?> inputFormatObj; |
| private transient TaskAttemptContext taskAttemptContext; |
| private static final Set<Class<?>> immutableTypes = |
| new HashSet<>( |
| Arrays.asList( |
| String.class, |
| Byte.class, |
| Short.class, |
| Integer.class, |
| Long.class, |
| Float.class, |
| Double.class, |
| Boolean.class, |
| BigInteger.class, |
| BigDecimal.class)); |
| |
| HadoopInputFormatBoundedSource( |
| SerializableConfiguration conf, |
| Coder<K> keyCoder, |
| Coder<V> valueCoder, |
| @Nullable SimpleFunction<?, K> keyTranslationFunction, |
| @Nullable SimpleFunction<?, V> valueTranslationFunction) { |
| this(conf, keyCoder, valueCoder, keyTranslationFunction, valueTranslationFunction, null); |
| } |
| |
| @SuppressWarnings("WeakerAccess") |
| protected HadoopInputFormatBoundedSource( |
| SerializableConfiguration conf, |
| Coder<K> keyCoder, |
| Coder<V> valueCoder, |
| @Nullable SimpleFunction<?, K> keyTranslationFunction, |
| @Nullable SimpleFunction<?, V> valueTranslationFunction, |
| SerializableSplit inputSplit) { |
| this.conf = conf; |
| this.inputSplit = inputSplit; |
| this.keyCoder = keyCoder; |
| this.valueCoder = valueCoder; |
| this.keyTranslationFunction = keyTranslationFunction; |
| this.valueTranslationFunction = valueTranslationFunction; |
| } |
| |
| @SuppressWarnings("WeakerAccess") |
| public SerializableConfiguration getConfiguration() { |
| return conf; |
| } |
| |
| @Override |
| public void validate() { |
| checkArgument(conf != null, "conf can not be null"); |
| checkArgument(keyCoder != null, "keyCoder can not be null"); |
| checkArgument(valueCoder != null, "valueCoder can not be null"); |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| super.populateDisplayData(builder); |
| Configuration hadoopConfig = getConfiguration().get(); |
| if (hadoopConfig != null) { |
| builder.addIfNotNull( |
| DisplayData.item( |
| "mapreduce.job.inputformat.class", |
| hadoopConfig.get("mapreduce.job.inputformat.class")) |
| .withLabel("InputFormat Class")); |
| builder.addIfNotNull( |
| DisplayData.item("key.class", hadoopConfig.get("key.class")).withLabel("Key Class")); |
| builder.addIfNotNull( |
| DisplayData.item("value.class", hadoopConfig.get("value.class")) |
| .withLabel("Value Class")); |
| } |
| } |
| |
| @Override |
| public List<BoundedSource<KV<K, V>>> split(long desiredBundleSizeBytes, PipelineOptions options) |
| throws Exception { |
| // desiredBundleSizeBytes is ignored, because splitting based on this value is not |
| // supported by the InputFormat's getSplits() method. |
| if (inputSplit != null) { |
| LOG.info("Not splitting source {} because source is already split.", this); |
| return ImmutableList.of(this); |
| } |
| computeSplitsIfNecessary(); |
| LOG.info( |
| "Generated {} splits. Size of first split is {} ", |
| inputSplits.size(), |
| inputSplits.get(0).getSplit().getLength()); |
| return inputSplits.stream() |
| .map( |
| serializableInputSplit -> |
| new HadoopInputFormatBoundedSource<>( |
| conf, |
| keyCoder, |
| valueCoder, |
| keyTranslationFunction, |
| valueTranslationFunction, |
| serializableInputSplit)) |
| .collect(Collectors.toList()); |
| } |
| |
| @Override |
| public long getEstimatedSizeBytes(PipelineOptions po) throws Exception { |
| if (inputSplit == null) { |
| // If there are no splits computed yet, then retrieve the splits. |
| computeSplitsIfNecessary(); |
| return boundedSourceEstimatedSize; |
| } |
| return inputSplit.getSplit().getLength(); |
| } |
| |
| /** |
| * This is a helper function to compute splits. This method also calculates the size of the |
| * data being read. Note: This method is executed exactly once; the splits are retrieved and |
| * cached in this source, then further used by split() and getEstimatedSizeBytes(). |
| */ |
| @VisibleForTesting |
| void computeSplitsIfNecessary() throws IOException, InterruptedException { |
| if (inputSplits != null) { |
| return; |
| } |
| createInputFormatInstance(); |
| List<InputSplit> splits = inputFormatObj.getSplits(Job.getInstance(conf.get())); |
| if (splits == null) { |
| throw new IOException("Error in computing splits, getSplits() returns null."); |
| } |
| if (splits.isEmpty()) { |
| throw new IOException("Error in computing splits, getSplits() returns a empty list"); |
| } |
| boundedSourceEstimatedSize = 0; |
| inputSplits = new ArrayList<>(); |
| for (InputSplit inputSplit : splits) { |
| if (inputSplit == null) { |
| throw new IOException( |
| "Error in computing splits, split is null in InputSplits list " |
| + "populated by getSplits() : "); |
| } |
| boundedSourceEstimatedSize += inputSplit.getLength(); |
| inputSplits.add(new SerializableSplit(inputSplit)); |
| } |
| } |
| |
| /** |
| * Creates an instance of the InputFormat class. The InputFormat class name is specified in the |
| * Hadoop configuration. |
| */ |
| @SuppressWarnings("WeakerAccess") |
| protected void createInputFormatInstance() throws IOException { |
| if (inputFormatObj == null) { |
| try { |
| taskAttemptContext = new TaskAttemptContextImpl(conf.get(), new TaskAttemptID()); |
| inputFormatObj = |
| (InputFormat<?, ?>) |
| conf.get() |
| .getClassByName(conf.get().get("mapreduce.job.inputformat.class")) |
| .getConstructor() |
| .newInstance(); |
| /* |
| * If InputFormat explicitly implements interface {@link Configurable}, then setConf() |
| * method of {@link Configurable} needs to be explicitly called to set all the |
| * configuration parameters. For example: InputFormat classes which implement Configurable |
| * are {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat DBInputFormat}, {@link |
| * org.apache.hadoop.hbase.mapreduce.TableInputFormat TableInputFormat}, etc. |
| */ |
| if (Configurable.class.isAssignableFrom(inputFormatObj.getClass())) { |
| ((Configurable) inputFormatObj).setConf(conf.get()); |
| } |
| } catch (InstantiationException |
| | IllegalAccessException |
| | ClassNotFoundException |
| | NoSuchMethodException |
| | InvocationTargetException e) { |
| throw new IOException("Unable to create InputFormat object: ", e); |
| } |
| } |
| } |
| |
| @VisibleForTesting |
| InputFormat<?, ?> getInputFormat() { |
| return inputFormatObj; |
| } |
| |
| @VisibleForTesting |
| void setInputFormatObj(InputFormat<?, ?> inputFormatObj) { |
| this.inputFormatObj = inputFormatObj; |
| } |
| |
| @Override |
| public Coder<KV<K, V>> getOutputCoder() { |
| return KvCoder.of(keyCoder, valueCoder); |
| } |
| |
| @Override |
| public BoundedReader<KV<K, V>> createReader(PipelineOptions options) throws IOException { |
| this.validate(); |
| if (inputSplit == null) { |
| throw new IOException("Cannot create reader as source is not split yet."); |
| } else { |
| createInputFormatInstance(); |
| return new HadoopInputFormatReader<>( |
| this, |
| keyTranslationFunction, |
| valueTranslationFunction, |
| inputSplit, |
| inputFormatObj, |
| taskAttemptContext); |
| } |
| } |
| |
| /** |
| * BoundedReader for Hadoop InputFormat source. |
| * |
| * @param <T1> Type of keys RecordReader emits. |
| * @param <T2> Type of values RecordReader emits. |
| */ |
| class HadoopInputFormatReader<T1, T2> extends BoundedSource.BoundedReader<KV<K, V>> { |
| |
| private final HadoopInputFormatBoundedSource<K, V> source; |
| @Nullable private final SimpleFunction<T1, K> keyTranslationFunction; |
| @Nullable private final SimpleFunction<T2, V> valueTranslationFunction; |
| private final SerializableSplit split; |
| private RecordReader<T1, T2> recordReader; |
| private volatile boolean doneReading = false; |
| private final AtomicLong recordsReturned = new AtomicLong(); |
| // Tracks the progress of the RecordReader. |
| private final AtomicDouble progressValue = new AtomicDouble(); |
| private final transient InputFormat<T1, T2> inputFormatObj; |
| private final transient TaskAttemptContext taskAttemptContext; |
| |
| @SuppressWarnings("unchecked") |
| private HadoopInputFormatReader( |
| HadoopInputFormatBoundedSource<K, V> source, |
| @Nullable SimpleFunction keyTranslationFunction, |
| @Nullable SimpleFunction valueTranslationFunction, |
| SerializableSplit split, |
| InputFormat inputFormatObj, |
| TaskAttemptContext taskAttemptContext) { |
| this.source = source; |
| this.keyTranslationFunction = keyTranslationFunction; |
| this.valueTranslationFunction = valueTranslationFunction; |
| this.split = split; |
| this.inputFormatObj = inputFormatObj; |
| this.taskAttemptContext = taskAttemptContext; |
| } |
| |
| @Override |
| public HadoopInputFormatBoundedSource<K, V> getCurrentSource() { |
| return source; |
| } |
| |
| @Override |
| public boolean start() throws IOException { |
| try { |
| recordsReturned.set(0L); |
| recordReader = inputFormatObj.createRecordReader(split.getSplit(), taskAttemptContext); |
| if (recordReader != null) { |
| recordReader.initialize(split.getSplit(), taskAttemptContext); |
| progressValue.set(getProgress()); |
| if (recordReader.nextKeyValue()) { |
| recordsReturned.incrementAndGet(); |
| doneReading = false; |
| return true; |
| } |
| } else { |
| throw new IOException( |
| String.format( |
| "Null RecordReader object returned by %s", inputFormatObj.getClass())); |
| } |
| recordReader = null; |
| } catch (InterruptedException e) { |
| throw new IOException( |
| "Could not read because the thread was interrupted while reading records", e); |
| } |
| doneReading = true; |
| return false; |
| } |
| |
| @Override |
| public boolean advance() throws IOException { |
| try { |
| progressValue.set(getProgress()); |
| if (recordReader.nextKeyValue()) { |
| recordsReturned.incrementAndGet(); |
| return true; |
| } |
| doneReading = true; |
| } catch (InterruptedException e) { |
| throw new IOException("Unable to read data: ", e); |
| } |
| return false; |
| } |
| |
| @Override |
| public KV<K, V> getCurrent() { |
| K key; |
| V value; |
| try { |
| // Transform key if translation function is provided. |
| key = transformKeyOrValue(recordReader.getCurrentKey(), keyTranslationFunction, keyCoder); |
| // Transform value if translation function is provided. |
| value = |
| transformKeyOrValue( |
| recordReader.getCurrentValue(), valueTranslationFunction, valueCoder); |
| } catch (IOException | InterruptedException e) { |
| LOG.error("Unable to read data: ", e); |
| throw new IllegalStateException("Unable to read data: " + "{}", e); |
| } |
| return KV.of(key, value); |
| } |
| |
| /** Returns the serialized output of transformed key or value object. */ |
| @SuppressWarnings("unchecked") |
| private <T, T3> T3 transformKeyOrValue( |
| T input, @Nullable SimpleFunction<T, T3> simpleFunction, Coder<T3> coder) |
| throws CoderException, ClassCastException { |
| T3 output; |
| if (null != simpleFunction) { |
| output = simpleFunction.apply(input); |
| } else { |
| output = (T3) input; |
| } |
| return cloneIfPossiblyMutable(output, coder); |
| } |
| |
| /** |
| * Beam expects immutable objects, but the Hadoop InputFormats tend to re-use the same object |
| * when returning them. Hence, mutable objects returned by Hadoop InputFormats are cloned. |
| */ |
| private <T> T cloneIfPossiblyMutable(T input, Coder<T> coder) |
| throws CoderException, ClassCastException { |
| // If the input object is not of known immutable type, clone the object. |
| if (!isKnownImmutable(input)) { |
| input = CoderUtils.clone(coder, input); |
| } |
| return input; |
| } |
| |
| /** Utility method to check if the passed object is of a known immutable type. */ |
| private boolean isKnownImmutable(Object o) { |
| return immutableTypes.contains(o.getClass()); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| LOG.info("Closing reader after reading {} records.", recordsReturned); |
| if (recordReader != null) { |
| recordReader.close(); |
| recordReader = null; |
| } |
| } |
| |
| @Override |
| public Double getFractionConsumed() { |
| if (doneReading) { |
| return 1.0; |
| } else if (recordReader == null || recordsReturned.get() == 0L) { |
| return 0.0; |
| } |
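| // A zero progress value reported by the RecordReader is treated as "unknown" rather than |
| // as 0%, hence null is returned. |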
| if (progressValue.get() == 0.0) { |
| return null; |
| } |
| return progressValue.doubleValue(); |
| } |
| |
| /** Returns RecordReader's progress. */ |
| private Double getProgress() throws IOException, InterruptedException { |
| try { |
| float progress = recordReader.getProgress(); |
| return (progress < 0 || progress > 1) ? 0.0 : (double) progress; |
| } catch (IOException e) { |
| LOG.error( |
| "Error in computing the fraction consumed as RecordReader.getProgress() threw an " |
| + "exception: ", |
| e); |
| throw new IOException( |
| "Error in computing the fraction consumed as RecordReader.getProgress() threw an " |
| + "exception: " |
| + e.getMessage(), |
| e); |
| } |
| } |
| |
| @Override |
| public final long getSplitPointsRemaining() { |
| if (doneReading) { |
| return 0; |
| } |
| /* |
| This source does not currently support dynamic work rebalancing, so remaining parallelism |
| is always 1. |
| */ |
| return 1; |
| } |
| } |
| } |
| |
| /** |
| * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit} to be serialized using |
| * Java's standard serialization mechanisms. |
| */ |
| public static class SerializableSplit implements Serializable { |
| |
| InputSplit inputSplit; |
| |
| public SerializableSplit() {} |
| |
| public SerializableSplit(InputSplit split) { |
| checkArgument( |
| split instanceof Writable, String.format("Split is not of type Writable: %s", split)); |
| this.inputSplit = split; |
| } |
| |
| @SuppressWarnings("WeakerAccess") |
| public InputSplit getSplit() { |
| return inputSplit; |
| } |
| |
| private void readObject(ObjectInputStream in) throws IOException { |
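| // Hadoop InputSplits are Writable but not Java-serializable, so ObjectWritable is used to |
| // bridge Writable (de)serialization with Java's standard serialization. |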
| ObjectWritable ow = new ObjectWritable(); |
| ow.setConf(new Configuration(false)); |
| ow.readFields(in); |
| this.inputSplit = (InputSplit) ow.get(); |
| } |
| |
| private void writeObject(ObjectOutputStream out) throws IOException { |
| new ObjectWritable(inputSplit).write(out); |
| } |
| } |
| |
| /** |
| * Assigns a task id to every output pair and groups the pairs by this task id. |
| * |
| * <p>This transformation is used when the write is configured with partitioning. |
| * |
| * @param <KeyT> type of key |
| * @param <ValueT> type of value |
| */ |
| private static class GroupDataByPartition<KeyT, ValueT> |
| extends PTransform< |
| PCollection<KV<KeyT, ValueT>>, PCollection<KV<Integer, KV<KeyT, ValueT>>>> { |
| |
| private final PCollectionView<Configuration> configView; |
| |
| private GroupDataByPartition(PCollectionView<Configuration> configView) { |
| this.configView = configView; |
| } |
| |
| @Override |
| public PCollection<KV<Integer, KV<KeyT, ValueT>>> expand(PCollection<KV<KeyT, ValueT>> input) { |
| return input |
| .apply( |
| "AssignTask", |
| ParDo.of(new AssignTaskFn<KeyT, ValueT>(configView)).withSideInputs(configView)) |
| .setTypeDescriptor( |
| TypeDescriptors.kvs(TypeDescriptors.integers(), input.getTypeDescriptor())) |
| .apply("GroupByTaskId", GroupByKey.create()) |
| .apply("FlattenGroupedTasks", ParDo.of(new FlattenGroupedTasks<>())); |
| } |
| } |
| |
| /** |
| * Flattens grouped iterable {@link KV} pairs into triplets of TaskID/Key/Value. |
| * |
| * @param <KeyT> Type of keys to be written. |
| * @param <ValueT> Type of values to be written. |
| */ |
| private static class FlattenGroupedTasks<KeyT, ValueT> |
| extends DoFn<KV<Integer, Iterable<KV<KeyT, ValueT>>>, KV<Integer, KV<KeyT, ValueT>>> { |
| |
| @ProcessElement |
| public void processElement( |
| @Element KV<Integer, Iterable<KV<KeyT, ValueT>>> input, |
| OutputReceiver<KV<Integer, KV<KeyT, ValueT>>> outputReceiver) { |
| final Integer key = input.getKey(); |
| for (KV<KeyT, ValueT> element : |
| requireNonNull(input.getValue(), "Iterable can not be null.")) { |
| outputReceiver.output(KV.of(key, element)); |
| } |
| } |
| } |
| |
| /** |
| * A {@link PTransform} that writes to any data sink which implements Hadoop OutputFormat, e.g. |
| * Cassandra, Elasticsearch, HBase, Redis, Postgres, etc. See the class-level Javadoc on |
| * {@link HadoopFormatIO} for more information. |
| * |
| * @param <KeyT> Type of keys to be written. |
| * @param <ValueT> Type of values to be written. |
| * @see HadoopFormatIO |
| */ |
| public static class Write<KeyT, ValueT> extends PTransform<PCollection<KV<KeyT, ValueT>>, PDone> { |
| |
| @Nullable private final transient Configuration configuration; |
| |
| @Nullable |
| private final PTransform< |
| PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>> |
| configTransform; |
| |
| private final ExternalSynchronization externalSynchronization; |
| |
| private final boolean withPartitioning; |
| |
| Write( |
| @Nullable Configuration configuration, |
| @Nullable |
| PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>> |
| configTransform, |
| ExternalSynchronization externalSynchronization, |
| boolean withPartitioning) { |
| this.configuration = configuration; |
| this.configTransform = configTransform; |
| this.externalSynchronization = externalSynchronization; |
| this.withPartitioning = withPartitioning; |
| } |
| |
| /** |
| * Builder for determining partitioning. |
| * |
| * @param <KeyT> Key type to write |
| * @param <ValueT> Value type to write |
| */ |
| public interface PartitionedWriterBuilder<KeyT, ValueT> { |
| |
| /** |
| * Writes to the sink with partitioning by Task Id. |
| * |
| * <p>The following Hadoop configuration properties are required with this option: |
| * |
| * <ul> |
| * <li>{@code mapreduce.job.reduces}: Number of reduce tasks. Value is equal to number of |
| * write tasks which will be generated. |
| * <li>{@code mapreduce.job.partitioner.class}: Hadoop partitioner class which will be used |
| * for distributing of records among partitions. |
| * </ul> |
| * |
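| * <p>For example (a sketch; {@code MyPartitioner} is an illustrative class name): |
| * |
| * <pre>{@code |
| * conf.setInt(HadoopFormatIO.NUM_REDUCES, 2); |
| * conf.setClass(HadoopFormatIO.PARTITIONER_CLASS_ATTR, MyPartitioner.class, Partitioner.class); |
| * }</pre> |
| * |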
| * @return WriteBuilder for write transformation |
| */ |
| ExternalSynchronizationBuilder<KeyT, ValueT> withPartitioning(); |
| |
| /** |
| * Writes to the sink without the need to partition the output into a specified number of |
| * partitions. |
| * |
| * <p>This write operation does not shuffle data by partition, so it saves transfer time before |
| * the write operation itself. As a consequence, it generates an arbitrary number of partitions. |
| * |
| * <p><b>Note:</b> Works only for {@link |
| * org.apache.beam.sdk.values.PCollection.IsBounded#BOUNDED} {@link PCollection} with global |
| * {@link WindowingStrategy}. |
| * |
| * @return WriteBuilder for write transformation |
| */ |
| ExternalSynchronizationBuilder<KeyT, ValueT> withoutPartitioning(); |
| } |
| |
| /** |
| * Builder for defining the External Synchronization. |
| * |
| * @param <KeyT> Key type to write |
| * @param <ValueT> Value type to write |
| */ |
| public interface ExternalSynchronizationBuilder<KeyT, ValueT> { |
| |
| /** |
| * Specifies the class which provides the external synchronization required for the Hadoop |
| * write operation. |
| * |
| * @param externalSynchronization provider of external synchronization |
| * @return Write transformation |
| */ |
| Write<KeyT, ValueT> withExternalSynchronization( |
| ExternalSynchronization externalSynchronization); |
| } |
| |
| /** |
| * Main builder of Write transformation. |
| * |
| * @param <KeyT> Key type to write |
| * @param <ValueT> Value type to write |
| */ |
| public interface WriteBuilder<KeyT, ValueT> { |
| |
| /** |
| * Writes to the sink using the options provided by the given Hadoop configuration. |
| * |
| * <p><b>Note:</b> Works only for {@link |
| * org.apache.beam.sdk.values.PCollection.IsBounded#BOUNDED} {@link PCollection} with global |
| * {@link WindowingStrategy}. |
| * |
| * @param config hadoop configuration. |
| * @return WriteBuilder with set configuration |
| * @throws NullPointerException when the configuration is null |
| * @see HadoopFormatIO for required hadoop {@link Configuration} properties |
| */ |
| PartitionedWriterBuilder<KeyT, ValueT> withConfiguration(Configuration config); |
| |
| /** |
| * Writes to the sink using a configuration created by the provided {@code configTransform}. |
| * |
| * <p>This type is useful especially for processing unbounded windowed data, but can also be |
| * used for batch processing. |
| * |
| * <p>Supports only {@link PCollection}s with the {@link DefaultTrigger} and without allowed |
| * lateness. |
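| * |
| * <p>A minimal sketch of such a transform (illustrative only; {@code BuildConfigFn} is a |
| * hypothetical DoFn deriving one {@link Configuration} from the window's data): |
| * |
| * <pre>{@code |
| * PTransform<PCollection<? extends KV<Text, LongWritable>>, PCollectionView<Configuration>> |
| * configTransform = |
| * new PTransform<PCollection<? extends KV<Text, LongWritable>>, |
| * PCollectionView<Configuration>>() { |
| * @Override |
| * public PCollectionView<Configuration> expand( |
| * PCollection<? extends KV<Text, LongWritable>> input) { |
| * return input |
| * .apply("BuildConfig", ParDo.of(new BuildConfigFn())) // hypothetical DoFn |
| * .apply(View.asSingleton()); |
| * } |
| * }; |
| * }</pre> |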
| * |
| * @param configTransform configuration transformation interface |
| * @return WriteBuilder with set configuration transformation |
| * @throws NullPointerException when {@code configTransform} is {@code null} |
| * @see HadoopFormatIO for required hadoop {@link Configuration} properties |
| */ |
| ExternalSynchronizationBuilder<KeyT, ValueT> withConfigurationTransform( |
| PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>> |
| configTransform); |
| } |
| |
| /** |
| * Implementation of all builders. |
| * |
| * @param <KeyT> Key type to write |
| * @param <ValueT> Value type to write |
| */ |
| static class Builder<KeyT, ValueT> |
| implements WriteBuilder<KeyT, ValueT>, |
| PartitionedWriterBuilder<KeyT, ValueT>, |
| ExternalSynchronizationBuilder<KeyT, ValueT> { |
| |
| private Configuration configuration; |
| private PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>> |
| configTransform; |
| private boolean isWithPartitioning; |
| |
| @Override |
| public PartitionedWriterBuilder<KeyT, ValueT> withConfiguration(Configuration config) { |
| checkNotNull(config, "Hadoop configuration cannot be null"); |
| this.configuration = config; |
| return this; |
| } |
| |
| @Override |
| public ExternalSynchronizationBuilder<KeyT, ValueT> withConfigurationTransform( |
| PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>> |
| configTransform) { |
| checkNotNull(configTransform, "Configuration transformation cannot be null"); |
| this.isWithPartitioning = true; |
| this.configTransform = configTransform; |
| return this; |
| } |
| |
| @Override |
| public ExternalSynchronizationBuilder<KeyT, ValueT> withPartitioning() { |
| this.isWithPartitioning = true; |
| return this; |
| } |
| |
| @Override |
| public ExternalSynchronizationBuilder<KeyT, ValueT> withoutPartitioning() { |
| this.isWithPartitioning = false; |
| return this; |
| } |
| |
| @Override |
| public Write<KeyT, ValueT> withExternalSynchronization( |
| ExternalSynchronization externalSynchronization) { |
| checkNotNull(externalSynchronization, "External synchronization cannot be null"); |
| return new Write<>( |
| this.configuration, |
| this.configTransform, |
| externalSynchronization, |
| this.isWithPartitioning); |
| } |
| } |
| |
| @Override |
| public void validate(PipelineOptions pipelineOptions) {} |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| super.populateDisplayData(builder); |
| Configuration hadoopConfig = configuration; |
| if (hadoopConfig != null) { |
| builder.addIfNotNull( |
| DisplayData.item(OUTPUT_FORMAT_CLASS_ATTR, hadoopConfig.get(OUTPUT_FORMAT_CLASS_ATTR)) |
| .withLabel("OutputFormat Class")); |
| builder.addIfNotNull( |
| DisplayData.item(OUTPUT_KEY_CLASS, hadoopConfig.get(OUTPUT_KEY_CLASS)) |
| .withLabel("OutputFormat Key Class")); |
| builder.addIfNotNull( |
| DisplayData.item(OUTPUT_VALUE_CLASS, hadoopConfig.get(OUTPUT_VALUE_CLASS)) |
| .withLabel("OutputFormat Value Class")); |
| builder.addIfNotNull( |
| DisplayData.item( |
| PARTITIONER_CLASS_ATTR, |
| hadoopConfig.get( |
| PARTITIONER_CLASS_ATTR, |
| HadoopFormats.DEFAULT_PARTITIONER_CLASS_ATTR.getName())) |
| .withLabel("Partitioner Class")); |
| } |
| } |
| |
| @Override |
| public PDone expand(PCollection<KV<KeyT, ValueT>> input) { |
| |
| // a streaming pipeline must have a configuration transformation defined |
| if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED) |
| || !input.getWindowingStrategy().equals(WindowingStrategy.globalDefault())) { |
| checkArgument( |
| configTransform != null, |
| "Writing of unbounded data can be processed only with configuration transformation provider. See %s.withConfigurationTransform()", |
| Write.class); |
| } |
| |
| verifyInputWindowing(input); |
| |
| TypeDescriptor<Configuration> configType = new TypeDescriptor<Configuration>() {}; |
| input |
| .getPipeline() |
| .getCoderRegistry() |
| .registerCoderForType(configType, new ConfigurationCoder()); |
| |
| PCollectionView<Configuration> configView = createConfigurationView(input); |
| |
| return processJob(input, configView); |
| } |
| |
| /** |
| * Processes the write job. The write job is composed of the following steps: |
| * |
| * <ul> |
| * <li>When partitioning is enabled: |
| * <ul> |
| * <li>Assigning of the {@link TaskID} (represented as {@link Integer}) to the {@link |
| * KV}s in {@link AssignTaskFn} |
| * <li>Grouping {@link KV}s by the {@link TaskID} |
| * </ul> |
| * <li>Otherwise, creation of the TaskId via {@link PrepareNonPartitionedTasksFn}, where locks |
| * are created for each task id |
| * <li>Writing of {@link KV} records via {@link WriteFn} |
| * <li>Committing of whole job via {@link CommitJobFn} |
| * </ul> |
| * |
| * @param input Collection with output data to write |
| * @param configView configuration view |
| * @return {@link PDone} of the successfully processed write |
| */ |
| private PDone processJob( |
| PCollection<KV<KeyT, ValueT>> input, PCollectionView<Configuration> configView) { |
| |
| TypeDescriptor<Iterable<Integer>> iterableIntType = |
| TypeDescriptors.iterables(TypeDescriptors.integers()); |
| |
| PCollection<KV<KeyT, ValueT>> validatedInput = |
| input.apply( |
| ParDo.of( |
| new SetupJobFn<>( |
| externalSynchronization, configView, input.getTypeDescriptor())) |
| .withSideInputs(configView)); |
| |
| PCollection<KV<Integer, KV<KeyT, ValueT>>> writeData = |
| withPartitioning |
| ? validatedInput.apply("GroupDataByPartition", new GroupDataByPartition<>(configView)) |
| : validatedInput.apply( |
| "PrepareNonPartitionedTasks", |
| ParDo.of( |
| new PrepareNonPartitionedTasksFn<KeyT, ValueT>( |
| configView, externalSynchronization)) |
| .withSideInputs(configView)); |
| |
| PCollection<Iterable<Integer>> collectedFinishedWrites = |
| writeData |
| .apply( |
| "Write", |
| ParDo.of(new WriteFn<KeyT, ValueT>(configView, externalSynchronization)) |
| .withSideInputs(configView)) |
| .setTypeDescriptor(TypeDescriptors.integers()) |
| .apply( |
| "CollectWriteTasks", |
| Combine.globally(new IterableCombinerFn<>(TypeDescriptors.integers())) |
| .withoutDefaults()) |
| .setTypeDescriptor(iterableIntType); |
| |
| return PDone.in( |
| collectedFinishedWrites |
| .apply( |
| "CommitWriteJob", |
| ParDo.of(new CommitJobFn<Integer>(configView, externalSynchronization)) |
| .withSideInputs(configView)) |
| .getPipeline()); |
| } |
| |
| private void verifyInputWindowing(PCollection<KV<KeyT, ValueT>> input) { |
| if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) { |
| checkArgument( |
| !input.getWindowingStrategy().equals(WindowingStrategy.globalDefault()), |
| "Cannot work with %s and GLOBAL %s", |
| PCollection.IsBounded.UNBOUNDED, |
| WindowingStrategy.class.getSimpleName()); |
| checkArgument( |
| input.getWindowingStrategy().getTrigger().getClass().equals(DefaultTrigger.class), |
| "Cannot work with %s trigger. Write works correctly only with %s", |
| input.getWindowingStrategy().getTrigger().getClass().getSimpleName(), |
| DefaultTrigger.class.getSimpleName()); |
| checkArgument( |
| input.getWindowingStrategy().getAllowedLateness().equals(Duration.ZERO), |
| "Write does not allow late data."); |
| } |
| } |
| |
| /** |
| * Creates a {@link PCollectionView} with a single {@link Configuration}, based on the |
| * configured source of the configuration. |
| * |
| * @param input input data |
| * @return PCollectionView with a single {@link Configuration} |
| * @see Builder#withConfiguration(Configuration) |
| * @see Builder#withConfigurationTransform(PTransform) |
| */ |
| private PCollectionView<Configuration> createConfigurationView( |
| PCollection<KV<KeyT, ValueT>> input) { |
| |
| PCollectionView<Configuration> config; |
| if (configuration != null) { |
| config = |
| input |
| .getPipeline() |
| .apply("CreateOutputConfig", Create.<Configuration>of(configuration)) |
| .apply(View.<Configuration>asSingleton().withDefaultValue(configuration)); |
| } else { |
| config = input.apply("TransformDataIntoConfig", configTransform); |
| } |
| |
| return config; |
| } |
| } |
| |
| /** |
| * Represents the context of one Hadoop write task. |
| * |
| * @param <KeyT> Key type to write |
| * @param <ValueT> Value type to write |
| */ |
| private static class TaskContext<KeyT, ValueT> { |
| |
| private final RecordWriter<KeyT, ValueT> recordWriter; |
| private final OutputCommitter outputCommitter; |
| private final TaskAttemptContext taskAttemptContext; |
| |
| TaskContext(TaskAttemptID taskAttempt, Configuration conf) { |
| taskAttemptContext = HadoopFormats.createTaskAttemptContext(conf, taskAttempt); |
| OutputFormat<KeyT, ValueT> outputFormatObj = HadoopFormats.createOutputFormatFromConfig(conf); |
| outputCommitter = initOutputCommitter(outputFormatObj, conf, taskAttemptContext); |
| recordWriter = initRecordWriter(outputFormatObj, taskAttemptContext); |
| } |
| |
| RecordWriter<KeyT, ValueT> getRecordWriter() { |
| return recordWriter; |
| } |
| |
| OutputCommitter getOutputCommitter() { |
| return outputCommitter; |
| } |
| |
| TaskAttemptContext getTaskAttemptContext() { |
| return taskAttemptContext; |
| } |
| |
| int getTaskId() { |
| return taskAttemptContext.getTaskAttemptID().getTaskID().getId(); |
| } |
| |
| String getJobId() { |
| return taskAttemptContext.getJobID().getJtIdentifier(); |
| } |
| |
| void abortTask() { |
| try { |
| outputCommitter.abortTask(taskAttemptContext); |
| } catch (IOException e) { |
| throw new IllegalStateException( |
| String.format("Unable to abort task %s of job %s", getTaskId(), getJobId()), e); |
| } |
| } |
| |
| private RecordWriter<KeyT, ValueT> initRecordWriter( |
| OutputFormat<KeyT, ValueT> outputFormatObj, TaskAttemptContext taskAttemptContext) |
| throws IllegalStateException { |
| try { |
| LOG.info( |
| "Creating new RecordWriter for task {} of Job with id {}.", |
| taskAttemptContext.getTaskAttemptID().getTaskID().getId(), |
| taskAttemptContext.getJobID().getJtIdentifier()); |
| return outputFormatObj.getRecordWriter(taskAttemptContext); |
| } catch (InterruptedException | IOException e) { |
| throw new IllegalStateException("Unable to create RecordWriter object: ", e); |
| } |
| } |
| |
| private static OutputCommitter initOutputCommitter( |
| OutputFormat<?, ?> outputFormatObj, |
| Configuration conf, |
| TaskAttemptContext taskAttemptContext) |
| throws IllegalStateException { |
| OutputCommitter outputCommitter; |
| try { |
| outputCommitter = outputFormatObj.getOutputCommitter(taskAttemptContext); |
| if (outputCommitter != null) { |
| outputCommitter.setupJob(new JobContextImpl(conf, taskAttemptContext.getJobID())); |
| } |
| } catch (Exception e) { |
| throw new IllegalStateException("Unable to create OutputCommitter object: ", e); |
| } |
| |
| return outputCommitter; |
| } |
| |
| @Override |
| public String toString() { |
| return "TaskContext{" |
| + "jobId=" |
| + getJobId() |
| + ", taskId=" |
| + getTaskId() |
| + ", attemptId=" |
| + taskAttemptContext.getTaskAttemptID().getId() |
| + '}'; |
| } |
| } |
| |
| /**
| * A {@link Coder} for Hadoop {@link Configuration} instances.
| *
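| * <p>A minimal round-trip sketch (in-memory streams; the property name is hypothetical):
| *
| * <pre>{@code
| * Configuration conf = new Configuration(false);
| * conf.set("some.key", "some.value");
| * ByteArrayOutputStream out = new ByteArrayOutputStream();
| * new ConfigurationCoder().encode(conf, out);
| * Configuration decoded =
| *     new ConfigurationCoder().decode(new ByteArrayInputStream(out.toByteArray()));
| * // decoded.get("some.key") returns "some.value"
| * }</pre>
| */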
| private static class ConfigurationCoder extends AtomicCoder<Configuration> { |
| |
| @Override |
| public void encode(Configuration value, OutputStream outStream) throws IOException { |
| DataOutputStream dataOutputStream = new DataOutputStream(outStream); |
| value.write(dataOutputStream); |
| dataOutputStream.flush(); |
| } |
| |
| @Override |
| public Configuration decode(InputStream inStream) throws IOException { |
| DataInputStream dataInputStream = new DataInputStream(inStream); |
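| // 'false' skips loading the default resources; the decoded configuration contains only the
| // properties read from the stream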
| Configuration config = new Configuration(false); |
| config.readFields(dataInputStream); |
| |
| return config; |
| } |
| } |
| |
| /** |
| * A {@link DoFn} with the following responsibilities:
| *
| * <ul>
| * <li>Validates the configuration, i.e. checks whether all required properties are set.
| * <li>Validates the types of the input {@link PCollection} elements.
| * <li>Sets up the {@link OutputFormat} job for the given window.
| * </ul>
| *
| * <p>The job setup logic runs only for the very first element processed by a given instance;
| * all other elements are passed straight through.
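| *
| * <p>Only the worker that acquires the job lock via {@link
| * ExternalSynchronization#tryAcquireJobLock(Configuration)} performs the job setup; workers
| * that lose the race skip it.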
| */ |
| private static class SetupJobFn<KeyT, ValueT> extends DoFn<KV<KeyT, ValueT>, KV<KeyT, ValueT>> { |
| |
| private final ExternalSynchronization externalSynchronization; |
| private final PCollectionView<Configuration> configView; |
| private final TypeDescriptor<KV<KeyT, ValueT>> inputTypeDescriptor; |
| private boolean isSetupJobAttempted; |
| |
| SetupJobFn( |
| ExternalSynchronization externalSynchronization, |
| PCollectionView<Configuration> configView, |
| TypeDescriptor<KV<KeyT, ValueT>> inputTypeDescriptor) { |
| this.externalSynchronization = externalSynchronization; |
| this.configView = configView; |
| this.inputTypeDescriptor = inputTypeDescriptor; |
| } |
| |
| @Setup |
| public void setup() { |
| isSetupJobAttempted = false; |
| } |
| |
| @DoFn.ProcessElement |
| public void processElement( |
| @DoFn.Element KV<KeyT, ValueT> element, |
| OutputReceiver<KV<KeyT, ValueT>> receiver, |
| BoundedWindow window, |
| ProcessContext c) { |
| |
| receiver.output(element); |
| |
| if (isSetupJobAttempted) { |
| // setup of job was already attempted |
| return; |
| } |
| |
| Configuration conf = c.sideInput(configView); |
| |
| // Validate the configuration and the input first; all later operations rely on assumptions
| // established by this validation.
| validateConfiguration(conf); |
| validateInputData(conf); |
| |
| boolean isJobLockAcquired = externalSynchronization.tryAcquireJobLock(conf); |
| isSetupJobAttempted = true; |
| |
| if (!isJobLockAcquired) { |
| // another parallel worker already acquired the job lock
| return; |
| } |
| |
| try { |
| // setup job |
| JobID jobId = HadoopFormats.getJobId(conf); |
| |
| trySetupJob(jobId, conf, window); |
| |
| } catch (Exception e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| |
| /** |
| * Validates that the mandatory configuration properties, i.e. the {@link OutputFormat} class,
| * its key and value classes and the job id, are provided in the Hadoop configuration.
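| *
| * <p>A sketch of a configuration that passes this validation; the output format and the
| * key/value classes are example choices, and the job id value is hypothetical:
| *
| * <pre>{@code
| * Configuration conf = new Configuration();
| * conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class, OutputFormat.class);
| * conf.setClass(OUTPUT_KEY_CLASS, Text.class, Object.class);
| * conf.setClass(OUTPUT_VALUE_CLASS, LongWritable.class, Object.class);
| * conf.set(JOB_ID, "1");
| * }</pre>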
| */ |
| private void validateConfiguration(Configuration conf) { |
| |
| checkArgument(conf != null, "Configuration cannot be null");
| checkArgument( |
| conf.get(OUTPUT_FORMAT_CLASS_ATTR) != null, |
| "Configuration must contain \"" + OUTPUT_FORMAT_CLASS_ATTR + "\""); |
| checkArgument( |
| conf.get(OUTPUT_KEY_CLASS) != null, |
| "Configuration must contain \"" + OUTPUT_KEY_CLASS + "\""); |
| checkArgument( |
| conf.get(OUTPUT_VALUE_CLASS) != null, |
| "Configuration must contain \"" + OUTPUT_VALUE_CLASS + "\""); |
| checkArgument(conf.get(JOB_ID) != null, "Configuration must contain \"" + JOB_ID + "\""); |
| } |
| |
| /** |
| * Validates that the input data have a correctly specified {@link TypeDescriptor} and that this
| * {@link TypeDescriptor} matches the output types set in the Hadoop {@link Configuration}.
| * |
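| * <p>For example, if the configuration sets {@code Text} as the output key class and {@code
| * LongWritable} as the output value class, the input type descriptor must equal {@code
| * TypeDescriptors.kvs(TypeDescriptor.of(Text.class), TypeDescriptor.of(LongWritable.class))}.
| *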
| * @param conf hadoop config |
| */ |
| @SuppressWarnings("unchecked") |
| private void validateInputData(Configuration conf) { |
| TypeDescriptor<KeyT> outputFormatKeyClass = |
| (TypeDescriptor<KeyT>) TypeDescriptor.of(conf.getClass(OUTPUT_KEY_CLASS, null)); |
| TypeDescriptor<ValueT> outputFormatValueClass = |
| (TypeDescriptor<ValueT>) TypeDescriptor.of(conf.getClass(OUTPUT_VALUE_CLASS, null)); |
| |
| checkArgument( |
| inputTypeDescriptor != null, |
| "Input %s must be set!", |
| TypeDescriptor.class.getSimpleName()); |
| checkArgument( |
| KV.class.equals(inputTypeDescriptor.getRawType()), |
| "%s expects %s as input type.", |
| Write.class.getSimpleName(), |
| KV.class); |
| checkArgument( |
| inputTypeDescriptor.equals( |
| TypeDescriptors.kvs(outputFormatKeyClass, outputFormatValueClass)), |
| "%s expects following %ss: KV(Key: %s, Value: %s) but following %ss are set: KV(Key: %s, Value: %s)", |
| Write.class.getSimpleName(), |
| TypeDescriptor.class.getSimpleName(), |
| outputFormatKeyClass.getRawType(), |
| outputFormatValueClass.getRawType(), |
| TypeDescriptor.class.getSimpleName(), |
| inputTypeDescriptor.resolveType(KV.class.getTypeParameters()[0]), |
| inputTypeDescriptor.resolveType(KV.class.getTypeParameters()[1])); |
| } |
| |
| /** |
| * Sets up the Hadoop write job as running. Some parallel worker may already have done the
| * setup; in that case the thrown {@link FileAlreadyExistsException} is caught and the setup is
| * skipped.
| * |
| * @param jobId jobId |
| * @param conf hadoop configuration |
| * @param window window |
| */ |
| private void trySetupJob(JobID jobId, Configuration conf, BoundedWindow window) { |
| try { |
| TaskAttemptContext setupTaskContext = HadoopFormats.createSetupTaskContext(conf, jobId); |
| OutputFormat<?, ?> jobOutputFormat = HadoopFormats.createOutputFormatFromConfig(conf); |
| |
| jobOutputFormat.checkOutputSpecs(setupTaskContext); |
| jobOutputFormat.getOutputCommitter(setupTaskContext).setupJob(setupTaskContext); |
| |
| LOG.info( |
| "Job with id {} successfully configured from window with max timestamp {}.", |
| jobId.getJtIdentifier(), |
| window.maxTimestamp()); |
| |
| } catch (FileAlreadyExistsException e) { |
| LOG.info("Job was already set up by another worker. Skipping the rest of the setup.");
| } catch (Exception e) { |
| throw new RuntimeException("Unable to setup job.", e); |
| } |
| } |
| } |
| |
| /** |
| * Commits the whole write job.
| * |
| * @param <T> type of TaskId identifier |
| */ |
| private static class CommitJobFn<T> extends DoFn<Iterable<T>, Void> { |
| |
| private final PCollectionView<Configuration> configView; |
| private final ExternalSynchronization externalSynchronization; |
| |
| CommitJobFn( |
| PCollectionView<Configuration> configView, |
| ExternalSynchronization externalSynchronization) { |
| this.configView = configView; |
| this.externalSynchronization = externalSynchronization; |
| } |
| |
| @ProcessElement |
| public void processElement(ProcessContext c) { |
| |
| Configuration config = c.sideInput(configView); |
| cleanupJob(config); |
| } |
| |
| /** |
| * Releases the job id lock and commits the whole write job.
| * |
| * @param config hadoop config |
| */ |
| private void cleanupJob(Configuration config) { |
| |
| externalSynchronization.releaseJobIdLock(config); |
| |
| JobID jobID = HadoopFormats.getJobId(config); |
| TaskAttemptContext cleanupTaskContext = HadoopFormats.createCleanupTaskContext(config, jobID); |
| OutputFormat<?, ?> outputFormat = HadoopFormats.createOutputFormatFromConfig(config); |
| try { |
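| // commitJob finalizes the output of all committed tasks; e.g. a FileOutputCommitter
| // promotes files from its temporary directory to the final output directory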
| OutputCommitter outputCommitter = outputFormat.getOutputCommitter(cleanupTaskContext); |
| outputCommitter.commitJob(cleanupTaskContext); |
| } catch (Exception e) { |
| throw new RuntimeException("Unable to commit job.", e); |
| } |
| } |
| } |
| |
| /** |
| * Assigns a {@link TaskID#getId()} to the given key-value pair. The {@link TaskID} is later
| * used to determine the Hadoop file the pair is written to.
| * |
| * @param <KeyT> Type of key |
| * @param <ValueT> Type of value |
| */ |
| private static class AssignTaskFn<KeyT, ValueT> |
| extends DoFn<KV<KeyT, ValueT>, KV<Integer, KV<KeyT, ValueT>>> { |
| |
| private final PCollectionView<Configuration> configView; |
| |
| // Transient properties because they are used only for one bundle |
| /** Cache of created TaskIDs for given bundle. */ |
| private transient Map<Integer, TaskID> partitionToTaskContext; |
| |
| private transient Partitioner<KeyT, ValueT> partitioner; |
| private transient Integer reducersCount; |
| private transient JobID jobId; |
| |
| /** |
| * Requires the configuration view of the given window.
| * |
| * @param configView configuration view |
| */ |
| AssignTaskFn(PCollectionView<Configuration> configView) { |
| this.configView = configView; |
| } |
| |
| /** Deletes cached fields used in previous bundle. */ |
| @StartBundle |
| public void startBundle() { |
| partitionToTaskContext = new HashMap<>(); |
| partitioner = null; |
| jobId = null; |
| reducersCount = null; |
| } |
| |
| @ProcessElement |
| public void processElement( |
| @Element KV<KeyT, ValueT> element, |
| OutputReceiver<KV<Integer, KV<KeyT, ValueT>>> receiver, |
| ProcessContext c) { |
| |
| Configuration config = c.sideInput(configView); |
| |
| TaskID taskID = createTaskIDForKV(element, config); |
| int taskId = taskID.getId(); |
| receiver.output(KV.of(taskId, element)); |
| } |
| |
| /** |
| * Creates or reuses existing {@link TaskID} for given record. |
| * |
| * <p>The {@link TaskID} is derived from the partition that the {@link Partitioner} (stored in
| * the configuration) computes for the pair's key and value.
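| *
| * <p>For example, with Hadoop's {@code HashPartitioner} and two reducers, every record whose
| * key hashes to partition 1 reuses the cached {@link TaskID} with id 1 for the rest of the
| * bundle.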
| * |
| * @param kv key-value pair to be written
| * @param config hadoop configuration
| * @return the {@link TaskID} assigned to the given record
| */ |
| private TaskID createTaskIDForKV(KV<KeyT, ValueT> kv, Configuration config) { |
| int taskContextKey = |
| getPartitioner(config).getPartition(kv.getKey(), kv.getValue(), getReducersCount(config)); |
| |
| return partitionToTaskContext.computeIfAbsent( |
| taskContextKey, (key) -> HadoopFormats.createTaskID(getJobId(config), key)); |
| } |
| |
| private JobID getJobId(Configuration config) { |
| if (jobId == null) { |
| jobId = HadoopFormats.getJobId(config); |
| } |
| return jobId; |
| } |
| |
| private int getReducersCount(Configuration config) { |
| if (reducersCount == null) { |
| reducersCount = HadoopFormats.getReducersCount(config); |
| } |
| return reducersCount; |
| } |
| |
| private Partitioner<KeyT, ValueT> getPartitioner(Configuration config) { |
| if (partitioner == null) { |
| partitioner = HadoopFormats.getPartitioner(config); |
| } |
| return partitioner; |
| } |
| } |
| |
| /** |
| * Writes all {@link KV} pairs for a given {@link TaskID} (the task id determines the write
| * partition).
| *
| * <p>For every {@link TaskID}, the following steps are executed:
| *
| * <ul>
| * <li>Creation of a {@link TaskContext} at the start of the bundle.
| * <li>Writing of every single {@link KV} pair via the {@link RecordWriter}.
| * <li>Committing of the task when the bundle finishes.
| * </ul>
| * |
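| * <p>Input elements have the shape {@code KV<taskId, KV<key, value>>}; e.g. {@code KV.of(3,
| * KV.of(new Text("k"), new LongWritable(1)))} is routed to the writer of task 3 in the
| * element's window.
| *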
| * @param <KeyT> Type of key |
| * @param <ValueT> Type of value |
| */ |
| private static class WriteFn<KeyT, ValueT> extends DoFn<KV<Integer, KV<KeyT, ValueT>>, Integer> { |
| |
| private final PCollectionView<Configuration> configView; |
| private final ExternalSynchronization externalSynchronization; |
| |
| // Transient because it is used only within one bundle
| /**
| * Keyed by the combination of window and task id because different windows can use the same
| * task id.
| */
| private transient Map<KV<BoundedWindow, Integer>, TaskContext<KeyT, ValueT>> |
| bundleTaskContextMap; |
| |
| WriteFn( |
| PCollectionView<Configuration> configView, |
| ExternalSynchronization externalSynchronization) { |
| this.configView = configView; |
| this.externalSynchronization = externalSynchronization; |
| } |
| |
| /** Deletes cached map from previous bundle. */ |
| @StartBundle |
| public void startBundle() { |
| bundleTaskContextMap = new HashMap<>(); |
| } |
| |
| @ProcessElement |
| public void processElement( |
| @Element KV<Integer, KV<KeyT, ValueT>> element, ProcessContext c, BoundedWindow b) { |
| |
| Integer taskID = element.getKey(); |
| KV<BoundedWindow, Integer> win2TaskId = KV.of(b, taskID); |
| |
| TaskContext<KeyT, ValueT> taskContext = |
| bundleTaskContextMap.computeIfAbsent(win2TaskId, w2t -> setupTask(w2t.getValue(), c)); |
| |
| write(element.getValue(), taskContext); |
| } |
| |
| @FinishBundle |
| public void finishBundle(FinishBundleContext c) { |
| if (bundleTaskContextMap == null) { |
| return; |
| } |
| |
| for (Map.Entry<KV<BoundedWindow, Integer>, TaskContext<KeyT, ValueT>> entry : |
| bundleTaskContextMap.entrySet()) { |
| TaskContext<KeyT, ValueT> taskContext = entry.getValue(); |
| |
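| // close the writer first so all buffered records are flushed, then commit the task output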
| try { |
| taskContext.getRecordWriter().close(taskContext.getTaskAttemptContext()); |
| taskContext.getOutputCommitter().commitTask(taskContext.getTaskAttemptContext()); |
| |
| LOG.info("Write task for {} was successfully committed!", taskContext); |
| } catch (Exception e) { |
| processTaskException(taskContext, e); |
| } |
| |
| BoundedWindow window = entry.getKey().getKey(); |
| c.output(taskContext.getTaskId(), Objects.requireNonNull(window).maxTimestamp(), window); |
| } |
| } |
| |
| private void processTaskException(TaskContext<KeyT, ValueT> taskContext, Exception e) { |
| LOG.warn("Write task for {} failed. Will abort task.", taskContext); |
| taskContext.abortTask(); |
| throw new IllegalArgumentException(e); |
| } |
| |
| /** |
| * Writes one {@link KV} pair via the {@link RecordWriter} of the given task.
| *
| * @param kv key-value pair to write
| * @param taskContext context of the task whose writer is used
| */ |
| private void write(KV<KeyT, ValueT> kv, TaskContext<KeyT, ValueT> taskContext) { |
| |
| try { |
| RecordWriter<KeyT, ValueT> recordWriter = taskContext.getRecordWriter(); |
| recordWriter.write(kv.getKey(), kv.getValue()); |
| } catch (Exception e) { |
| processTaskException(taskContext, e); |
| } |
| } |
| |
| /** |
| * Creates a {@link TaskContext} and sets up the write task for the given {@code taskId}.
| * |
| * @param taskId id of the write Task |
| * @param c process context |
| * @return created TaskContext |
| * @throws IllegalStateException if the setup of the write task failed |
| */ |
| private TaskContext<KeyT, ValueT> setupTask(Integer taskId, ProcessContext c) |
| throws IllegalStateException { |
| |
| final Configuration conf = c.sideInput(configView); |
| TaskAttemptID taskAttemptID = externalSynchronization.acquireTaskAttemptIdLock(conf, taskId); |
| |
| TaskContext<KeyT, ValueT> taskContext = new TaskContext<>(taskAttemptID, conf); |
| |
| try { |
| taskContext.getOutputCommitter().setupTask(taskContext.getTaskAttemptContext()); |
| } catch (Exception e) { |
| processTaskException(taskContext, e); |
| } |
| |
| LOG.info(
| "Task with id {} of job {} was successfully set up!",
| taskId,
| HadoopFormats.getJobId(conf).getJtIdentifier());
| |
| return taskContext; |
| } |
| } |
| |
| /** |
| * Registers a task id for each bundle without the need to group the data by task id. Every
| * bundle reserves its own task id via the configured implementation of the {@link
| * ExternalSynchronization} class.
| * |
| * @param <KeyT> Type of keys to be written. |
| * @param <ValueT> Type of values to be written. |
| */ |
| private static class PrepareNonPartitionedTasksFn<KeyT, ValueT> |
| extends DoFn<KV<KeyT, ValueT>, KV<Integer, KV<KeyT, ValueT>>> { |
| |
| private transient TaskID taskId; |
| |
| private final PCollectionView<Configuration> configView; |
| private final ExternalSynchronization externalSynchronization; |
| |
| private PrepareNonPartitionedTasksFn( |
| PCollectionView<Configuration> configView, |
| ExternalSynchronization externalSynchronization) { |
| this.configView = configView; |
| this.externalSynchronization = externalSynchronization; |
| } |
| |
| @DoFn.StartBundle |
| public void startBundle() { |
| taskId = null; |
| } |
| |
| @DoFn.ProcessElement |
| public void processElement( |
| @DoFn.Element KV<KeyT, ValueT> element, |
| OutputReceiver<KV<Integer, KV<KeyT, ValueT>>> output, |
| ProcessContext c) { |
| |
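| // lazily reserve a task id for this bundle when the first element arrives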
| if (taskId == null) { |
| Configuration conf = c.sideInput(configView); |
| taskId = externalSynchronization.acquireTaskIdLock(conf); |
| } |
| |
| output.output(KV.of(taskId.getId(), element)); |
| } |
| } |
| } |