/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.mapreduce.translation;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.counters.Limits;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
* Translates a {@link Graphs.FusedStep} into a Hadoop MapReduce {@link Job}.
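*
* <p>A minimal usage sketch (the stage id, fused step, and options are assumed to come
* from the runner's graph-fusion phase; {@code MyRunnerClass} is a placeholder for any
* class in the job jar):
*
* <pre>{@code
* JobPrototype prototype = JobPrototype.create(stageId, fusedStep, options);
* Job job = prototype.build(MyRunnerClass.class, new Configuration());
* job.waitForCompletion(true);
* }</pre>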
*/
public class JobPrototype {

public static JobPrototype create(
int stageId, Graphs.FusedStep fusedStep, MapReducePipelineOptions options) {
return new JobPrototype(stageId, fusedStep, options);
}

private final int stageId;
private final Graphs.FusedStep fusedStep;
private final MapReducePipelineOptions options;
private final ConfigurationUtils configUtils;

private JobPrototype(int stageId, Graphs.FusedStep fusedStep, MapReducePipelineOptions options) {
this.stageId = stageId;
this.fusedStep = checkNotNull(fusedStep, "fusedStep");
this.options = checkNotNull(options, "options");
this.configUtils = new ConfigurationUtils(options);
}
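
/**
* Materializes this prototype as a single Hadoop {@link Job}: configures the input
* format from the fused step's read operations, expands any GroupByKey into shuffle
* phases, and serializes the chained Beam operations into the job configuration.
*/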
public Job build(Class<?> jarClass, Configuration initConf) throws IOException {
Job job = Job.getInstance(initConf);
final Configuration conf = job.getConfiguration();
job.setJarByClass(jarClass);
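// Register Writable and Java serialization so that BytesWritable keys and plain
// byte[] values can be serialized across the shuffle.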
conf.set(
"io.serializations",
"org.apache.hadoop.io.serializer.WritableSerialization,"
+ "org.apache.hadoop.io.serializer.JavaSerialization");
conf.set("mapreduce.job.counters.group.name.max", "512");
Limits.init(conf);
conf.set(
FileOutputFormat.OUTDIR,
configUtils.getFileOutputDir(fusedStep.getStageId()));
// Set up BoundedSources in BeamInputFormat.
Graphs.Step startStep = Iterables.getOnlyElement(fusedStep.getStartSteps());
checkState(
startStep.getOperation() instanceof PartitionOperation,
"Expected a PartitionOperation at the start of fused step %s.",
fusedStep.getStageId());
PartitionOperation partitionOperation = (PartitionOperation) startStep.getOperation();
ArrayList<ReadOperation.TaggedSource> taggedSources = new ArrayList<>(
FluentIterable.from(partitionOperation.getReadOperations())
.transform(new Function<ReadOperation, ReadOperation.TaggedSource>() {
@Override
public ReadOperation.TaggedSource apply(ReadOperation operation) {
return operation.getTaggedSource(conf);
}
})
.toList());
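// Hand the serialized sources and pipeline options to BeamInputFormat through the
// job configuration; both are base64-encoded Java-serialized payloads.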
conf.set(
BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE,
Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
taggedSources)));
conf.set(
BeamInputFormat.BEAM_SERIALIZED_PIPELINE_OPTIONS,
Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
new SerializablePipelineOptions(options))));
job.setInputFormatClass(BeamInputFormat.class);
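// A GroupByKey is expanded into three chained operations: a Reify step that attaches
// timestamp and window metadata to each value, a ShuffleWrite that hands encoded KVs
// to the Hadoop shuffle, and a GroupAlsoByWindows step that runs on the reduce side.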
if (fusedStep.containsGroupByKey()) {
Graphs.Step groupByKey = fusedStep.getGroupByKeyStep();
Graphs.Tag gbkOutTag = Iterables.getOnlyElement(fusedStep.getOutputTags(groupByKey));
GroupByKeyOperation operation = (GroupByKeyOperation) groupByKey.getOperation();
WindowingStrategy<?, ?> windowingStrategy = operation.getWindowingStrategy();
KvCoder<?, ?> kvCoder = operation.getKvCoder();
String reifyStepName = groupByKey.getFullName() + "-Reify";
Coder<?> reifyValueCoder = getReifyValueCoder(kvCoder.getValueCoder(), windowingStrategy);
Graphs.Tag reifyOutputTag = Graphs.Tag.of(
reifyStepName + ".out", new TupleTag<>(), reifyValueCoder, windowingStrategy);
Graphs.Step reifyStep = Graphs.Step.of(
reifyStepName,
new ReifyTimestampAndWindowsParDoOperation(
reifyStepName, options, windowingStrategy, reifyOutputTag));
Graphs.Step writeStep = Graphs.Step.of(
groupByKey.getFullName() + "-Write",
new ShuffleWriteOperation(kvCoder.getKeyCoder(), reifyValueCoder));
String gabwStepName = groupByKey.getFullName() + "-GroupAlsoByWindows";
Graphs.Step gabwStep = Graphs.Step.of(
gabwStepName,
new GroupAlsoByWindowsParDoOperation(
gabwStepName, options, windowingStrategy, kvCoder, gbkOutTag));
fusedStep.addStep(
reifyStep, fusedStep.getInputTags(groupByKey), ImmutableList.of(reifyOutputTag));
fusedStep.addStep(
writeStep, ImmutableList.of(reifyOutputTag), Collections.<Graphs.Tag>emptyList());
fusedStep.addStep(
gabwStep, Collections.<Graphs.Tag>emptyList(), ImmutableList.of(gbkOutTag));
fusedStep.removeStep(groupByKey);
// Set up the BeamReducer: the reduce-side operation chain starts at GroupAlsoByWindows.
Graphs.Step reducerStartStep = gabwStep;
chainOperations(reducerStartStep, fusedStep, Sets.<Graphs.Step>newHashSet());
conf.set(
BeamReducer.BEAM_REDUCER_KV_CODER,
Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
KvCoder.of(kvCoder.getKeyCoder(), reifyValueCoder))));
conf.set(
BeamReducer.BEAM_PAR_DO_OPERATION_REDUCER,
Base64.encodeBase64String(
SerializableUtils.serializeToByteArray(reducerStartStep.getOperation())));
job.setReducerClass(BeamReducer.class);
}
// Set up DoFns in BeamMapper; the map-side operation chain starts at the read step.
chainOperations(startStep, fusedStep, Sets.<Graphs.Step>newHashSet());
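// Map outputs travel as encoded key bytes (BytesWritable) and raw byte[] value
// payloads; byte[] is handled by the JavaSerialization registered above.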
job.setMapOutputKeyClass(BytesWritable.class);
job.setMapOutputValueClass(byte[].class);
conf.set(
BeamMapper.BEAM_PAR_DO_OPERATION_MAPPER,
Base64.encodeBase64String(
SerializableUtils.serializeToByteArray(startStep.getOperation())));
job.setMapperClass(BeamMapper.class);
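// The job's default output goes through TextOutputFormat; each FileWriteOperation
// below is registered as a named SequenceFile side output via MultipleOutputs.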
job.setOutputFormatClass(TextOutputFormat.class);
for (Graphs.Step step : fusedStep.getSteps()) {
if (step.getOperation() instanceof FileWriteOperation) {
FileWriteOperation writeOperation = (FileWriteOperation) step.getOperation();
MultipleOutputs.addNamedOutput(
job,
writeOperation.getFileName(),
SequenceFileOutputFormat.class,
NullWritable.class, BytesWritable.class);
}
}
return job;
}
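
/**
* Recursively attaches each operation to its downstream consumers, depth-first from
* {@code current}, so a fused step executes as one in-memory chain per task.
*/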
private void chainOperations(
Graphs.Step current, Graphs.FusedStep fusedStep, Set<Graphs.Step> visited) {
Operation<?> operation = current.getOperation();
List<Graphs.Tag> outputTags = fusedStep.getOutputTags(current);
for (Graphs.Tag outTag : outputTags) {
for (Graphs.Step consumer : fusedStep.getConsumers(outTag)) {
operation.attachConsumer(outTag.getTupleTag(), consumer.getOperation());
}
}
visited.add(current);
for (Graphs.Tag outTag : outputTags) {
for (Graphs.Step consumer : fusedStep.getConsumers(outTag)) {
if (!visited.contains(consumer)) {
chainOperations(consumer, fusedStep, visited);
}
}
}
}

private Coder<Object> getReifyValueCoder(
Coder<?> valueCoder, WindowingStrategy<?, ?> windowingStrategy) {
// TODO: decide whether the full WindowedValue coder is needed here, or whether
// window encoding can be skipped for simple windowing strategies.
// The raw cast deliberately erases the value type; callers only use this as a Coder<?>.
return (Coder) WindowedValue.getFullCoder(
valueCoder, windowingStrategy.getWindowFn().windowCoder());
}
}