This closes #3705: [BEAM-165] Initial implementation of the MapReduce runner
mr-runner: Removes WordCountTest, fixes checkstyle, findbugs, and addresses comments.
mr-runner-hack: disable unrelated modules to shorten build time during development.
mr-runner: support SourceMetrics, this fixes MetricsTest.testBoundedSourceMetrics().
mr-runner: introduces duplicateFactor in FlattenOperation, this fixes testFlattenInputMultipleCopies().
mr-runner: translate empty flatten into EmptySource, this fixes a few empty FlattenTests.
mr-runner: ensure Operation only start/finish once for diamond shaped DAG, this fixes ParDoLifecycleTest.
mr-runner: make Graph.getSteps() return steps in topological order, this fixes a few CombineTests.
mr-runner: fail early in the runner when MapReduce job fails.
mr-runner: use InMemoryStateInternals in ParDoOperation, this fixes ParDoTests that use state.
mr-runner: use the correct step name in ParDoTranslator, this fixes MetricsTest.testAttemptedCounterMetrics().
mr-runner: remove the hard-coded GlobalWindow coder, and fixes WindowingTest.
mr-runner: handle no files case in FileSideInputReader for empty views.
mr-runner: fix NPE in PipelineTest.testIdentityTransform().
mr-runner: filter out unsupported features in ValidatesRunner tests.
mr-runner: setMetricsSupported to run ValidatesRunner tests with TestPipeline.
mr-runner: fix the bug that steps are attached multiple times in diamond shaped DAG.
[BEAM-2783] support metrics in MapReduceRunner.
mr-runner: setup file paths for read and write sides of materialization.
mr-runner: support side inputs by reading in all views contents.
mr-runner: support multiple SourceOperations by composing and partitioning.
mr-runner: support PCollections materialization with multiple MR jobs.
mr-runner: hack to work around ViewAsXXX.expand() returning the wrong output PValue.
mr-runner: support graph visualization with dotfiles.
mr-runner: refactors and creates Graph data structures to handle general Beam pipelines.
mr-runner: add JarClassInstanceFactory to run ValidatesRunner tests.
mr-runner: support reduce side ParDos and WordCount.
core-java: InMemoryTimerInternals expose getTimers() for timer firings in mr-runner.
mr-runner: add BeamReducer and support GroupByKey.
mr-runner: add ParDoOperation and support ParDos chaining.
mr-runner: add JobPrototype and translate it to a MR job.
mr-runner: support BoundedSource with BeamInputFormat.
MapReduceRunner: add unit tests for GraphConverter and GraphPlanner.
MapReduceRunner: add Graph and its visitors.
Initial commit for MapReduceRunner.
diff --git a/pom.xml b/pom.xml
index 80ab6e2..ae86a9c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -185,7 +185,7 @@
<module>sdks/java/build-tools</module>
<module>sdks</module>
<module>runners</module>
- <module>examples</module>
+ <!-- TODO(mr-runner): examples disabled only to shorten development builds; re-enable before merging. <module>examples</module>-->
<!-- sdks/java/javadoc builds project-wide Javadoc. It has to run last. -->
<module>sdks/java/javadoc</module>
</modules>
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index c29ea19..c1d42d6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -23,6 +23,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Table;
import java.util.NavigableSet;
import java.util.TreeSet;
@@ -65,6 +66,11 @@
return outputWatermarkTime;
}
+ /** Returns an immutable snapshot of the timers in the given {@link TimeDomain}. */
+ public Iterable<TimerData> getTimers(TimeDomain domain) {
+ return ImmutableList.copyOf(timersForDomain(domain));
+ }
+
/**
* Returns when the next timer in the given time domain will fire, or {@code null}
* if there are no timers scheduled in that time domain.
diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml
new file mode 100644
index 0000000..90d876b
--- /dev/null
+++ b/runners/map-reduce/pom.xml
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-parent</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-runners-mapreduce</artifactId>
+
+ <name>Apache Beam :: Runners :: MapReduce</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <mapreduce.version>2.8.1</mapreduce.version>
+ </properties>
+
+ <profiles>
+ <profile>
+ <!-- This profile adds execution of ValidatesRunner integration tests
+ against a hadoop local cluster. -->
+ <id>local-validates-runner-tests</id>
+ <activation><activeByDefault>true</activeByDefault></activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>validates-runner-tests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <groups>
+ org.apache.beam.sdk.testing.ValidatesRunner
+ </groups>
+ <excludes>
+ <exclude>org.apache.beam.sdk.testing.PAssertTest.java</exclude>
+ </excludes>
+ <excludedGroups>
+ org.apache.beam.sdk.testing.UsesSetState,
+ org.apache.beam.sdk.testing.UsesSplittableParDo,
+ org.apache.beam.sdk.testing.UsesDistributionMetrics,
+ org.apache.beam.sdk.testing.UsesGaugeMetrics,
+ org.apache.beam.sdk.testing.UsesCommittedMetrics,
+ org.apache.beam.sdk.testing.LargeKeys$Above10MB,
+ org.apache.beam.sdk.testing.UsesTimersInParDo,
+ org.apache.beam.sdk.testing.UsesStatefulParDo,
+ org.apache.beam.sdk.testing.UsesTestStream
+ </excludedGroups>
+ <parallel>none</parallel>
+ <failIfNoTests>true</failIfNoTests>
+ <dependenciesToScan>
+ <dependency>org.apache.beam:beam-sdks-java-core</dependency>
+ </dependenciesToScan>
+ <systemPropertyVariables>
+ <beamTestPipelineOptions>
+ [
+ "--runner=org.apache.beam.runners.mapreduce.MapReduceRunner"
+ ]
+ </beamTestPipelineOptions>
+ </systemPropertyVariables>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+ <dependencies>
+ <!-- MapReduce dependencies -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${mapreduce.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <version>${mapreduce.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${mapreduce.version}</version>
+ </dependency>
+
+ <!-- Beam dependencies -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-core-java</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-core-construction-java</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
+ </dependency>
+
+ <!-- Module dependencies -->
+ <dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ </dependency>
+
+ <!-- Depend on test jar to scan for ValidatesRunner tests -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- transitive test dependencies from beam-sdks-java-core -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals><goal>analyze-only</goal></goals>
+ <configuration>
+ <!-- disable for now during runner development -->
+ <failOnWarning>false</failOnWarning>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
new file mode 100644
index 0000000..7cff40d
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterators;
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * {@link PipelineOptions} for {@link MapReduceRunner}.
+ */
+public interface MapReducePipelineOptions extends PipelineOptions {
+
+ /** Classes that are used as the boundary in the stack trace to find the callers class name. */
+ Set<String> PIPELINE_OPTIONS_FACTORY_CLASSES = ImmutableSet.of(
+ PipelineOptionsFactory.class.getName(),
+ PipelineOptionsFactory.Builder.class.getName(),
+ "org.apache.beam.sdk.options.ProxyInvocationHandler");
+
+ @Description("The jar class of the user Beam program.")
+ @Default.InstanceFactory(JarClassInstanceFactory.class)
+ Class<?> getJarClass();
+ void setJarClass(Class<?> jarClass);
+
+ @Description("The directory for files output.")
+ @Default.String("/tmp/mapreduce/")
+ String getFileOutputDir();
+ void setFileOutputDir(String fileOutputDir);
+
+ /**
+ * Default for {@link #getJarClass()}: the first class on the current stack that is neither part
+ * of the options machinery, nor the runner, nor the {@code Pipeline.run()} call chain.
+ */
+ class JarClassInstanceFactory implements DefaultValueFactory<Class<?>> {
+
+ /**
+ * SDK classes that sit between the runner and the user's code when the default is computed
+ * lazily, e.g. {@code $Proxy.getJarClass() <- MapReduceRunner.run() <- Pipeline.run() <- main}.
+ * Without skipping them, the SDK's {@code Pipeline} class would be picked as the jar class.
+ */
+ private static final Set<String> PIPELINE_RUN_CLASSES = ImmutableSet.of(
+ "org.apache.beam.sdk.Pipeline",
+ "org.apache.beam.sdk.testing.TestPipeline");
+
+ @Override
+ public Class<?> create(PipelineOptions options) {
+ return findCallersClassName(options);
+ }
+
+ /**
+ * Returns the class of the first caller found on the current thread's stack after the
+ * options-factory boundary, or {@code null} if there is none or it cannot be loaded.
+ */
+ private static Class<?> findCallersClassName(PipelineOptions options) {
+ Iterator<StackTraceElement> elements =
+ Iterators.forArray(Thread.currentThread().getStackTrace());
+ // First find the PipelineOptionsFactory/Builder class in the stack trace.
+ while (elements.hasNext()) {
+ StackTraceElement next = elements.next();
+ if (PIPELINE_OPTIONS_FACTORY_CLASSES.contains(next.getClassName())) {
+ break;
+ }
+ }
+ // Then find the first instance after that which is not the PipelineOptionsFactory/Builder
+ // class, a dynamic proxy, the runner itself, or part of Pipeline.run().
+ while (elements.hasNext()) {
+ StackTraceElement next = elements.next();
+ if (!PIPELINE_OPTIONS_FACTORY_CLASSES.contains(next.getClassName())
+ && !next.getClassName().contains("com.sun.proxy.$Proxy")
+ && !next.getClassName().equals(options.getRunner().getName())
+ && !PIPELINE_RUN_CLASSES.contains(next.getClassName())) {
+ try {
+ return Class.forName(next.getClassName());
+ } catch (ClassNotFoundException e) {
+ break;
+ }
+ }
+ }
+ return null;
+ }
+ }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
new file mode 100644
index 0000000..933d8f6
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.runners.mapreduce.translation.MapReduceMetricResults;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.hadoop.mapreduce.Job;
+import org.joda.time.Duration;
+
+/**
+ * The {@link PipelineResult} that {@link MapReduceRunner} returns for a
+ * {@link org.apache.beam.sdk.Pipeline Pipeline}.
+ *
+ * <p>The result is created only after the pipeline has finished, so every state query answers
+ * {@link State#DONE}; its main purpose is to expose the metrics of the executed jobs.
+ */
+public class MapReducePipelineResult implements PipelineResult {
+
+ /** Jobs handed to {@link MapReduceMetricResults} by {@link #metrics()}. */
+ private final List<Job> jobs;
+
+ /**
+ * Creates a result for the given MapReduce jobs.
+ *
+ * @param jobs the jobs that executed the pipeline; must not be {@code null}
+ */
+ public MapReducePipelineResult(List<Job> jobs) {
+ checkNotNull(jobs, "jobs");
+ this.jobs = jobs;
+ }
+
+ @Override
+ public MetricResults metrics() {
+ return new MapReduceMetricResults(jobs);
+ }
+
+ @Override
+ public State getState() {
+ return State.DONE;
+ }
+
+ /** Returns immediately: the pipeline has already finished. */
+ @Override
+ public State waitUntilFinish() {
+ return State.DONE;
+ }
+
+ /** Returns immediately, ignoring {@code duration}: the pipeline has already finished. */
+ @Override
+ public State waitUntilFinish(Duration duration) {
+ return State.DONE;
+ }
+
+ /** Does nothing, as there is nothing left to cancel, and reports {@link State#DONE}. */
+ @Override
+ public State cancel() throws IOException {
+ return State.DONE;
+ }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java
new file mode 100644
index 0000000..1029218
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+/**
+ * Registrars for {@link MapReduceRunner}.
+ */
+public class MapReduceRegistrar {
+ private MapReduceRegistrar() {
+ }
+
+ /**
+ * Registers the {@link MapReduceRunner}.
+ */
+ @AutoService(PipelineRunnerRegistrar.class)
+ public static class Runner implements PipelineRunnerRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+ return ImmutableList.<Class<? extends PipelineRunner<?>>> of(MapReduceRunner.class);
+ }
+ }
+
+ /**
+ * Registers the {@link MapReducePipelineOptions}.
+ */
+ @AutoService(PipelineOptionsRegistrar.class)
+ public static class Options implements PipelineOptionsRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>> of(MapReducePipelineOptions.class);
+ }
+ }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
new file mode 100644
index 0000000..85b7d1b
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.runners.mapreduce.translation.DotfileWriter;
+import org.apache.beam.runners.mapreduce.translation.GraphConverter;
+import org.apache.beam.runners.mapreduce.translation.GraphPlanner;
+import org.apache.beam.runners.mapreduce.translation.Graphs;
+import org.apache.beam.runners.mapreduce.translation.JobPrototype;
+import org.apache.beam.runners.mapreduce.translation.TranslationContext;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link PipelineRunner} for Hadoop MapReduce.
+ *
+ * <p>It translates a Beam {@link Pipeline} to a series of MapReduce {@link Job jobs}, and executes
+ * them locally or on a Hadoop cluster.
+ */
+public class MapReduceRunner extends PipelineRunner<PipelineResult> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MapReduceRunner.class);
+
+ /**
+ * Construct a runner from the provided options.
+ *
+ * @param options Properties which configure the runner.
+ * @return The newly created runner.
+ */
+ public static MapReduceRunner fromOptions(PipelineOptions options) {
+ return new MapReduceRunner(options.as(MapReducePipelineOptions.class));
+ }
+
+ private final MapReducePipelineOptions options;
+
+ MapReduceRunner(MapReducePipelineOptions options) {
+ this.options = checkNotNull(options, "options");
+ }
+
+ /**
+ * Translates {@code pipeline} into fused steps and runs one MapReduce job per step, in order.
+ *
+ * @return a result over all jobs that were run
+ * @throws RuntimeException if any job fails; the remaining steps are then not run
+ */
+ @Override
+ public PipelineResult run(Pipeline pipeline) {
+ configureLoggingIfUnconfigured();
+ MetricsEnvironment.setMetricsSupported(true);
+
+ TranslationContext context = new TranslationContext(options);
+ GraphConverter graphConverter = new GraphConverter(context);
+ pipeline.traverseTopologically(graphConverter);
+
+ LOG.info(graphConverter.getDotfile());
+
+ Graphs.FusedGraph fusedGraph = new Graphs.FusedGraph(context.getInitGraph());
+ LOG.info(DotfileWriter.toDotfile(fusedGraph));
+
+ GraphPlanner planner = new GraphPlanner(options);
+ fusedGraph = planner.plan(fusedGraph);
+
+ LOG.info(DotfileWriter.toDotfile(fusedGraph));
+
+ List<Job> jobs = new ArrayList<>();
+ int stageId = 0;
+ for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+ jobs.add(runJob(stageId++, fusedStep));
+ }
+ return new MapReducePipelineResult(jobs);
+ }
+
+ /**
+ * Builds the MapReduce job for one fused step, waits for it to complete and returns it.
+ *
+ * @throws RuntimeException if the job cannot be built or submitted, or does not succeed
+ */
+ private Job runJob(int stageId, Graphs.FusedStep fusedStep) {
+ Configuration config = new Configuration();
+ config.set("keep.failed.task.files", "true");
+
+ JobPrototype jobPrototype = JobPrototype.create(stageId, fusedStep, options);
+ // Log the same stage id that was handed to JobPrototype.
+ LOG.info("Running job-{}.", stageId);
+ LOG.info(DotfileWriter.toDotfile(fusedStep));
+ try {
+ Job job = jobPrototype.build(options.getJarClass(), config);
+ // waitForCompletion returns whether the job finished in the SUCCEEDED state.
+ if (!job.waitForCompletion(true)) {
+ throw new RuntimeException("MapReduce job failed: " + job.getJobID());
+ }
+ return job;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (Exception e) {
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Applies log4j's basic console configuration only if the root logger has no appenders yet.
+ *
+ * <p>{@link BasicConfigurator#configure()} adds another console appender on every call, so
+ * calling it unconditionally on each {@link #run} would duplicate log lines whenever several
+ * pipelines run in one JVM, and would add console output on top of a user's log4j setup.
+ */
+ private static void configureLoggingIfUnconfigured() {
+ if (!LogManager.getRootLogger().getAllAppenders().hasMoreElements()) {
+ BasicConfigurator.configure();
+ }
+ }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java
new file mode 100644
index 0000000..e452d92
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Implementation of the Apache Beam runner that executes pipelines as Apache Hadoop MapReduce jobs.
+ */
+package org.apache.beam.runners.mapreduce;
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
new file mode 100644
index 0000000..3d0b8ea
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Function;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Adaptor from Beam {@link BoundedSource} to MapReduce {@link InputFormat}.
+ */
+public class BeamInputFormat<T> extends InputFormat {
+
+ public static final String BEAM_SERIALIZED_BOUNDED_SOURCE = "beam-serialized-bounded-source";
+ public static final String BEAM_SERIALIZED_PIPELINE_OPTIONS = "beam-serialized-pipeline-options";
+
+ private static final long DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES = 5 * 1000 * 1000;
+
+ private List<ReadOperation.TaggedSource> sources;
+ private SerializedPipelineOptions options;
+
+ public BeamInputFormat() {
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+ String serializedBoundedSource = context.getConfiguration().get(BEAM_SERIALIZED_BOUNDED_SOURCE);
+ String serializedPipelineOptions =
+ context.getConfiguration().get(BEAM_SERIALIZED_PIPELINE_OPTIONS);
+ if (Strings.isNullOrEmpty(serializedBoundedSource)
+ || Strings.isNullOrEmpty(serializedPipelineOptions)) {
+ return ImmutableList.of();
+ }
+ sources = (List<ReadOperation.TaggedSource>) SerializableUtils.deserializeFromByteArray(
+ Base64.decodeBase64(serializedBoundedSource), "TaggedSources");
+ options = ((SerializedPipelineOptions) SerializableUtils.deserializeFromByteArray(
+ Base64.decodeBase64(serializedPipelineOptions), "SerializedPipelineOptions"));
+
+ try {
+
+ return FluentIterable.from(sources)
+ .transformAndConcat(
+ new Function<ReadOperation.TaggedSource, Iterable<ReadOperation.TaggedSource>>() {
+ @Override
+ public Iterable<ReadOperation.TaggedSource> apply(
+ final ReadOperation.TaggedSource taggedSource) {
+ try {
+ return FluentIterable.from(taggedSource.getSource().split(
+ DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, options.getPipelineOptions()))
+ .transform(new Function<BoundedSource<?>, ReadOperation.TaggedSource>() {
+ @Override
+ public ReadOperation.TaggedSource apply(BoundedSource<?> input) {
+ return ReadOperation.TaggedSource.of(
+ taggedSource.getStepName(), input, taggedSource.getTag());
+ }});
+ } catch (Exception e) {
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ }
+ })
+ .transform(new Function<ReadOperation.TaggedSource, InputSplit>() {
+ @Override
+ public InputSplit apply(ReadOperation.TaggedSource taggedSource) {
+ return new BeamInputSplit(taggedSource.getStepName(), taggedSource.getSource(),
+ options, taggedSource.getTag());
+ }})
+ .toList();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public RecordReader createRecordReader(
+ InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ return ((BeamInputSplit) split).createReader();
+ }
+
+ private static class BeamInputSplit<T> extends InputSplit implements Writable {
+ private String stepName;
+ private BoundedSource<T> boundedSource;
+ private SerializedPipelineOptions options;
+ private TupleTag<?> tupleTag;
+
+ public BeamInputSplit() {
+ }
+
+ public BeamInputSplit(
+ String stepName,
+ BoundedSource<T> boundedSource,
+ SerializedPipelineOptions options,
+ TupleTag<?> tupleTag) {
+ this.stepName = checkNotNull(stepName, "stepName");
+ this.boundedSource = checkNotNull(boundedSource, "boundedSources");
+ this.options = checkNotNull(options, "options");
+ this.tupleTag = checkNotNull(tupleTag, "tupleTag");
+ }
+
+ public BeamRecordReader<T> createReader() throws IOException {
+ return new BeamRecordReader<>(
+ stepName, boundedSource.createReader(options.getPipelineOptions()), tupleTag);
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ try {
+ return boundedSource.getEstimatedSizeBytes(options.getPipelineOptions());
+ } catch (Exception e) {
+ Throwables.throwIfUnchecked(e);
+ Throwables.throwIfInstanceOf(e, IOException.class);
+ Throwables.throwIfInstanceOf(e, InterruptedException.class);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return new String[0];
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ StringUtf8Coder.of().encode(stepName, stream);
+ SerializableCoder.of(BoundedSource.class).encode(boundedSource, stream);
+ SerializableCoder.of(SerializedPipelineOptions.class).encode(options, stream);
+ SerializableCoder.of(TupleTag.class).encode(tupleTag, stream);
+
+ byte[] bytes = stream.toByteArray();
+ out.writeInt(bytes.length);
+ out.write(bytes);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int length = in.readInt();
+ byte[] bytes = new byte[length];
+ in.readFully(bytes);
+
+ ByteArrayInputStream inStream = new ByteArrayInputStream(bytes);
+ stepName = StringUtf8Coder.of().decode(inStream);
+ boundedSource = SerializableCoder.of(BoundedSource.class).decode(inStream);
+ options = SerializableCoder.of(SerializedPipelineOptions.class).decode(inStream);
+ tupleTag = SerializableCoder.of(TupleTag.class).decode(inStream);
+ }
+ }
+
+ private static class BeamRecordReader<T> extends RecordReader {
+
+ private final String stepName;
+ private final BoundedSource.BoundedReader<T> reader;
+ private final TupleTag<?> tupleTag;
+ private MetricsReporter metricsReporter;
+ private boolean started;
+
+ public BeamRecordReader(
+ String stepName, BoundedSource.BoundedReader<T> reader, TupleTag<?> tupleTag) {
+ this.stepName = checkNotNull(stepName, "stepName");
+ this.reader = checkNotNull(reader, "reader");
+ this.tupleTag = checkNotNull(tupleTag, "tupleTag");
+ this.started = false;
+ }
+
+ @Override
+ public void initialize(
+ InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ this.metricsReporter = new MetricsReporter(context);
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+ metricsReporter.getMetricsContainer(stepName))) {
+ if (!started) {
+ started = true;
+ return reader.start();
+ } else {
+ return reader.advance();
+ }
+ }
+ }
+
+ @Override
+ public Object getCurrentKey() throws IOException, InterruptedException {
+ return tupleTag;
+ }
+
+ @Override
+ public Object getCurrentValue() throws IOException, InterruptedException {
+ // TODO: this is a hack to handle that reads from materialized PCollections
+ // already return WindowedValue.
+ if (reader.getCurrent() instanceof WindowedValue) {
+ return reader.getCurrent();
+ } else {
+ return WindowedValue.timestampedValueInGlobalWindow(
+ reader.getCurrent(), reader.getCurrentTimestamp());
+ }
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ Double progress = reader.getFractionConsumed();
+ if (progress != null) {
+ return progress.floatValue();
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ metricsReporter.updateMetrics();
+ }
+ }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
new file mode 100644
index 0000000..46c74c0
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Adapter for executing {@link Operation operations} in {@link Mapper}.
+ *
+ * <p>The {@link Operation} is read from the job configuration under
+ * {@link #BEAM_PAR_DO_OPERATION_MAPPER}, started in {@link #setup}, fed every input record in
+ * {@link #map}, and finished in {@link #cleanup}.
+ */
+public class BeamMapper<ValueInT, ValueOutT>
+    extends Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>> {
+  // Named after this class (not Mapper) so the runner's log output can be filtered on its own.
+  private static final Logger LOG = LoggerFactory.getLogger(BeamMapper.class);
+
+  /** Configuration key holding the Base64-encoded, Java-serialized {@link Operation}. */
+  public static final String BEAM_PAR_DO_OPERATION_MAPPER = "beam-par-do-op-mapper";
+
+  private Operation operation;
+
+  /**
+   * Deserializes the {@link Operation} from the job configuration and starts it.
+   *
+   * @throws NullPointerException if {@link #BEAM_PAR_DO_OPERATION_MAPPER} is not set
+   */
+  @Override
+  protected void setup(
+      Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context) {
+    String serializedParDo = checkNotNull(
+        context.getConfiguration().get(BEAM_PAR_DO_OPERATION_MAPPER),
+        BEAM_PAR_DO_OPERATION_MAPPER);
+    operation = (Operation) SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(serializedParDo), "Operation");
+    operation.start((TaskInputOutputContext) context);
+  }
+
+  /** Forwards one input record to the {@link Operation} as a {@code KV<key, value>}. */
+  @Override
+  protected void map(
+      Object key,
+      WindowedValue<ValueInT> value,
+      Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context)
+      throws IOException, InterruptedException {
+    // Runs once per record: log at debug so task logs are not flooded at the default level.
+    LOG.debug("key: {} value: {}.", key, value);
+    // Only needs to pass KV to the following PartitionOperation. However, we have to wrap it in a
+    // global window because of the method signature.
+    operation.process(WindowedValue.valueInGlobalWindow(KV.of(key, value)));
+  }
+
+  /** Finishes the {@link Operation} after the task has consumed all of its input. */
+  @Override
+  protected void cleanup(
+      Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context) {
+    operation.finish();
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
new file mode 100644
index 0000000..b69be32
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Adapter for executing {@link Operation operations} in {@link Reducer}.
+ *
+ * <p>Keys and values arrive as coder-encoded bytes. They are decoded with the {@link KvCoder}
+ * stored under {@link #BEAM_REDUCER_KV_CODER} and passed, one key group at a time, to the
+ * {@link Operation} stored under {@link #BEAM_PAR_DO_OPERATION_REDUCER}.
+ */
+public class BeamReducer<ValueInT, ValueOutT>
+    extends Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>> {
+  // Named after this class (not Reducer) so the runner's log output can be filtered on its own.
+  private static final Logger LOG = LoggerFactory.getLogger(BeamReducer.class);
+
+  /** Configuration key holding the Base64-encoded, Java-serialized {@link KvCoder}. */
+  public static final String BEAM_REDUCER_KV_CODER = "beam-reducer-kv-coder";
+  /** Configuration key holding the Base64-encoded, Java-serialized {@link Operation}. */
+  public static final String BEAM_PAR_DO_OPERATION_REDUCER = "beam-par-do-op-reducer";
+
+  private Coder<Object> keyCoder;
+  private Coder<Object> valueCoder;
+  private Operation operation;
+
+  /**
+   * Deserializes the {@link KvCoder} and the {@link Operation} from the job configuration, then
+   * starts the operation.
+   *
+   * @throws NullPointerException if either configuration key is not set
+   */
+  @Override
+  protected void setup(
+      Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>>.Context context) {
+    String serializedValueCoder = checkNotNull(
+        context.getConfiguration().get(BEAM_REDUCER_KV_CODER),
+        BEAM_REDUCER_KV_CODER);
+    String serializedParDo = checkNotNull(
+        context.getConfiguration().get(BEAM_PAR_DO_OPERATION_REDUCER),
+        BEAM_PAR_DO_OPERATION_REDUCER);
+    KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) SerializableUtils
+        .deserializeFromByteArray(Base64.decodeBase64(serializedValueCoder), "Coder");
+    keyCoder = kvCoder.getKeyCoder();
+    valueCoder = kvCoder.getValueCoder();
+    operation = (Operation) SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(serializedParDo), "Operation");
+    operation.start((TaskInputOutputContext) context);
+  }
+
+  /** Decodes one key and all of its values, then forwards them to the {@link Operation}. */
+  @Override
+  protected void reduce(
+      BytesWritable key,
+      Iterable<byte[]> values,
+      Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>>.Context context)
+      throws InterruptedException, IOException {
+    // Decode eagerly into a list: Hadoop's values Iterable can only be traversed once.
+    List<Object> decodedValues = Lists.newArrayList(FluentIterable.from(values)
+        .transform(new Function<byte[], Object>() {
+          @Override
+          public Object apply(byte[] input) {
+            ByteArrayInputStream inStream = new ByteArrayInputStream(input);
+            try {
+              return valueCoder.decode(inStream);
+            } catch (IOException e) {
+              Throwables.throwIfUnchecked(e);
+              throw new RuntimeException(e);
+            }
+          }
+        }));
+    // BytesWritable.getBytes() returns the backing array, which can be longer than the key
+    // (Hadoop reuses and grows it across calls); decode only the first getLength() bytes.
+    Object decodedKey =
+        keyCoder.decode(new ByteArrayInputStream(key.getBytes(), 0, key.getLength()));
+    LOG.debug("key: {} value: {}.", decodedKey, decodedValues);
+    // Only needs to pass KV to the following GABW operation. However, we have to wrap it in a
+    // global window because of the method signature.
+    operation.process(
+        WindowedValue.valueInGlobalWindow(KV.of(decodedKey, decodedValues)));
+  }
+
+  /** Finishes the {@link Operation} after the task has consumed all of its input. */
+  @Override
+  protected void cleanup(
+      Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>>.Context context) {
+    operation.finish();
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
new file mode 100644
index 0000000..a905d29
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Utilities to handle {@link Configuration} and the file layout of a MapReduce pipeline.
+ *
+ * <p>Instance methods derive stage output paths from {@link MapReducePipelineOptions}; the static
+ * helpers need only a Hadoop {@link Configuration} or a tag name.
+ */
+public class ConfigurationUtils {
+
+  private final MapReducePipelineOptions options;
+
+  /**
+   * Creates utilities bound to the given options.
+   *
+   * @param options supplies the output root directory and the job name
+   * @throws NullPointerException if {@code options} is null
+   */
+  public ConfigurationUtils(MapReducePipelineOptions options) {
+    this.options = checkNotNull(options, "options");
+  }
+
+  /**
+   * Resolves {@code fileName} as a file inside the directory stored under
+   * {@link FileOutputFormat#OUTDIR} in {@code conf}.
+   */
+  public static ResourceId getResourceIdForOutput(String fileName, Configuration conf) {
+    String outputDir = conf.get(FileOutputFormat.OUTDIR);
+    return FileSystems.matchNewResource(outputDir, true /* isDirectory */)
+        .resolve(fileName, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+  }
+
+  /**
+   * Returns {@code <fileOutputDir>/<jobName>/stage-<stageId>}, adding a {@code /} after the
+   * configured output directory only when it does not already end with one.
+   */
+  public String getFileOutputDir(int stageId) {
+    String root = options.getFileOutputDir();
+    String pattern = root.endsWith("/") ? "%s%s/stage-%d" : "%s/%s/stage-%d";
+    return String.format(pattern, root, options.getJobName(), stageId);
+  }
+
+  /** Returns the path of {@code fileName} inside {@link #getFileOutputDir(int)}. */
+  public String getFileOutputPath(int stageId, String fileName) {
+    String stageDir = getFileOutputDir(stageId);
+    return String.format("%s/%s", stageDir, fileName);
+  }
+
+  /**
+   * Maps a tag name to a file-name-safe string by replacing every character outside
+   * {@code [A-Za-z0-9]} with {@code '0'}. Distinct tag names may map to the same file name.
+   */
+  public static String toFileName(String tagName) {
+    return tagName.replaceAll("[^A-Za-z0-9]", "0");
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java
new file mode 100644
index 0000000..12cc03c
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class that outputs {@link Graph} to dot file.
+ *
+ * <p>Steps are identified by {@code getFullName()} and tags by {@code toString()}. Every node
+ * id goes through {@link #quote(Object)}, so node declarations and edges always name the same
+ * node and embedded double quotes remain valid DOT.
+ */
+public class DotfileWriter {
+
+  /**
+   * Renders a {@link Graphs.FusedGraph}. Each fused step becomes a {@code cluster_<i>} subgraph
+   * holding its steps (boxes) and tags (ellipses); then the edges inside every fused step are
+   * listed, plus one edge from each input tag of a fused step to that step's cluster.
+   */
+  public static String toDotfile(Graphs.FusedGraph fusedGraph) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("\ndigraph G {\n");
+
+    Map<Graphs.FusedStep, String> fusedStepToId = Maps.newHashMap();
+    int i = 0;
+    for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      String clusterId = String.format("cluster_%d", i++);
+      sb.append(String.format(" subgraph %s {%n", quote(clusterId)));
+      // Invisible anchor node: the edges between fused steps below point at it.
+      sb.append(String.format(" %s [shape=point style=invis];%n", quote(clusterId)));
+      fusedStepToId.put(fusedStep, clusterId);
+
+      // A set, so a tag shared by several steps of this fused step is declared only once.
+      Set<String> nodeDefines = Sets.newHashSet();
+      for (Graphs.Step step : fusedStep.getSteps()) {
+        nodeDefines.add(String.format(" %s [shape=box];%n", quote(step.getFullName())));
+        for (Graph.AbstractTag inTag : fusedStep.getInputTags(step)) {
+          nodeDefines.add(String.format(" %s [shape=ellipse];%n", quote(inTag)));
+        }
+        for (Graph.AbstractTag outTag : fusedStep.getOutputTags(step)) {
+          nodeDefines.add(String.format(" %s [shape=ellipse];%n", quote(outTag)));
+        }
+      }
+      for (String str : nodeDefines) {
+        sb.append(str);
+      }
+      // Terminate the line so the next subgraph starts on its own line.
+      sb.append(String.format(" }%n"));
+    }
+    for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      // Edges within fused steps.
+      for (Graphs.Step step : fusedStep.getSteps()) {
+        appendStepEdges(sb, fusedStep, step);
+      }
+
+      // Edges between sub-graphs.
+      for (Graphs.Tag inTag : fusedGraph.getInputTags(fusedStep)) {
+        sb.append(String.format(
+            " %s -> %s;%n", quote(inTag), quote(fusedStepToId.get(fusedStep))));
+      }
+    }
+    sb.append("}\n");
+    return sb.toString();
+  }
+
+  /** Renders a single {@link Graphs.FusedStep}: one box per step plus its tag edges. */
+  public static String toDotfile(Graphs.FusedStep fusedStep) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("\ndigraph G {\n");
+    for (Graphs.Step step : fusedStep.getSteps()) {
+      sb.append(String.format(" %s [shape=box];%n", quote(step.getFullName())));
+      appendStepEdges(sb, fusedStep, step);
+    }
+    sb.append("}\n");
+    return sb.toString();
+  }
+
+  /** Appends the edges {@code inTag -> step} and {@code step -> outTag} for one step. */
+  private static void appendStepEdges(
+      StringBuilder sb, Graphs.FusedStep fusedStep, Graphs.Step step) {
+    String stepId = quote(step.getFullName());
+    for (Graph.AbstractTag inTag : fusedStep.getInputTags(step)) {
+      sb.append(String.format(" %s -> %s;%n", quote(inTag), stepId));
+    }
+    for (Graph.AbstractTag outTag : fusedStep.getOutputTags(step)) {
+      sb.append(String.format(" %s -> %s;%n", stepId, quote(outTag)));
+    }
+  }
+
+  /** Wraps the string form of {@code id} in double quotes, escaping embedded quotes for DOT. */
+  private static String quote(Object id) {
+    return "\"" + String.valueOf(id).replace("\"", "\\\"") + "\"";
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
new file mode 100644
index 0000000..eb5bef4
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+
+/**
+ * Operation that reads from files.
+ *
+ * <p>Reads back {@link WindowedValue WindowedValues} stored in {@link SequenceFile SequenceFiles}
+ * with {@link NullWritable} keys and coder-encoded {@link BytesWritable} values, as written by
+ * {@link FileWriteOperation}.
+ */
+public class FileReadOperation<T> extends ReadOperation<WindowedValue<T>> {
+
+  private final String stepName;
+  private final String fileName;
+  private final Coder<WindowedValue<T>> coder;
+  private final TupleTag<?> tupleTag;
+
+  /**
+   * Creates an operation that reads every file whose name starts with {@code fileName}.
+   *
+   * @param stepName name of the step this read belongs to
+   * @param fileName file name prefix, resolved against {@code FileOutputFormat.OUTDIR}
+   * @param coder decodes each stored record
+   * @param tupleTag tag the source is registered under
+   * @throws NullPointerException if any argument is null
+   */
+  public FileReadOperation(
+      String stepName,
+      String fileName,
+      Coder<WindowedValue<T>> coder,
+      TupleTag<?> tupleTag) {
+    super();
+    this.stepName = checkNotNull(stepName, "stepName");
+    this.fileName = checkNotNull(fileName, "fileName");
+    this.coder = checkNotNull(coder, "coder");
+    this.tupleTag = checkNotNull(tupleTag, "tupleTag");
+  }
+
+  @Override
+  TaggedSource getTaggedSource(Configuration conf) {
+    return TaggedSource.of(
+        stepName,
+        new FileBoundedSource<>(fileName, coder, new SerializableConfiguration(conf)),
+        tupleTag);
+  }
+
+  /** {@link BoundedSource} over all files matching {@code <outputDir>/<fileName>*}. */
+  private static class FileBoundedSource<T> extends BoundedSource<WindowedValue<T>> {
+
+    private final String fileName;
+    private final Coder<WindowedValue<T>> coder;
+    private final SerializableConfiguration conf;
+
+    FileBoundedSource(
+        String fileName, Coder<WindowedValue<T>> coder, SerializableConfiguration conf) {
+      this.fileName = checkNotNull(fileName, "fileName");
+      this.coder = checkNotNull(coder, "coder");
+      this.conf = checkNotNull(conf, "conf");
+    }
+
+    @Override
+    public List<? extends BoundedSource<WindowedValue<T>>> split(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      // TODO: support split.
+      return ImmutableList.of(this);
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      // TODO: size estimation is not implemented.
+      return 0;
+    }
+
+    /**
+     * Opens one {@link SequenceFile.Reader} per matching file. If opening any of them fails, the
+     * readers opened so far are closed before the exception propagates.
+     */
+    @Override
+    public BoundedReader<WindowedValue<T>> createReader(PipelineOptions options)
+        throws IOException {
+      Path pattern = new Path(
+          ConfigurationUtils.getResourceIdForOutput(fileName, conf.getConf()) + "*");
+
+      FileSystem fs = pattern.getFileSystem(conf.getConf());
+      FileStatus[] files = fs.globStatus(pattern);
+
+      Queue<SequenceFile.Reader> readers = new LinkedList<>();
+      // globStatus() may return null (e.g. when the path does not exist): treat it as no files.
+      if (files != null) {
+        try {
+          for (FileStatus f : files) {
+            readers.add(new SequenceFile.Reader(fs, f.getPath(), conf.getConf()));
+          }
+        } catch (IOException | RuntimeException e) {
+          try {
+            closeAll(readers);
+          } catch (IOException closeFailure) {
+            e.addSuppressed(closeFailure);
+          }
+          throw e;
+        }
+      }
+      return new Reader<>(this, readers, coder);
+    }
+
+    @Override
+    public void validate() {
+    }
+
+    @Override
+    public Coder<WindowedValue<T>> getDefaultOutputCoder() {
+      return coder;
+    }
+
+    /**
+     * Closes and removes every reader in {@code readers}, attempting all of them even if some
+     * fail. The first failure is rethrown with later ones attached as suppressed exceptions.
+     */
+    private static void closeAll(Queue<SequenceFile.Reader> readers) throws IOException {
+      IOException failure = null;
+      for (SequenceFile.Reader reader = readers.poll(); reader != null; reader = readers.poll()) {
+        try {
+          reader.close();
+        } catch (IOException e) {
+          if (failure == null) {
+            failure = e;
+          } else {
+            failure.addSuppressed(e);
+          }
+        }
+      }
+      if (failure != null) {
+        throw failure;
+      }
+    }
+
+    /** Reads the matched files one after another, decoding each stored value with the coder. */
+    private static class Reader<T> extends BoundedReader<WindowedValue<T>> {
+
+      private final BoundedSource<WindowedValue<T>> boundedSource;
+      // Readers for the files not yet exhausted; the head is the one currently being read.
+      private final Queue<SequenceFile.Reader> readers;
+      private final Coder<WindowedValue<T>> coder;
+      // Reused for every record; holds the encoded current element.
+      private final BytesWritable value = new BytesWritable();
+
+      Reader(
+          BoundedSource<WindowedValue<T>> boundedSource,
+          Queue<SequenceFile.Reader> readers,
+          Coder<WindowedValue<T>> coder) {
+        this.boundedSource = checkNotNull(boundedSource, "boundedSource");
+        this.readers = checkNotNull(readers, "readers");
+        this.coder = checkNotNull(coder, "coder");
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        return advance();
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        // Iterative (not recursive) so many empty part files cannot overflow the stack.
+        while (!readers.isEmpty()) {
+          if (readers.peek().next(NullWritable.get(), value)) {
+            return true;
+          }
+          // Exhausted: drop it before closing so a failing close() is not retried by close().
+          readers.poll().close();
+        }
+        return false;
+      }
+
+      @Override
+      public WindowedValue<T> getCurrent() throws NoSuchElementException {
+        // getBytes() returns the reused backing array, which can be longer than the current
+        // record; only the first getLength() bytes belong to it.
+        ByteArrayInputStream inStream =
+            new ByteArrayInputStream(value.getBytes(), 0, value.getLength());
+        try {
+          return coder.decode(inStream);
+        } catch (IOException e) {
+          Throwables.throwIfUnchecked(e);
+          throw new RuntimeException(e);
+        }
+      }
+
+      /** Closes the files that were not read to the end, e.g. when the caller stops early. */
+      @Override
+      public void close() throws IOException {
+        closeAll(readers);
+      }
+
+      @Override
+      public BoundedSource<WindowedValue<T>> getCurrentSource() {
+        return boundedSource;
+      }
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
new file mode 100644
index 0000000..403de4e
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+
+/**
+ * Files based {@link SideInputReader}.
+ *
+ * <p>The contents of each view are read from {@link SequenceFile SequenceFiles} whose names start
+ * with the path registered for the view's tag, and every record is decoded with the element
+ * coder of the {@link IterableCoder} registered for that tag.
+ */
+public class FileSideInputReader implements SideInputReader {
+
+  private final Map<TupleTag<?>, String> tupleTagToFilePath;
+  private final Map<TupleTag<?>, Coder<?>> tupleTagToCoder;
+  private final Configuration conf;
+
+  /**
+   * Creates a reader over the materialized views described by the two maps.
+   *
+   * @param tupleTagToFilePath for each view tag, the path prefix of the files holding its contents
+   * @param tupleTagToCoder for each view tag, an {@link IterableCoder} of {@link WindowedValue}s
+   * @param conf Hadoop configuration used to access the file system
+   */
+  public FileSideInputReader(
+      Map<TupleTag<?>, String> tupleTagToFilePath,
+      Map<TupleTag<?>, Coder<?>> tupleTagToCoder,
+      Configuration conf) {
+    this.tupleTagToFilePath = checkNotNull(tupleTagToFilePath, "tupleTagToFilePath");
+    this.tupleTagToCoder = checkNotNull(tupleTagToCoder, "tupleTagToCoder");
+    this.conf = checkNotNull(conf, "conf");
+  }
+
+  /**
+   * Reads every materialized element of {@code view}, keeps those whose windows include the side
+   * input window that {@code window} maps to, and applies the view's {@code ViewFn} to them.
+   *
+   * <p>The files are re-read on every call.
+   *
+   * @throws RuntimeException wrapping any {@link IOException} raised while reading the files
+   */
+  @Nullable
+  @Override
+  public <T> T get(PCollectionView<T> view, BoundedWindow window) {
+    String filePath = tupleTagToFilePath.get(view.getTagInternal());
+    IterableCoder<WindowedValue<?>> coder =
+        (IterableCoder<WindowedValue<?>>) tupleTagToCoder.get(view.getTagInternal());
+    Coder<WindowedValue<?>> elemCoder = coder.getElemCoder();
+
+    final BoundedWindow sideInputWindow =
+        view.getWindowMappingFn().getSideInputWindow(window);
+
+    try {
+      List<WindowedValue<?>> availableSideInputs = readAll(filePath, elemCoder);
+      Iterable<WindowedValue<?>> sideInputForWindow =
+          Iterables.filter(availableSideInputs, new Predicate<WindowedValue<?>>() {
+            @Override
+            public boolean apply(@Nullable WindowedValue<?> sideInputCandidate) {
+              if (sideInputCandidate == null) {
+                return false;
+              }
+              // first match of a sideInputWindow to the elementWindow is good enough.
+              for (BoundedWindow sideInputCandidateWindow : sideInputCandidate.getWindows()) {
+                if (sideInputCandidateWindow.equals(sideInputWindow)) {
+                  return true;
+                }
+              }
+              // no match found.
+              return false;
+            }
+          });
+      return view.getViewFn().apply(sideInputForWindow);
+    } catch (IOException e) {
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Decodes every record of every file whose name starts with {@code filePath}.
+   *
+   * <p>A view's contents may be spread over several part files (presumably one per writing task),
+   * so all matching files are read, not only the first. Each file is closed once drained.
+   */
+  private List<WindowedValue<?>> readAll(String filePath, Coder<WindowedValue<?>> elemCoder)
+      throws IOException {
+    Path pattern = new Path(filePath + "*");
+    FileSystem fs = pattern.getFileSystem(conf);
+    FileStatus[] files = fs.globStatus(pattern);
+
+    List<WindowedValue<?>> elements = new ArrayList<>();
+    // No matching files (globStatus() may also return null) means the view is empty.
+    if (files == null) {
+      return elements;
+    }
+    BytesWritable value = new BytesWritable();
+    for (FileStatus file : files) {
+      try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf)) {
+        while (reader.next(NullWritable.get(), value)) {
+          // getBytes() returns the reused backing array, which can be longer than the current
+          // record; only the first getLength() bytes belong to it.
+          ByteArrayInputStream inStream =
+              new ByteArrayInputStream(value.getBytes(), 0, value.getLength());
+          elements.add(elemCoder.decode(inStream));
+        }
+      }
+    }
+    return elements;
+  }
+
+  @Override
+  public <T> boolean contains(PCollectionView<T> view) {
+    return tupleTagToFilePath.containsKey(view.getTagInternal());
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return tupleTagToFilePath.isEmpty();
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java
new file mode 100644
index 0000000..af2e134
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+
+/**
+ * Operation that writes to files.
+ *
+ * <p>Each element is encoded with {@code coder} and written, as a {@link BytesWritable} value
+ * with a {@link NullWritable} key, to the {@link MultipleOutputs} named output {@code fileName}.
+ */
+public class FileWriteOperation<T> extends Operation<T> {
+
+  private final String fileName;
+  private final Coder<WindowedValue<T>> coder;
+  // Created in start() from the task context; not serialized with the operation.
+  private transient MultipleOutputs<Object, Object> mos;
+
+  /**
+   * Creates an operation that writes to the named output {@code fileName}.
+   *
+   * @throws NullPointerException if {@code fileName} or {@code coder} is null
+   */
+  public FileWriteOperation(String fileName, Coder<WindowedValue<T>> coder) {
+    super(0);
+    this.fileName = checkNotNull(fileName, "fileName");
+    this.coder = checkNotNull(coder, "coder");
+  }
+
+  @Override
+  public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) {
+    this.mos = new MultipleOutputs<>(taskContext);
+  }
+
+  /** Encodes {@code elem} with the coder and appends it to the named output. */
+  @Override
+  public void process(WindowedValue<T> elem) throws IOException, InterruptedException {
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    coder.encode(elem, stream);
+
+    mos.write(fileName, NullWritable.get(), new BytesWritable(stream.toByteArray()));
+  }
+
+  /**
+   * Closes the {@link MultipleOutputs} and the record writers it opened.
+   *
+   * @throws RuntimeException if closing fails; on interruption the thread's interrupt flag is
+   *     restored before throwing
+   */
+  @Override
+  public void finish() {
+    try {
+      mos.close();
+    } catch (InterruptedException e) {
+      // Restore the interrupt so the task framework can still observe it.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Returns the named output this operation writes to. */
+  public String getFileName() {
+    return fileName;
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java
new file mode 100644
index 0000000..3c5ac95
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * Flatten operation.
+ *
+ * <p>Every input element is handed to each output receiver {@code duplicateFactor} times. The
+ * factor presumably equals how often the same input appears among the Flatten's inputs (see
+ * {@link FlattenTranslator}), so each copy of that input contributes its elements once.
+ */
+public class FlattenOperation<T> extends Operation<T> {
+
+  private final int duplicateFactor;
+
+  /**
+   * Creates a flatten that emits each element {@code duplicateFactor} times per receiver; a
+   * factor below 1 drops every element.
+   */
+  public FlattenOperation(int duplicateFactor) {
+    super(1);
+    this.duplicateFactor = duplicateFactor;
+  }
+
+  @Override
+  public void process(WindowedValue elem) {
+    for (OutputReceiver receiver : getOutputReceivers()) {
+      emitCopies(receiver, elem);
+    }
+  }
+
+  /** Passes {@code elem} to {@code receiver} {@code max(duplicateFactor, 0)} times. */
+  private void emitCopies(OutputReceiver receiver, WindowedValue elem) {
+    for (int remaining = duplicateFactor; remaining > 0; remaining--) {
+      receiver.process(elem);
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
new file mode 100644
index 0000000..06ad367
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Translates a {@link Flatten} to a {@link FlattenOperation}.
+ *
+ * <p>Three shapes are handled:
+ * <ul>
+ *   <li>no inputs: the output is produced by a {@link SourceReadOperation} over an empty source;
+ *   <li>distinct inputs: a single {@link FlattenOperation} consumes all inputs;
+ *   <li>repeated inputs: every input that appears {@code n > 1} times first goes through its own
+ *       {@link FlattenOperation} with duplicate factor {@code n}, and a final
+ *       {@link FlattenOperation} merges all (possibly duplicated) inputs.
+ * </ul>
+ */
+public class FlattenTranslator<T> extends TransformTranslator.Default<Flatten.PCollections<T>> {
+  @Override
+  public void translateNode(Flatten.PCollections<T> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+
+    // Count how often each input occurs. A LinkedHashMap keeps first-seen order, so the
+    // generated duplicate steps (and their names) are deterministic across runs.
+    Map<Graphs.Tag, Integer> inputTagToCount = Maps.newLinkedHashMap();
+    boolean containsDuplicates = false;
+    for (Graphs.Tag inputTag : userGraphContext.getInputTags()) {
+      Integer previous = inputTagToCount.get(inputTag);
+      int count = previous == null ? 1 : previous + 1;
+      inputTagToCount.put(inputTag, count);
+      if (count > 1) {
+        containsDuplicates = true;
+      }
+    }
+
+    String stepName = userGraphContext.getStepName();
+    if (inputTagToCount.isEmpty()) {
+      // Flatten of no inputs: produce an empty output by reading from an empty source.
+      Operation<?> operation =
+          new SourceReadOperation(
+              stepName, new EmptySource(), userGraphContext.getOnlyOutputTag());
+      context.addInitStep(
+          Graphs.Step.of(stepName, operation),
+          userGraphContext.getInputTags(),
+          userGraphContext.getOutputTags());
+    } else if (!containsDuplicates) {
+      Operation<?> operation = new FlattenOperation<T>(1);
+      context.addInitStep(
+          Graphs.Step.of(stepName, operation),
+          userGraphContext.getInputTags(),
+          userGraphContext.getOutputTags());
+    } else {
+      List<Graphs.Tag> intermediateTags = new ArrayList<>();
+      int dupIndex = 0;
+      for (Map.Entry<Graphs.Tag, Integer> entry : inputTagToCount.entrySet()) {
+        Graphs.Tag inTag = entry.getKey();
+        int dupFactor = entry.getValue();
+        checkState(dupFactor > 0, "dupFactor should be positive, but was: %s", dupFactor);
+        if (dupFactor == 1) {
+          intermediateTags.add(inTag);
+          continue;
+        }
+        // The index keeps names unique when several inputs share the same duplicate factor.
+        String dupStepName = stepName + "/Dup-" + dupFactor + "-" + dupIndex;
+        dupIndex++;
+        Graphs.Tag outTag = Graphs.Tag.of(
+            dupStepName + ".out",
+            new TupleTag<T>(),
+            inTag.getCoder(),
+            inTag.getWindowingStrategy());
+        context.addInitStep(
+            Graphs.Step.of(dupStepName, new FlattenOperation<T>(dupFactor)),
+            ImmutableList.of(inTag),
+            ImmutableList.of(outTag));
+        intermediateTags.add(outTag);
+      }
+      context.addInitStep(
+          Graphs.Step.of(stepName, new FlattenOperation<T>(1)),
+          intermediateTags,
+          userGraphContext.getOutputTags());
+    }
+  }
+
+  /** A {@link BoundedSource} that produces no elements. */
+  private static class EmptySource extends BoundedSource<Void> {
+    @Override
+    public List<? extends BoundedSource<Void>> split(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      return Collections.<BoundedSource<Void>>emptyList();
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      return 0;
+    }
+
+    @Override
+    public BoundedReader<Void> createReader(PipelineOptions options) throws IOException {
+      return new BoundedReader<Void>() {
+        @Override
+        public BoundedSource<Void> getCurrentSource() {
+          return EmptySource.this;
+        }
+
+        @Override
+        public boolean start() throws IOException {
+          // No elements to read.
+          return false;
+        }
+
+        @Override
+        public boolean advance() throws IOException {
+          return false;
+        }
+
+        @Override
+        public Void getCurrent() throws NoSuchElementException {
+          throw new NoSuchElementException();
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+      };
+    }
+
+    @Override
+    public void validate() {
+    }
+
+    @Override
+    public Coder<Void> getDefaultOutputCoder() {
+      return VoidCoder.of();
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
new file mode 100644
index 0000000..b4549d3
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.graph.ElementOrder;
+import com.google.common.graph.GraphBuilder;
+import com.google.common.graph.MutableGraph;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Directed graph that represents a Beam pipeline.
+ *
+ * <p>Both steps and tags are vertices: an edge {@code tag -> step} means the step consumes the
+ * tag, and an edge {@code step -> tag} means the step produces the tag. Self loops are rejected,
+ * and cycles are reported by {@link #getSteps()}.
+ *
+ * @param <StepT> type of the step vertices
+ * @param <TagT> type of the tag vertices
+ */
+public class Graph<StepT extends Graph.AbstractStep, TagT extends Graph.AbstractTag> {
+
+  // Underlying Guava graph; nodes() iterates in insertion order.
+  public final MutableGraph<Vertex> graph;
+
+  public Graph() {
+    this.graph = GraphBuilder.directed()
+        .allowsSelfLoops(false)
+        .nodeOrder(ElementOrder.insertion())
+        .build();
+  }
+
+  /**
+   * Adds {@code step} with edges from each of {@code inTags} and to each of {@code outTags}.
+   * Tags that are not yet in the graph are added implicitly by {@code putEdge}.
+   */
+  public void addStep(StepT step, List<TagT> inTags, List<TagT> outTags) {
+    graph.addNode(step);
+    for (TagT tag : inTags) {
+      graph.putEdge(tag, step);
+    }
+    for (TagT tag : outTags) {
+      graph.putEdge(step, tag);
+    }
+  }
+
+  /** Removes {@code step} and all of its edges. */
+  public void removeStep(StepT step) {
+    graph.removeNode(step);
+  }
+
+  /** Removes {@code tag} and all of its edges. */
+  public void removeTag(TagT tag) {
+    graph.removeNode(tag);
+  }
+
+  /** Makes {@code step} a consumer of {@code inTag}. */
+  public void addEdge(TagT inTag, StepT step) {
+    graph.putEdge(inTag, step);
+  }
+
+  /** Makes {@code step} a producer of {@code outTag}. */
+  public void addEdge(StepT step, TagT outTag) {
+    graph.putEdge(step, outTag);
+  }
+
+  public void removeEdge(TagT inTag, StepT step) {
+    graph.removeEdge(inTag, step);
+  }
+
+  public void removeEdge(StepT step, TagT outTag) {
+    graph.removeEdge(step, outTag);
+  }
+
+  /**
+   * Returns all steps in topological order: every step comes after the producers of its inputs.
+   *
+   * <p>Vertices are released level by level. Within a level they keep the graph's insertion
+   * order, so the result is deterministic for a given construction sequence.
+   *
+   * @throws IllegalStateException if the graph contains a cycle
+   */
+  public List<StepT> getSteps() {
+    List<Vertex> ordered = new ArrayList<>();
+    // A LinkedHashSet keeps graph.nodes() insertion order; a HashSet would order each level by
+    // vertex hash codes, which may differ between runs.
+    Set<Vertex> pendingNodes = Sets.newLinkedHashSet(graph.nodes());
+    while (!pendingNodes.isEmpty()) {
+      // A node is ready once none of its predecessors is still pending.
+      List<Vertex> readyNodes = new ArrayList<>();
+      for (Vertex v : pendingNodes) {
+        // Iterating the (typically small) predecessor set first keeps this check cheap.
+        if (Sets.intersection(graph.predecessors(v), pendingNodes).isEmpty()) {
+          readyNodes.add(v);
+        }
+      }
+      // Template form: the graph is only rendered to a string if the check fails.
+      checkState(
+          !readyNodes.isEmpty(),
+          "No ready nodes found, there are cycles in graph: %s", graph);
+      ordered.addAll(readyNodes);
+      for (Vertex v : readyNodes) {
+        pendingNodes.remove(v);
+      }
+    }
+    return castToStepList(FluentIterable.from(ordered)
+        .filter(new Predicate<Vertex>() {
+          @Override
+          public boolean apply(Vertex input) {
+            return input instanceof AbstractStep;
+          }
+        }))
+        .toList();
+  }
+
+  /** Returns the steps that have no incoming edge, i.e. consume no tag of this graph. */
+  public List<StepT> getStartSteps() {
+    return castToStepList(FluentIterable.from(graph.nodes())
+        .filter(new Predicate<Vertex>() {
+          @Override
+          public boolean apply(Vertex input) {
+            return input instanceof AbstractStep && graph.inDegree(input) == 0;
+          }
+        }))
+        .toList();
+  }
+
+  /**
+   * Returns the only step producing {@code tag}, or {@code null} if {@code tag} is not part of
+   * this graph.
+   *
+   * @throws java.util.NoSuchElementException if {@code tag} has no producer in this graph
+   * @throws IllegalArgumentException if {@code tag} has more than one producer
+   */
+  @SuppressWarnings("unchecked")
+  public StepT getProducer(TagT tag) {
+    if (contains(tag)) {
+      return (StepT) Iterables.getOnlyElement(graph.predecessors(tag));
+    } else {
+      return null;
+    }
+  }
+
+  /** Returns the steps consuming {@code tag}; empty if {@code tag} is not part of this graph. */
+  public List<StepT> getConsumers(TagT tag) {
+    if (contains(tag)) {
+      return castToStepList(graph.successors(tag)).toList();
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  /** Returns the input tags of {@code step}; empty if {@code step} is not part of this graph. */
+  public List<TagT> getInputTags(StepT step) {
+    if (contains(step)) {
+      return castToTagList(graph.predecessors(step)).toList();
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  /** Returns the output tags of {@code step}; empty if {@code step} is not part of this graph. */
+  public List<TagT> getOutputTags(StepT step) {
+    if (contains(step)) {
+      return castToTagList(graph.successors(step)).toList();
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  private boolean contains(Vertex node) {
+    return graph.nodes().contains(node);
+  }
+
+  // Unchecked view cast; callers only pass vertices known to be steps.
+  private FluentIterable<StepT> castToStepList(Iterable<Vertex> vertices) {
+    return FluentIterable.from(vertices)
+        .transform(new Function<Vertex, StepT>() {
+          @SuppressWarnings("unchecked")
+          @Override
+          public StepT apply(Vertex input) {
+            return (StepT) input;
+          }
+        });
+  }
+
+  // Unchecked view cast; callers only pass vertices known to be tags.
+  private FluentIterable<TagT> castToTagList(Iterable<Vertex> vertices) {
+    return FluentIterable.from(vertices)
+        .transform(new Function<Vertex, TagT>() {
+          @SuppressWarnings("unchecked")
+          @Override
+          public TagT apply(Vertex input) {
+            return (TagT) input;
+          }
+        });
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj instanceof Graph) {
+      Graph other = (Graph) obj;
+      return com.google.common.graph.Graphs.equivalent(this.graph, other.graph);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    // equals() accepts any Graph instance, so the runtime class must not affect the hash.
+    // Equivalent graphs have equal node sets, which keeps the equals/hashCode contract.
+    return Objects.hash(graph.nodes());
+  }
+
+  /**
+   * Vertex interface of this Graph.
+   */
+  interface Vertex {
+  }
+
+  /**
+   * Step {@link Vertex}.
+   */
+  public abstract static class AbstractStep implements Vertex {
+  }
+
+  /**
+   * Tag {@link Vertex}.
+   */
+  public abstract static class AbstractTag implements Vertex {
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
new file mode 100644
index 0000000..1a4988b
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import java.util.Stack;
+import org.apache.beam.runners.mapreduce.MapReduceRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+
+/**
+ * Pipeline translator for {@link MapReduceRunner}.
+ */
+public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
+
+ private final TranslationContext context;
+ private final Stack<StringBuilder> dotfileNodesBuilders;
+ private final Map<TransformHierarchy.Node, Integer> enclosedTransformCounts;
+ private final StringBuilder dotfileEdgesBuilder;
+
+ private int indent;
+
+ public GraphConverter(TranslationContext context) {
+ this.context = checkNotNull(context, "context");
+ this.enclosedTransformCounts = Maps.newHashMap();
+ this.dotfileNodesBuilders = new Stack<>();
+ this.dotfileEdgesBuilder = new StringBuilder();
+ this.indent = 0;
+ }
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+ // check if current composite transforms need to be translated.
+ // If not, all sub transforms will be translated in visitPrimitiveTransform.
+ PTransform<?, ?> transform = node.getTransform();
+ dotfileNodesBuilders.push(new StringBuilder());
+ if (transform != null) {
+ markEnclosedTransformCounts(node);
+ TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+
+ if (translator != null && applyCanTranslate(transform, node, translator)) {
+ applyTransform(transform, node, translator);
+ return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+ }
+ indent += 2;
+ }
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void leaveCompositeTransform(TransformHierarchy.Node node) {
+ if (node.getTransform() != null) {
+ Integer enclosedTransformCount = enclosedTransformCounts.get(node);
+ if (enclosedTransformCount != null && enclosedTransformCount > 1) {
+ dotfileNodesBuilders.peek().insert(0, new StringBuilder()
+ .append(getIndent()).append(
+ String.format("subgraph \"cluster_%s\" {", node.getFullName()))
+ .append('\n')
+ .append(getIndent()).append(
+ String.format(" label=\"%s\";", node.getFullName()))
+ .append('\n')
+ .toString());
+ dotfileNodesBuilders.peek().append(new StringBuilder()
+ .append(getIndent()).append("}").append('\n')
+ .toString());
+ }
+ StringBuilder top = dotfileNodesBuilders.pop();
+ dotfileNodesBuilders.peek().append(top.toString());
+ indent -= 2;
+ }
+ }
+
+ @Override
+ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+ if (!node.isRootNode()) {
+ markEnclosedTransformCounts(node);
+
+ PTransform<?, ?> transform = node.getTransform();
+ TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+ if (translator == null || !applyCanTranslate(transform, node, translator)) {
+ throw new UnsupportedOperationException(
+ "The transform " + transform + " is currently not supported.");
+ }
+ applyTransform(transform, node, translator);
+ }
+ }
+
+ @Override
+ public void visitValue(PValue value, TransformHierarchy.Node producer) {
+ dotfileNodesBuilders.peek().append(getIndent())
+ .append(String.format("\"%s\" [shape=ellipse];", value.getName()))
+ .append('\n');
+ }
+
+ private void markEnclosedTransformCounts(TransformHierarchy.Node node) {
+ TransformHierarchy.Node parent = node.getEnclosingNode();
+ Integer primitiveCount = enclosedTransformCounts.get(parent);
+ if (primitiveCount == null) {
+ primitiveCount = 0;
+ }
+ enclosedTransformCounts.put(parent, primitiveCount + 1);
+ }
+
+ public String getDotfile() {
+ return String.format(
+ "%ndigraph G {%n%s%s}%n",
+ dotfileNodesBuilders.peek().toString(),
+ dotfileEdgesBuilder.toString());
+ }
+
+ private <T extends PTransform<?, ?>> void applyTransform(
+ PTransform<?, ?> transform,
+ TransformHierarchy.Node node,
+ TransformTranslator<?> translator) {
+ dotfileNodesBuilders.peek()
+ .append(getIndent())
+ .append(String.format("\"%s\" [shape=box];", node.getFullName()))
+ .append('\n');
+ for (PValue input : node.getInputs().values()) {
+ dotfileEdgesBuilder
+ .append(String.format(" \"%s\" -> \"%s\";", input.getName(), node.getFullName()))
+ .append('\n');
+ }
+ for (PValue output : node.getOutputs().values()) {
+ dotfileEdgesBuilder
+ .append(String.format(" \"%s\" -> \"%s\";", node.getFullName(), output.getName()))
+ .append('\n');
+ }
+
+ @SuppressWarnings("unchecked")
+ T typedTransform = (T) transform;
+ @SuppressWarnings("unchecked")
+ TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
+ context.getUserGraphContext().setCurrentNode(node);
+ typedTranslator.translateNode(typedTransform, context);
+ }
+
+ private <T extends PTransform<?, ?>> boolean applyCanTranslate(
+ PTransform<?, ?> transform,
+ TransformHierarchy.Node node,
+ TransformTranslator<?> translator) {
+ @SuppressWarnings("unchecked")
+ T typedTransform = (T) transform;
+ @SuppressWarnings("unchecked")
+ TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
+ context.getUserGraphContext().setCurrentNode(node);
+ return typedTranslator.canTranslate(typedTransform, context);
+ }
+
+ private String getIndent() {
+ StringBuilder ret = new StringBuilder();
+ for (int i = 0; i < indent; ++i) {
+ ret.append(' ');
+ }
+ return ret.toString();
+ }
+}
+
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
new file mode 100644
index 0000000..bc360fb
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Prepares a {@link Graphs.FusedGraph} for execution.
+ *
+ * <p>It materializes tags that cross fused-step boundaries through files, replaces the start
+ * steps of every fused step by one partition step, and points side inputs at their files.
+ */
+public class GraphPlanner {
+
+  private final ConfigurationUtils configUtils;
+
+  public GraphPlanner(MapReducePipelineOptions options) {
+    checkNotNull(options, "options");
+    this.configUtils = new ConfigurationUtils(options);
+  }
+
+  /**
+   * Plans {@code fusedGraph}, mutating its fused steps in place.
+   *
+   * @return the same {@code fusedGraph} instance
+   */
+  public Graphs.FusedGraph plan(Graphs.FusedGraph fusedGraph) {
+    attachWritesAndReads(fusedGraph);
+    insertPartitionOperations(fusedGraph);
+    setupSideInputs(fusedGraph);
+    return fusedGraph;
+  }
+
+  // Adds a file write after each producer and a file read before each consumer of a tag that
+  // flows from one fused step into others.
+  private void attachWritesAndReads(Graphs.FusedGraph fusedGraph) {
+    for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      for (Graphs.Tag tag : fusedGraph.getOutputTags(fusedStep)) {
+        List<Graphs.FusedStep> consumers = fusedGraph.getConsumers(tag);
+        if (consumers.isEmpty()) {
+          continue;
+        }
+        Graphs.Step producer = fusedStep.getProducer(tag);
+        if (producer.getOperation() instanceof FileWriteOperation) {
+          continue;
+        }
+        String tagName = tag.getName();
+        String fileName = ConfigurationUtils.toFileName(tagName);
+
+        WindowedValue.WindowedValueCoder<?> writeValueCoder = WindowedValue.getFullCoder(
+            tag.getCoder(), tag.getWindowingStrategy().getWindowFn().windowCoder());
+
+        fusedStep.addStep(
+            Graphs.Step.of(
+                tagName + "/Write",
+                new FileWriteOperation(fileName, writeValueCoder)),
+            ImmutableList.of(tag),
+            ImmutableList.<Graphs.Tag>of());
+
+        // Every consumer reads the same file, written by this fused step.
+        String filePath = configUtils.getFileOutputPath(fusedStep.getStageId(), fileName);
+        String readStepName = tagName + "/Read";
+        Graphs.Tag readOutput = Graphs.Tag.of(
+            readStepName + ".out", tag.getTupleTag(), tag.getCoder(), tag.getWindowingStrategy());
+        for (Graphs.FusedStep consumer : consumers) {
+          // Re-direct the consumer's readers of tag to the output of the new read step.
+          for (Graphs.Step step : consumer.getConsumers(tag)) {
+            consumer.addEdge(readOutput, step);
+          }
+          consumer.removeTag(tag);
+          consumer.addStep(
+              Graphs.Step.of(
+                  readStepName,
+                  new FileReadOperation(
+                      readStepName, filePath, writeValueCoder, tag.getTupleTag())),
+              ImmutableList.<Graphs.Tag>of(),
+              ImmutableList.of(readOutput));
+        }
+      }
+    }
+  }
+
+  // Replaces the start (read) steps of every fused step by a single PartitionOperation step.
+  private void insertPartitionOperations(Graphs.FusedGraph fusedGraph) {
+    for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      List<ReadOperation> readOperations = new ArrayList<>();
+      List<Graphs.Tag> readOutTags = new ArrayList<>();
+      List<TupleTag<?>> readOutTupleTags = new ArrayList<>();
+      StringBuilder partitionStepName = new StringBuilder();
+      for (Graphs.Step step : fusedStep.getStartSteps()) {
+        checkState(
+            step.getOperation() instanceof ReadOperation,
+            "Start step %s of fused step %s is not a ReadOperation.", step, fusedStep);
+        readOperations.add((ReadOperation) step.getOperation());
+        Graphs.Tag tag = Iterables.getOnlyElement(fusedStep.getOutputTags(step));
+        readOutTags.add(tag);
+        readOutTupleTags.add(tag.getTupleTag());
+        // '|' separates the read step names; the trailing separator is trimmed below.
+        partitionStepName.append(step.getFullName()).append('|');
+
+        fusedStep.removeStep(step);
+      }
+      if (partitionStepName.length() > 0) {
+        partitionStepName.deleteCharAt(partitionStepName.length() - 1);
+      }
+
+      Graphs.Step partitionStep = Graphs.Step.of(
+          partitionStepName.toString(), new PartitionOperation(readOperations, readOutTupleTags));
+      fusedStep.addStep(partitionStep, ImmutableList.<Graphs.Tag>of(), readOutTags);
+    }
+  }
+
+  // Tells each ParDoOperation with side inputs where its side inputs' producers write them.
+  private void setupSideInputs(Graphs.FusedGraph fusedGraph) {
+    for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      for (Graphs.Step step : fusedStep.getSteps()) {
+        if (!(step.getOperation() instanceof ParDoOperation)) {
+          continue;
+        }
+        ParDoOperation parDo = (ParDoOperation) step.getOperation();
+        List<Graphs.Tag> sideInputTags = parDo.getSideInputTags();
+        if (sideInputTags.isEmpty()) {
+          continue;
+        }
+        Map<TupleTag<?>, String> tupleTagToFilePath = Maps.newHashMap();
+        for (Graphs.Tag sideInTag : sideInputTags) {
+          tupleTagToFilePath.put(
+              sideInTag.getTupleTag(),
+              configUtils.getFileOutputPath(
+                  fusedGraph.getProducer(sideInTag).getStageId(),
+                  ConfigurationUtils.toFileName(sideInTag.getName())));
+        }
+        parDo.setupSideInput(tupleTagToFilePath);
+      }
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
new file mode 100644
index 0000000..f23e572
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Vertex types of the translation graphs ({@link Step}, {@link Tag}, {@link FusedStep}) and the
+ * {@link FusedGraph} built from them.
+ */
+public class Graphs {
+
+  private Graphs() {}
+
+  /**
+   * Graph of {@link FusedStep}s connected by {@link Tag}s, obtained by fusing an initial graph of
+   * {@link Step}s.
+   */
+  public static class FusedGraph {
+    private final Graph<FusedStep, Tag> graph;
+    // Stage id handed to the next newly created FusedStep.
+    private int stageId = 0;
+
+    public FusedGraph() {
+      this.graph = new Graph<>();
+    }
+
+    /**
+     * Builds a fused graph from {@code initGraph}.
+     *
+     * <p>Steps are visited in reverse topological order, so every consumer of a step has already
+     * been placed into a fused step when that step is considered.
+     */
+    public FusedGraph(Graph<Graphs.Step, Tag> initGraph) {
+      this.graph = new Graph<>();
+
+      for (Graphs.Step step : Lists.reverse(initGraph.getSteps())) {
+        tryFuse(step, initGraph.getInputTags(step), initGraph.getOutputTags(step));
+      }
+      // Remove output tags that no fused step consumes.
+      for (FusedStep fusedStep : graph.getSteps()) {
+        for (Tag outTag : graph.getOutputTags(fusedStep)) {
+          if (graph.getConsumers(outTag).isEmpty()) {
+            graph.removeTag(outTag);
+          }
+        }
+      }
+    }
+
+    /**
+     * Adds {@code step} to this graph. If {@link #canFuse} allows it, the step is merged into the
+     * only fused step consuming its output; otherwise it starts a new fused step with a fresh
+     * stage id.
+     */
+    public void tryFuse(
+        Graphs.Step step,
+        List<Graphs.Tag> inTags,
+        List<Graphs.Tag> outTags) {
+      if (canFuse(step, inTags, outTags)) {
+        Graphs.Tag outTag = Iterables.getOnlyElement(outTags);
+        Graphs.FusedStep consumer = Iterables.getOnlyElement(graph.getConsumers(outTag));
+        consumer.addStep(step, inTags, outTags);
+        // The consumer now also consumes the fused step's inputs.
+        for (Graphs.Tag in : inTags) {
+          graph.addEdge(in, consumer);
+        }
+        // outTag is now internal to the consumer: detach it and re-attach it as a consumer output
+        // without consumers (the constructor prunes such tags).
+        graph.removeTag(outTag);
+        graph.addEdge(consumer, outTag);
+      } else {
+        Graphs.FusedStep newFusedStep = new Graphs.FusedStep(stageId++);
+        newFusedStep.addStep(step, inTags, outTags);
+        graph.addStep(newFusedStep, inTags, outTags);
+      }
+    }
+
+    /**
+     * Returns whether {@code step} can join the fused step consuming its output. This requires
+     * that the step is not a {@link FileWriteOperation}, has exactly one output tag consumed by
+     * exactly one fused step, and would not bring a second GroupByKey into that fused step.
+     */
+    private boolean canFuse(
+        Graphs.Step step,
+        List<Graphs.Tag> inTags,
+        List<Graphs.Tag> outTags) {
+      if (step.getOperation() instanceof FileWriteOperation) {
+        return false;
+      }
+      if (outTags.size() != 1) {
+        return false;
+      }
+      Graphs.Tag outTag = Iterables.getOnlyElement(outTags);
+      if (graph.getConsumers(outTag).size() != 1) {
+        return false;
+      }
+      Graphs.FusedStep consumer = Iterables.getOnlyElement(graph.getConsumers(outTag));
+      if (consumer.containsGroupByKey() && step.getOperation() instanceof GroupByKeyOperation) {
+        return false;
+      }
+      return true;
+    }
+
+    public FusedStep getProducer(Tag tag) {
+      return graph.getProducer(tag);
+    }
+
+    public List<FusedStep> getConsumers(Tag tag) {
+      return graph.getConsumers(tag);
+    }
+
+    /** Returns the fused steps in topological order. */
+    public List<FusedStep> getFusedSteps() {
+      return graph.getSteps();
+    }
+
+    public List<Tag> getInputTags(FusedStep fusedStep) {
+      return graph.getInputTags(fusedStep);
+    }
+
+    public List<Tag> getOutputTags(FusedStep fusedStep) {
+      return graph.getOutputTags(fusedStep);
+    }
+  }
+
+  /**
+   * An {@link Graph.AbstractStep} that represents an optimized sub-graph that can be executed
+   * in one MapReduce job.
+   */
+  public static class FusedStep extends Graph.AbstractStep {
+    private final int stageId;
+    private final Graph<Step, Tag> steps;
+    // The GroupByKey step contained in this fused step, or null if there is none.
+    private Step groupByKeyStep;
+
+    public FusedStep(int stageId) {
+      this.stageId = stageId;
+      this.steps = new Graph<>();
+      this.groupByKeyStep = null;
+    }
+
+    public int getStageId() {
+      return stageId;
+    }
+
+    public List<Tag> getInputTags(Step step) {
+      return steps.getInputTags(step);
+    }
+
+    public List<Tag> getOutputTags(Step step) {
+      return steps.getOutputTags(step);
+    }
+
+    /** Adds {@code step} to the sub-graph, remembering it if it is a GroupByKey. */
+    public void addStep(Step step, List<Tag> inTags, List<Tag> outTags) {
+      steps.addStep(step, inTags, outTags);
+      if (step.getOperation() instanceof GroupByKeyOperation) {
+        groupByKeyStep = step;
+      }
+    }
+
+    /** Removes {@code step} from the sub-graph, forgetting it if it was the GroupByKey step. */
+    public void removeStep(Step step) {
+      steps.removeStep(step);
+      // Keep containsGroupByKey() accurate when the GroupByKey step itself is removed.
+      if (step.equals(groupByKeyStep)) {
+        groupByKeyStep = null;
+      }
+    }
+
+    public void removeTag(Tag tag) {
+      steps.removeTag(tag);
+    }
+
+    public void addEdge(Tag inTag, Step step) {
+      steps.addEdge(inTag, step);
+    }
+
+    public void addEdge(Step step, Tag outTag) {
+      steps.addEdge(step, outTag);
+    }
+
+    public void removeEdge(Tag inTag, Step step) {
+      steps.removeEdge(inTag, step);
+    }
+
+    public void removeEdge(Step step, Tag outTag) {
+      steps.removeEdge(step, outTag);
+    }
+
+    public Step getProducer(Tag tag) {
+      return steps.getProducer(tag);
+    }
+
+    public List<Step> getConsumers(Tag tag) {
+      return steps.getConsumers(tag);
+    }
+
+    /** Returns the contained steps in topological order. */
+    public List<Step> getSteps() {
+      return steps.getSteps();
+    }
+
+    public List<Step> getStartSteps() {
+      return steps.getStartSteps();
+    }
+
+    public boolean containsGroupByKey() {
+      return groupByKeyStep != null;
+    }
+
+    @Nullable
+    public Step getGroupByKeyStep() {
+      return groupByKeyStep;
+    }
+
+    /** Returns the full names of the contained steps, in topological order, joined by '|'. */
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      for (Step step : steps.getSteps()) {
+        sb.append(step.getFullName()).append('|');
+      }
+      if (sb.length() > 0) {
+        sb.deleteCharAt(sb.length() - 1);
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * An {@link Graph.AbstractStep} that represents one {@link Operation}.
+   */
+  @AutoValue
+  public abstract static class Step extends Graph.AbstractStep {
+    abstract String getFullName();
+    // TODO: remove public
+    public abstract Operation getOperation();
+
+    public static Step of(String fullName, Operation operation) {
+      return new org.apache.beam.runners.mapreduce.translation.AutoValue_Graphs_Step(
+          fullName, operation);
+    }
+
+    @Override
+    public String toString() {
+      return getFullName();
+    }
+  }
+
+  /**
+   * An {@link Graph.AbstractTag} that contains information about input/output data: its name,
+   * {@link TupleTag}, element {@link Coder} and {@link WindowingStrategy}.
+   */
+  @AutoValue
+  public abstract static class Tag extends Graph.AbstractTag implements Serializable {
+    abstract String getName();
+    abstract TupleTag<?> getTupleTag();
+    abstract Coder<?> getCoder();
+    abstract WindowingStrategy<?, ?> getWindowingStrategy();
+
+    @Override
+    public String toString() {
+      return getName();
+    }
+
+    public static Tag of(
+        String name,
+        TupleTag<?> tupleTag,
+        Coder<?> coder,
+        WindowingStrategy<?, ?> windowingStrategy) {
+      return new org.apache.beam.runners.mapreduce.translation.AutoValue_Graphs_Tag(
+          name, tupleTag, coder, windowingStrategy);
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
new file mode 100644
index 0000000..14e3a29
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * {@link Operation} that runs a {@link GroupAlsoByWindowsViaOutputBufferDoFn} for one output tag.
+ */
+public class GroupAlsoByWindowsParDoOperation extends ParDoOperation {
+ /** Coder handed to {@link SystemReduceFn#buffering} whenever the {@link DoFn} is built. */
+ private final Coder<?> inputCoder;
+
+ public GroupAlsoByWindowsParDoOperation(
+ String stepName,
+ PipelineOptions options,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Coder<?> inputCoder,
+ Graphs.Tag outTag) {
+ // Only the main output of outTag: no side output tags and no side inputs.
+ super(stepName, options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(),
+ ImmutableList.<Graphs.Tag>of(), windowingStrategy);
+ this.inputCoder = checkNotNull(inputCoder, "inputCoder");
+ }
+
+ @Override
+ DoFn<Object, Object> getDoFn() {
+ // NOTE(review): buffering() expects the coder of the buffered values; confirm callers pass it.
+ return new GroupAlsoByWindowsViaOutputBufferDoFn(
+ windowingStrategy, SystemReduceFn.buffering(inputCoder), mainOutputTag,
+ createOutputManager());
+ }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
new file mode 100644
index 0000000..5ac23a5
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.InMemoryStateInternals;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.ReduceFnRunner;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.construction.TriggerTranslation;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Instant;
+
+/**
+ * Batch {@link DoFn} that runs a {@link ReduceFnRunner} over all grouped values of one key.
+ */
+public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow>
+ extends DoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
+
+ private final WindowingStrategy<Object, W> windowingStrategy;
+ private final SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
+ private final TupleTag<KV<K, OutputT>> mainTag;
+ private transient DoFnRunners.OutputManager outputManager; // not serialized; null after deserialization
+
+ public GroupAlsoByWindowsViaOutputBufferDoFn(
+ WindowingStrategy<Object, W> windowingStrategy,
+ SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
+ TupleTag<KV<K, OutputT>> mainTag,
+ DoFnRunners.OutputManager outputManager) {
+ this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy");
+ this.reduceFn = checkNotNull(reduceFn, "reduceFn");
+ this.mainTag = checkNotNull(mainTag, "mainTag");
+ this.outputManager = checkNotNull(outputManager, "outputManager");
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ K key = c.element().getKey();
+ // In batch mode all values of this key are available at once. The context offers no timer
+ // manager, so timers are emulated with a fresh InMemoryTimerInternals. The values are not
+ // assumed to be in timestamp order (see the TODO below).
+ InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+ ReduceFnRunner<K, InputT, OutputT, W> runner = new ReduceFnRunner<>(
+ key,
+ windowingStrategy,
+ ExecutableTriggerStateMachine.create( // trigger of the windowing strategy, as a state machine
+ TriggerStateMachines.stateMachineForTrigger(
+ TriggerTranslation.toProto(windowingStrategy.getTrigger()))),
+ InMemoryStateInternals.forKey(key), // fresh in-memory state for this key only
+ timerInternals,
+ outputWindowedValue(),
+ NullSideInputReader.empty(), // no side inputs
+ reduceFn,
+ c.getPipelineOptions());
+
+ Iterable<List<WindowedValue<InputT>>> chunks =
+ Iterables.partition(c.element().getValue(), 1000); // lists of at most 1000 values
+ for (Iterable<WindowedValue<InputT>> chunk : chunks) {
+ // Process the chunk of elements.
+ runner.processElements(chunk);
+
+ // Advancing the input watermark after each chunk requires timestamp-sorted values, which
+ // are not guaranteed, so the watermark is only advanced once all chunks are processed.
+ // TODO: re-enable once elements are sorted.
+ // timerInternals.advanceInputWatermark(chunk.iterator().next().getTimestamp());
+
+ // Advance processing time; timers are only handed to the runner after this loop.
+ timerInternals.advanceProcessingTime(Instant.now());
+
+ // Leave the output watermark undefined. Since there's no late data in batch mode
+ // there's really no need to track it as we do for streaming.
+ }
+
+ // Finish any pending windows by advancing the input watermark to infinity.
+ timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+ // Then advance processing time to the end of time and hand the timers to the runner once.
+ timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+ runner.onTimers(timerInternals.getTimers(TimeDomain.EVENT_TIME)); // NOTE(review): timers set during these onTimers() calls are not fetched again — confirm none are needed
+ runner.onTimers(timerInternals.getTimers(TimeDomain.PROCESSING_TIME));
+ runner.onTimers(timerInternals.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
+
+ runner.persist(); // persist pending runner state (in-memory state here)
+ }
+
+ private OutputWindowedValue<KV<K, OutputT>> outputWindowedValue() { // adapts runner output to outputManager
+ return new OutputWindowedValue<KV<K, OutputT>>() {
+ @Override
+ public void outputWindowedValue(
+ KV<K, OutputT> output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputManager.output(mainTag, // main output
+ WindowedValue.of(output, timestamp, windows, pane));
+ }
+
+ @Override
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputManager.output(tag, // additional output, under its own tag
+ WindowedValue.of(output, timestamp, windows, pane));
+ }
+ };
+ }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyOperation.java
new file mode 100644
index 0000000..b0be494
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyOperation.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * A GroupByKey placeholder {@link Operation} used during pipeline translation.
+ *
+ * <p>It records the windowing strategy and {@link KvCoder} of the GroupByKey input. It has to be
+ * expanded into executable operations before the graph runs, so {@link #process} always fails.
+ */
+public class GroupByKeyOperation<K, V> extends Operation<KV<K, V>> {
+
+ private final WindowingStrategy<?, ?> windowingStrategy;
+ private final KvCoder<K, V> kvCoder;
+
+ /** Creates a placeholder for a GroupByKey with the given input windowing strategy and coder. */
+ public GroupByKeyOperation(WindowingStrategy<?, ?> windowingStrategy, KvCoder<K, V> kvCoder) {
+ super(1);
+ this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy");
+ this.kvCoder = checkNotNull(kvCoder, "kvCoder");
+ }
+
+ /**
+ * Always fails: a GroupByKey placeholder must not be part of an execution graph.
+ *
+ * @throws IllegalStateException on every call
+ */
+ @Override
+ public void process(WindowedValue elem) {
+ throw new IllegalStateException(
+ String.format(
+ "%s should not be in the execution graph.", this.getClass().getSimpleName()));
+ }
+
+ /** Returns the windowing strategy of the GroupByKey input. */
+ public WindowingStrategy<?, ?> getWindowingStrategy() {
+ return windowingStrategy;
+ }
+
+ /** Returns the {@link KvCoder} of the GroupByKey input. */
+ public KvCoder<K, V> getKvCoder() {
+ return kvCoder;
+ }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java
new file mode 100644
index 0000000..4c627d7
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Translates a {@link GroupByKey} into a single placeholder {@link GroupByKeyOperation} step.
+ */
+class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> {
+ @Override
+ public void translateNode(GroupByKey<K, V> transform, TranslationContext context) {
+ TranslationContext.UserGraphContext graphContext = context.getUserGraphContext();
+ PCollection<?> input = (PCollection<?>) graphContext.getInput();
+ WindowingStrategy<?, ?> strategy = input.getWindowingStrategy();
+ Coder<?> inputCoder = input.getCoder();
+
+ // Unchecked cast: the input of a GroupByKey is expected to be encoded with a KvCoder.
+ GroupByKeyOperation<K, V> placeholder =
+ new GroupByKeyOperation<>(strategy, (KvCoder<K, V>) inputCoder);
+ Graphs.Step step = Graphs.Step.of(graphContext.getStepName(), placeholder);
+
+ context.addInitStep(
+ step, graphContext.getInputTags(), graphContext.getOutputTags());
+ }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
new file mode 100644
index 0000000..e8e6eab
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.counters.Limits;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+/**
+ * Class that translates a {@link Graphs.FusedStep} to a MapReduce job.
+ *
+ * <p>The fused step must have a single {@link PartitionOperation} start step, whose read
+ * operations become the sources of {@link BeamInputFormat}. A GroupByKey step, if present, is
+ * expanded into a reify step and a shuffle write step on the map side and a GroupAlsoByWindows
+ * step on the reduce side. {@link #build} performs this expansion on the fused step in place.
+ */
+public class JobPrototype {
+
+ /** Creates a prototype that translates {@code fusedStep}; see {@link #build}. */
+ public static JobPrototype create(
+ int stageId, Graphs.FusedStep fusedStep, MapReducePipelineOptions options) {
+ return new JobPrototype(stageId, fusedStep, options);
+ }
+
+ private final int stageId;
+ private final Graphs.FusedStep fusedStep;
+ private final MapReducePipelineOptions options;
+ private final ConfigurationUtils configUtils;
+
+ private JobPrototype(int stageId, Graphs.FusedStep fusedStep, MapReducePipelineOptions options) {
+ this.stageId = stageId;
+ this.fusedStep = checkNotNull(fusedStep, "fusedStep");
+ this.options = checkNotNull(options, "options");
+ this.configUtils = new ConfigurationUtils(options);
+ }
+
+ /**
+ * Builds the MapReduce {@link Job} for the fused step. The job is configured but not submitted.
+ *
+ * @param jarClass class whose jar is set as the job jar
+ * @param initConf initial configuration; the job works on its own copy of it
+ * @throws IOException if the job cannot be created
+ */
+ public Job build(Class<?> jarClass, Configuration initConf) throws IOException {
+ Job job = Job.getInstance(initConf);
+ Configuration conf = job.getConfiguration();
+ job.setJarByClass(jarClass);
+ conf.set(
+ "io.serializations",
+ "org.apache.hadoop.io.serializer.WritableSerialization,"
+ + "org.apache.hadoop.io.serializer.JavaSerialization");
+ // Presumably raised so that long metric group names ("__metrics__<step>__<namespace>") fit.
+ conf.set("mapreduce.job.counters.group.name.max", "512");
+ Limits.init(conf);
+
+ conf.set(
+ FileOutputFormat.OUTDIR,
+ configUtils.getFileOutputDir(fusedStep.getStageId()));
+
+ Graphs.Step startStep = Iterables.getOnlyElement(fusedStep.getStartSteps());
+ configureInputFormat(job, startStep);
+
+ if (fusedStep.containsGroupByKey()) {
+ // Must run before the mapper chain is built, so that the map side ends at the shuffle write.
+ expandGroupByKeyAndConfigureReducer(job);
+ }
+
+ // Setup operations in BeamMapper.
+ chainOperations(startStep, fusedStep, Sets.<Graphs.Step>newHashSet());
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(byte[].class);
+ conf.set(
+ BeamMapper.BEAM_PAR_DO_OPERATION_MAPPER,
+ Base64.encodeBase64String(
+ SerializableUtils.serializeToByteArray(startStep.getOperation())));
+ job.setMapperClass(BeamMapper.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+
+ addNamedOutputs(job);
+ return job;
+ }
+
+ /** Configures {@link BeamInputFormat} with the sources read by the start partition step. */
+ private void configureInputFormat(Job job, Graphs.Step startStep) {
+ final Configuration conf = job.getConfiguration();
+ checkState(
+ startStep.getOperation() instanceof PartitionOperation,
+ "Start step %s must be a PartitionOperation.", startStep);
+ PartitionOperation partitionOperation = (PartitionOperation) startStep.getOperation();
+
+ ArrayList<ReadOperation.TaggedSource> taggedSources = new ArrayList<>(
+ FluentIterable.from(partitionOperation.getReadOperations())
+ .transform(new Function<ReadOperation, ReadOperation.TaggedSource>() {
+ @Override
+ public ReadOperation.TaggedSource apply(ReadOperation operation) {
+ return operation.getTaggedSource(conf);
+ }
+ })
+ .toList());
+ conf.set(
+ BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE,
+ Base64.encodeBase64String(SerializableUtils.serializeToByteArray(taggedSources)));
+ conf.set(
+ BeamInputFormat.BEAM_SERIALIZED_PIPELINE_OPTIONS,
+ Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
+ new SerializedPipelineOptions(options))));
+ job.setInputFormatClass(BeamInputFormat.class);
+ }
+
+ /**
+ * Replaces the GroupByKey step with a reify step and a shuffle write step on the map side and a
+ * GroupAlsoByWindows step on the reduce side, and configures {@link BeamReducer} to run the
+ * reduce-side chain starting at the GroupAlsoByWindows step.
+ */
+ private void expandGroupByKeyAndConfigureReducer(Job job) {
+ Graphs.Step groupByKey = fusedStep.getGroupByKeyStep();
+ Graphs.Tag gbkOutTag = Iterables.getOnlyElement(fusedStep.getOutputTags(groupByKey));
+ GroupByKeyOperation<?, ?> operation = (GroupByKeyOperation<?, ?>) groupByKey.getOperation();
+ WindowingStrategy<?, ?> windowingStrategy = operation.getWindowingStrategy();
+ KvCoder<?, ?> kvCoder = operation.getKvCoder();
+
+ // Map side: reify timestamps and windows into the values, then write them to the shuffle.
+ String reifyStepName = groupByKey.getFullName() + "-Reify";
+ Coder<?> reifyValueCoder = getReifyValueCoder(kvCoder.getValueCoder(), windowingStrategy);
+ Graphs.Tag reifyOutputTag = Graphs.Tag.of(
+ reifyStepName + ".out", new TupleTag<>(), reifyValueCoder, windowingStrategy);
+ Graphs.Step reifyStep = Graphs.Step.of(
+ reifyStepName,
+ new ReifyTimestampAndWindowsParDoOperation(
+ reifyStepName, options, windowingStrategy, reifyOutputTag));
+ Graphs.Step writeStep = Graphs.Step.of(
+ groupByKey.getFullName() + "-Write",
+ new ShuffleWriteOperation(kvCoder.getKeyCoder(), reifyValueCoder));
+
+ // Reduce side: SystemReduceFn.buffering() buffers the GroupByKey input values, so it needs the
+ // value coder rather than the coder of the KV pairs.
+ String gabwStepName = groupByKey.getFullName() + "-GroupAlsoByWindows";
+ Graphs.Step gabwStep = Graphs.Step.of(
+ gabwStepName,
+ new GroupAlsoByWindowsParDoOperation(
+ gabwStepName, options, windowingStrategy, kvCoder.getValueCoder(), gbkOutTag));
+
+ fusedStep.addStep(
+ reifyStep, fusedStep.getInputTags(groupByKey), ImmutableList.of(reifyOutputTag));
+ fusedStep.addStep(
+ writeStep, ImmutableList.of(reifyOutputTag), Collections.<Graphs.Tag>emptyList());
+ fusedStep.addStep(
+ gabwStep, Collections.<Graphs.Tag>emptyList(), ImmutableList.of(gbkOutTag));
+ fusedStep.removeStep(groupByKey);
+
+ chainOperations(gabwStep, fusedStep, Sets.<Graphs.Step>newHashSet());
+ Configuration conf = job.getConfiguration();
+ conf.set(
+ BeamReducer.BEAM_REDUCER_KV_CODER,
+ Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
+ KvCoder.of(kvCoder.getKeyCoder(), reifyValueCoder))));
+ conf.set(
+ BeamReducer.BEAM_PAR_DO_OPERATION_REDUCER,
+ Base64.encodeBase64String(
+ SerializableUtils.serializeToByteArray(gabwStep.getOperation())));
+ job.setReducerClass(BeamReducer.class);
+ }
+
+ /** Registers a sequence-file named output for every {@link FileWriteOperation} step. */
+ private void addNamedOutputs(Job job) {
+ for (Graphs.Step step : fusedStep.getSteps()) {
+ if (step.getOperation() instanceof FileWriteOperation) {
+ FileWriteOperation writeOperation = (FileWriteOperation) step.getOperation();
+ MultipleOutputs.addNamedOutput(
+ job,
+ writeOperation.getFileName(),
+ SequenceFileOutputFormat.class,
+ NullWritable.class, BytesWritable.class);
+ }
+ }
+ }
+
+ /**
+ * Attaches each consumer of {@code current}'s output tags to its operation, then recurses into
+ * consumers not yet in {@code visited}. Every step is chained at most once, so a consumer shared
+ * in a diamond-shaped graph is attached once per producing step.
+ */
+ private void chainOperations(
+ Graphs.Step current, Graphs.FusedStep fusedStep, Set<Graphs.Step> visited) {
+ Operation<?> operation = current.getOperation();
+ List<Graphs.Tag> outputTags = fusedStep.getOutputTags(current);
+ for (Graphs.Tag outTag : outputTags) {
+ for (Graphs.Step consumer : fusedStep.getConsumers(outTag)) {
+ operation.attachConsumer(outTag.getTupleTag(), consumer.getOperation());
+ }
+ }
+ visited.add(current);
+ for (Graphs.Tag outTag : outputTags) {
+ for (Graphs.Step consumer : fusedStep.getConsumers(outTag)) {
+ if (!visited.contains(consumer)) {
+ chainOperations(consumer, fusedStep, visited);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns the coder of reified values: a full {@link WindowedValue} coder around
+ * {@code valueCoder}, using the window coder of {@code windowingStrategy}.
+ */
+ private Coder<Object> getReifyValueCoder(
+ Coder<?> valueCoder, WindowingStrategy<?, ?> windowingStrategy) {
+ // TODO: do we need full coder to encode windows.
+ return (Coder) WindowedValue.getFullCoder(
+ valueCoder, windowingStrategy.getWindowFn().windowCoder());
+ }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MapReduceMetricResults.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MapReduceMetricResults.java
new file mode 100644
index 0000000..1d1c9ff
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MapReduceMetricResults.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.metrics.MetricFiltering;
+import org.apache.beam.runners.core.construction.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Implementation of {@link MetricResults} for the MapReduce Runner.
+ *
+ * <p>Beam counters are read back from the Hadoop counters of the given jobs. Only attempted values
+ * are available: committed values are {@code null}, and distributions and gauges are empty.
+ */
+public class MapReduceMetricResults extends MetricResults {
+
+ /**
+ * Prefix of the Hadoop counter groups that hold Beam metrics. Must match the group names written
+ * by {@link MetricsReporter} ({@code "__metrics__<step>__<namespace>"}).
+ */
+ private static final String METRICS_GROUP_PREFIX = "__metrics__";
+
+ private final List<Job> jobs;
+
+ public MapReduceMetricResults(List<Job> jobs) {
+ this.jobs = checkNotNull(jobs, "jobs");
+ }
+
+ /**
+ * Returns the Beam counters of all jobs that match {@code filter}. Hadoop's built-in counter
+ * groups are skipped, as are jobs whose counters are no longer available.
+ *
+ * @throws RuntimeException wrapping the {@link IOException} if counters cannot be fetched
+ */
+ @Override
+ public MetricQueryResults queryMetrics(MetricsFilter filter) {
+ List<MetricResult<Long>> counters = new ArrayList<>();
+ for (Job job : jobs) {
+ Iterable<CounterGroup> groups;
+ try {
+ groups = job.getCounters();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to get counters of job " + job.getJobID(), e);
+ }
+ if (groups == null) {
+ // Job.getCounters() returns null once the job is retired from the completed job store.
+ continue;
+ }
+ for (CounterGroup group : groups) {
+ String groupName = group.getName();
+ if (!groupName.startsWith(METRICS_GROUP_PREFIX)) {
+ continue;
+ }
+ for (Counter counter : group) {
+ MetricKey metricKey = MetricsReporter.toMetricKey(groupName, counter.getName());
+ if (!MetricFiltering.matches(filter, metricKey)) {
+ continue;
+ }
+ counters.add(
+ MapReduceMetricResult.create(
+ metricKey.metricName(),
+ metricKey.stepName(),
+ counter.getValue()));
+ }
+ }
+ }
+ return MapReduceMetricQueryResults.create(counters);
+ }
+
+ /** {@link MetricQueryResults} that only carries counters. */
+ @AutoValue
+ abstract static class MapReduceMetricQueryResults implements MetricQueryResults {
+
+ public abstract Iterable<MetricResult<DistributionResult>> distributions();
+ public abstract Iterable<MetricResult<GaugeResult>> gauges();
+
+ /** Creates results with the given counters and empty distributions and gauges. */
+ public static MetricQueryResults create(Iterable<MetricResult<Long>> counters) {
+ return new AutoValue_MapReduceMetricResults_MapReduceMetricQueryResults(
+ counters, Collections.emptyList(), Collections.emptyList());
+ }
+ }
+
+ /** {@link MetricResult} with an attempted value only; {@link #committed()} returns null. */
+ @AutoValue
+ abstract static class MapReduceMetricResult<T> implements MetricResult<T> {
+ // need to define these here so they appear in the correct order
+ // and the generated constructor is usable and consistent
+ public abstract MetricName name();
+ public abstract String step();
+ public abstract @Nullable T committed();
+ public abstract T attempted();
+
+ /** Creates a result with the given attempted value and no committed value. */
+ public static <T> MetricResult<T> create(MetricName name, String step, T attempted) {
+ return new AutoValue_MapReduceMetricResults_MapReduceMetricResult<T>(
+ name, step, null, attempted);
+ }
+ }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MetricsReporter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MetricsReporter.java
new file mode 100644
index 0000000..9fe139d
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MetricsReporter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.metrics.MetricKey;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Holds a {@link MetricsContainerStepMap} and reports its metrics to the MapReduce framework.
+ *
+ * <p>Each attempted counter value is published as a Hadoop counter named after the metric, in the
+ * group {@code "__metrics__<step>__<namespace>"}. Counters are updated with {@code increment()},
+ * so the last published value of each metric is remembered and only the growth is added.
+ */
+public class MetricsReporter {
+
+ private static final String METRIC_KEY_SEPARATOR = "__";
+ private static final String METRIC_PREFIX = "__metrics";
+
+ private final TaskAttemptContext context;
+ private final MetricsContainerStepMap metricsContainers;
+ /** Last attempted value published for each {@code step__namespace__name} key. */
+ private final Map<String, Long> reportedCounters = Maps.newHashMap();
+
+ MetricsReporter(TaskAttemptContext context) {
+ this.context = checkNotNull(context, "context");
+ this.metricsContainers = new MetricsContainerStepMap();
+ }
+
+ /** Returns the {@link MetricsContainer} that collects metrics of {@code stepName}. */
+ public MetricsContainer getMetricsContainer(String stepName) {
+ return metricsContainers.getContainer(stepName);
+ }
+
+ /** Publishes every counter that grew since the previous call to the task's Hadoop counters. */
+ public void updateMetrics() {
+ MetricResults attemptedResults = asAttemptedOnlyMetricResults(metricsContainers);
+ MetricQueryResults allResults = attemptedResults.queryMetrics(MetricsFilter.builder().build());
+ updateCounters(allResults.counters());
+ }
+
+ private void updateCounters(Iterable<MetricResult<Long>> counters) {
+ for (MetricResult<Long> counter : counters) {
+ String key = reportedCounterKey(counter);
+ Long current = counter.attempted();
+ Long previous = reportedCounters.get(key);
+ if (previous != null && previous >= current) {
+ continue; // nothing new since the last report
+ }
+ long delta = (previous == null) ? current : current - previous;
+ context.getCounter(groupName(counter), counter.name().name()).increment(delta);
+ reportedCounters.put(key, current);
+ }
+ }
+
+ /** Hadoop counter group of a metric: {@code __metrics__<step>__<namespace>}. */
+ private String groupName(MetricResult<?> metricResult) {
+ return join(METRIC_PREFIX, metricResult.step(), metricResult.name().namespace());
+ }
+
+ /** Key of a metric in {@code reportedCounters}: {@code <step>__<namespace>__<name>}. */
+ private String reportedCounterKey(MetricResult<?> metricResult) {
+ MetricName name = metricResult.name();
+ return join(metricResult.step(), name.namespace(), name.name());
+ }
+
+ /** Concatenates {@code parts}, separated by {@link #METRIC_KEY_SEPARATOR}. */
+ private static String join(String... parts) {
+ StringBuilder joined = new StringBuilder();
+ for (int i = 0; i < parts.length; i++) {
+ if (i > 0) {
+ joined.append(METRIC_KEY_SEPARATOR);
+ }
+ joined.append(parts[i]);
+ }
+ return joined.toString();
+ }
+
+ /**
+ * Converts a Hadoop counter group name and counter name back into a {@link MetricKey}.
+ *
+ * <p>The last two {@code "__"}-separated parts of {@code groupName} are used as step name and
+ * namespace; a missing part becomes the empty string.
+ */
+ public static MetricKey toMetricKey(String groupName, String counterName) {
+ String[] parts = groupName.split(METRIC_KEY_SEPARATOR);
+ int count = parts.length;
+ String stepName = count > 1 ? parts[count - 2] : "";
+ String namespace = count > 0 ? parts[count - 1] : "";
+ return MetricKey.create(stepName, MetricName.named(namespace, counterName));
+ }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
new file mode 100644
index 0000000..8b730ff
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * {@link ParDoOperation} that runs a user-supplied {@link DoFn} handed over at construction.
+ */
+public class NormalParDoOperation<InputT, OutputT> extends ParDoOperation<InputT, OutputT> {
+  /** The function returned by {@link #getDoFn()}; never null. */
+  private final DoFn<InputT, OutputT> userFn;
+
+  public NormalParDoOperation(
+      String stepName,
+      DoFn<InputT, OutputT> doFn,
+      PipelineOptions options,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      List<Graphs.Tag> sideInputTags,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    super(stepName, options, mainOutputTag, sideOutputTags, sideInputTags, windowingStrategy);
+    this.userFn = checkNotNull(doFn, "doFn");
+  }
+
+  @Override
+  DoFn<InputT, OutputT> getDoFn() {
+    return userFn;
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
new file mode 100644
index 0000000..a96806d
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Class that processes elements and forwards outputs to consumers.
+ *
+ * <p>Each output slot owns one {@link OutputReceiver}, and consumers are attached to a slot via
+ * {@link #attachConsumer}. {@link #start} and {@link #finish} propagate recursively through all
+ * attached consumers and take effect only once per instance.
+ */
+public abstract class Operation<T> implements Serializable {
+  // One receiver per output slot. Immutable, so getOutputReceivers() can hand it out without
+  // copying it for every emitted element.
+  private final List<OutputReceiver> receivers;
+  private SerializableConfiguration conf;
+  private boolean started;
+  private boolean finished;
+
+  public Operation(int numOutputs) {
+    OutputReceiver[] slots = new OutputReceiver[numOutputs];
+    for (int i = 0; i < numOutputs; ++i) {
+      slots[i] = new OutputReceiver();
+    }
+    this.receivers = ImmutableList.copyOf(slots);
+    this.started = false;
+    this.finished = false;
+  }
+
+  /**
+   * Starts this Operation's execution.
+   *
+   * <p>Only the first call has an effect: it records the task configuration (see
+   * {@link #getConf()}) and then recursively starts every attached consumer.
+   */
+  public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) {
+    if (started) {
+      return;
+    }
+    started = true;
+    conf = new SerializableConfiguration(taskContext.getConfiguration());
+    for (OutputReceiver receiver : receivers) {
+      for (Operation<?> consumer : receiver.getReceivingOperations()) {
+        consumer.start(taskContext);
+      }
+    }
+  }
+
+  /** Processes the element. */
+  public abstract void process(WindowedValue<T> elem) throws IOException, InterruptedException;
+
+  /**
+   * Finishes this Operation's execution.
+   *
+   * <p>Only the first call has an effect: it recursively finishes every attached consumer.
+   * NOTE(review): a consumer fed by several producers (diamond-shaped graph) is finished as soon
+   * as the first of them finishes, i.e. possibly before the other producers are done.
+   */
+  public void finish() {
+    if (finished) {
+      return;
+    }
+    finished = true;
+    for (OutputReceiver receiver : receivers) {
+      for (Operation<?> consumer : receiver.getReceivingOperations()) {
+        consumer.finish();
+      }
+    }
+  }
+
+  /** Returns the configuration recorded by {@link #start}; null until start has run. */
+  public SerializableConfiguration getConf() {
+    return conf;
+  }
+
+  /** Returns the immutable list of output receivers, one per output slot. */
+  public List<OutputReceiver> getOutputReceivers() {
+    return receivers;
+  }
+
+  /** Adds {@code consumer} to the receiver of the slot that {@code tupleTag} maps to. */
+  public void attachConsumer(TupleTag<?> tupleTag, Operation consumer) {
+    receivers.get(getOutputIndex(tupleTag)).addOutput(consumer);
+  }
+
+  /** Maps an output tag to its slot; every tag maps to slot 0 unless a subclass overrides. */
+  protected int getOutputIndex(TupleTag<?> tupleTag) {
+    return 0;
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
new file mode 100644
index 0000000..b2f1b6d
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/** Forwards each element it receives to every one of a list of downstream operations. */
+public class OutputReceiver implements Serializable {
+  private final List<Operation> receivingOperations = new ArrayList<>();
+
+  /** Adds a new receiver that this OutputReceiver forwards to. */
+  public void addOutput(Operation receiver) {
+    receivingOperations.add(receiver);
+  }
+
+  /** Returns an immutable copy of the downstream operations, in the order they were added. */
+  public List<Operation> getReceivingOperations() {
+    return ImmutableList.copyOf(receivingOperations);
+  }
+
+  /**
+   * Processes the element by passing it to each non-null downstream operation, in insertion
+   * order. Checked exceptions are rethrown wrapped in a {@link RuntimeException}.
+   */
+  public void process(WindowedValue<?> elem) {
+    for (Operation out : receivingOperations) {
+      if (out != null) {
+        try {
+          out.process(elem);
+        } catch (InterruptedException e) {
+          // Restore the interrupt status so callers up the stack can still observe it.
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
new file mode 100644
index 0000000..ef83e72
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Maps;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.InMemoryStateInternals;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StepContext;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Base {@link Operation} for ParDo: runs the {@link DoFn} returned by {@link #getDoFn()}
+ * through a {@link DoFnRunner}. The main output goes to receiver slot 0 and the side output at
+ * index i of {@code sideOutputTags} goes to slot i + 1.
+ */
+public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> {
+  private final String stepName;
+  protected final SerializedPipelineOptions options;
+  protected final TupleTag<OutputT> mainOutputTag;
+  private final List<TupleTag<?>> sideOutputTags;
+  protected final WindowingStrategy<?, ?> windowingStrategy;
+  private final List<Graphs.Tag> sideInputTags;
+  // Set via setupSideInput(); only read in start() when there are side inputs.
+  private Map<TupleTag<?>, String> tupleTagToFilePath;
+
+  private MetricsReporter metricsReporter;
+  protected DoFnInvoker<InputT, OutputT> doFnInvoker;
+  private DoFnRunner<InputT, OutputT> fnRunner;
+
+  public ParDoOperation(
+      String stepName,
+      PipelineOptions options,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      List<Graphs.Tag> sideInputTags,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    super(1 + sideOutputTags.size());
+    this.stepName = checkNotNull(stepName, "stepName");
+    this.options = new SerializedPipelineOptions(checkNotNull(options, "options"));
+    this.mainOutputTag = checkNotNull(mainOutputTag, "mainOutputTag");
+    this.sideOutputTags = checkNotNull(sideOutputTags, "sideOutputTags");
+    this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy");
+    this.sideInputTags = checkNotNull(sideInputTags, "sideInputTags");
+  }
+
+  /** Returns a {@link DoFn} for processing inputs. */
+  abstract DoFn<InputT, OutputT> getDoFn();
+
+  @Override
+  public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) {
+    super.start(taskContext);
+    this.metricsReporter = new MetricsReporter(taskContext);
+
+    // Call getDoFn() once: it need not return the same instance on every call, and elements
+    // must be processed by the very DoFn whose @Setup (and later @Teardown) is invoked.
+    DoFn<InputT, OutputT> doFn = getDoFn();
+    doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    doFnInvoker.invokeSetup();
+
+    Map<TupleTag<?>, Coder<?>> tupleTagToCoder = Maps.newHashMap();
+    for (Graphs.Tag tag : sideInputTags) {
+      tupleTagToCoder.put(tag.getTupleTag(), tag.getCoder());
+    }
+
+    final StateInternals stateInternals;
+    try {
+      stateInternals = InMemoryStateInternals.forKey(taskContext.getCurrentKey());
+    } catch (IOException | InterruptedException e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      throw new RuntimeException(e);
+    }
+    final TimerInternals timerInternals = new InMemoryTimerInternals();
+
+    fnRunner = DoFnRunners.simpleRunner(
+        options.getPipelineOptions(),
+        doFn,
+        sideInputTags.isEmpty()
+            ? NullSideInputReader.empty()
+            : new FileSideInputReader(tupleTagToFilePath, tupleTagToCoder, getConf().getConf()),
+        createOutputManager(),
+        mainOutputTag,
+        sideOutputTags,
+        new StepContext() {
+          @Override
+          public StateInternals stateInternals() {
+            return stateInternals;
+          }
+
+          @Override
+          public TimerInternals timerInternals() {
+            return timerInternals;
+          }
+        },
+        windowingStrategy);
+
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
+      fnRunner.startBundle();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Processes the element.
+   */
+  @Override
+  public void process(WindowedValue<InputT> elem) {
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
+      fnRunner.processElement(elem);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void finish() {
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
+      fnRunner.finishBundle();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    metricsReporter.updateMetrics();
+    doFnInvoker.invokeTeardown();
+    super.finish();
+  }
+
+  public void setupSideInput(Map<TupleTag<?>, String> tupleTagToFilePath) {
+    this.tupleTagToFilePath = checkNotNull(tupleTagToFilePath, "tupleTagToFilePath");
+  }
+
+  public List<Graphs.Tag> getSideInputTags() {
+    return sideInputTags;
+  }
+
+  @Override
+  protected int getOutputIndex(TupleTag<?> tupleTag) {
+    if (mainOutputTag.equals(tupleTag)) {
+      return 0;
+    }
+    int sideIndex = sideOutputTags.indexOf(tupleTag);
+    // Template form: the message is only formatted on failure, not for every emitted element.
+    checkState(sideIndex >= 0, "Cannot find index for tuple tag: %s.", tupleTag);
+    return sideIndex + 1;
+  }
+
+  protected DoFnRunners.OutputManager createOutputManager() {
+    return new ParDoOutputManager();
+  }
+
+  private class ParDoOutputManager implements DoFnRunners.OutputManager {
+
+    @Nullable
+    private OutputReceiver getReceiverOrNull(TupleTag<?> tupleTag) {
+      List<OutputReceiver> receivers = getOutputReceivers();
+      int outputIndex = getOutputIndex(tupleTag);
+      return receivers.get(outputIndex);
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
+      OutputReceiver receiver = getReceiverOrNull(tupleTag);
+      if (receiver != null) {
+        receiver.process(windowedValue);
+      }
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
new file mode 100644
index 0000000..f8f1a02
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Translates a {@link ParDo.MultiOutput} into a {@link NormalParDoOperation} init step.
+ */
+class ParDoTranslator<InputT, OutputT>
+    extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> {
+
+  @Override
+  public void translateNode(
+      ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext graph = context.getUserGraphContext();
+    String stepName = graph.getStepName();
+    NormalParDoOperation<InputT, OutputT> operation = new NormalParDoOperation<>(
+        stepName,
+        transform.getFn(),
+        graph.getOptions(),
+        transform.getMainOutputTag(),
+        transform.getAdditionalOutputTags().getAll(),
+        graph.getSideInputTags(),
+        // Windowing strategy of the PCollection this ParDo consumes.
+        ((PCollection<?>) graph.getInput()).getWindowingStrategy());
+    context.addInitStep(
+        Graphs.Step.of(stepName, operation), graph.getInputTags(), graph.getOutputTags());
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java
new file mode 100644
index 0000000..dc0f81a
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Operation that partitions input elements based on their {@link TupleTag} keys.
+ *
+ * <p>Each input is a {@code KV<TupleTag, WindowedValue>}: the inner value is forwarded to the
+ * receiver whose index is the position of the tag in {@code tupleTags}.
+ */
+public class PartitionOperation extends Operation<KV<TupleTag<?>, Object>> {
+  private final List<ReadOperation> readOperations;
+  private final List<TupleTag<?>> tupleTags;
+
+  public PartitionOperation(List<ReadOperation> readOperations, List<TupleTag<?>> tupleTags) {
+    super(readOperations.size());
+    this.readOperations = checkNotNull(readOperations, "readOperations");
+    this.tupleTags = checkNotNull(tupleTags, "tupleTags");
+  }
+
+  public List<ReadOperation> getReadOperations() {
+    return readOperations;
+  }
+
+  @Override
+  public void process(WindowedValue<KV<TupleTag<?>, Object>> elem) throws IOException,
+      InterruptedException {
+    KV<TupleTag<?>, Object> taggedValue = elem.getValue();
+    OutputReceiver receiver = getOutputReceivers().get(getOutputIndex(taggedValue.getKey()));
+    receiver.process((WindowedValue<?>) taggedValue.getValue());
+  }
+
+  @Override
+  protected int getOutputIndex(TupleTag<?> tupleTag) {
+    int index = tupleTags.indexOf(tupleTag);
+    // Template form: the message is only formatted on failure, not for every element.
+    checkState(index >= 0, "Cannot find index for tuple tag: %s.", tupleTag);
+    return index;
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
new file mode 100644
index 0000000..5e5c99b
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.io.Read;
+
+/**
+ * Translates a {@link Read.Bounded} into a {@link SourceReadOperation} init step.
+ */
+class ReadBoundedTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> {
+  @Override
+  public void translateNode(Read.Bounded<T> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext graph = context.getUserGraphContext();
+    String stepName = graph.getStepName();
+    // The source is paired with the transform's only output tag.
+    ReadOperation operation =
+        new SourceReadOperation(stepName, transform.getSource(), graph.getOnlyOutputTag());
+    context.addInitStep(
+        Graphs.Step.of(stepName, operation),
+        graph.getInputTags(),
+        graph.getOutputTags());
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
new file mode 100644
index 0000000..a3e1d77
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A {@code Read.Bounded} placeholder {@link Operation} used during pipeline translation.
+ *
+ * <p>Sources are exposed through {@link #getTaggedSource}; elements must never reach it.
+ */
+abstract class ReadOperation<T> extends Operation<T> {
+  public ReadOperation() {
+    super(1);
+  }
+
+  /** Always throws: a placeholder must not be part of the execution graph. */
+  @Override
+  public void process(WindowedValue<T> elem) {
+    throw new IllegalStateException(
+        String.format("%s should not be in execution graph.", getClass().getSimpleName()));
+  }
+
+  /** Returns a TaggedSource during pipeline construction time. */
+  abstract TaggedSource getTaggedSource(Configuration conf);
+
+  /** AutoValue triple of step name, bounded source and tuple tag. */
+  @AutoValue
+  abstract static class TaggedSource implements Serializable {
+    abstract String getStepName();
+    abstract BoundedSource<?> getSource();
+    abstract TupleTag<?> getTag();
+
+    static TaggedSource of(String stepName, BoundedSource<?> boundedSource, TupleTag<?> tupleTag) {
+      return new AutoValue_ReadOperation_TaggedSource(stepName, boundedSource, tupleTag);
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
new file mode 100644
index 0000000..0e02bbb
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * {@link Operation} that executes {@link ReifyTimestampAndWindowsDoFn}.
+ */
+public class ReifyTimestampAndWindowsParDoOperation extends ParDoOperation {
+
+  // Created once so that every getDoFn() call returns the same instance: the DoFn whose
+  // @Setup/@Teardown the parent invokes must be the one that processes the elements.
+  private final DoFn<Object, Object> doFn = (DoFn) new ReifyTimestampAndWindowsDoFn<>();
+
+  public ReifyTimestampAndWindowsParDoOperation(
+      String stepName,
+      PipelineOptions options,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Graphs.Tag outTag) {
+    super(stepName, options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(),
+        ImmutableList.<Graphs.Tag>of(), windowingStrategy);
+  }
+
+  @Override
+  DoFn<Object, Object> getDoFn() {
+    return doFn;
+  }
+
+  /** Wraps each value with its timestamp, window and pane, leaving the key unchanged. */
+  private static class ReifyTimestampAndWindowsDoFn<K, V>
+      extends DoFn<KV<K, V>, KV<K, WindowedValue<V>>> {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+      KV<K, V> kv = c.element();
+      c.output(KV.of(
+          kv.getKey(),
+          WindowedValue.of(
+              kv.getValue(), c.timestamp(), window, c.pane())));
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializableConfiguration.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializableConfiguration.java
new file mode 100644
index 0000000..7af595c
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializableConfiguration.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+
+/** A {@link Serializable} holder for a Hadoop {@link Configuration}. */
+class SerializableConfiguration implements Serializable {
+
+  private transient Configuration conf;
+
+  SerializableConfiguration(Configuration conf) {
+    this.conf = checkNotNull(conf, "conf");
+  }
+
+  Configuration getConf() {
+    return conf;
+  }
+
+  // conf is transient, so it is written and read through Configuration.write/readFields.
+  private void writeObject(ObjectOutputStream out) throws IOException {
+    out.defaultWriteObject();
+    conf.write(out);
+  }
+
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    in.defaultReadObject(); // required to pair with defaultWriteObject() in writeObject
+    conf = new Configuration();
+    conf.readFields(in);
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java
new file mode 100644
index 0000000..5c37b7c
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+
+/**
+ * Carries {@link PipelineOptions} as JSON bytes so they can be shipped to the cluster.
+ */
+public class SerializedPipelineOptions implements Serializable {
+
+  private final byte[] serializedOptions;
+
+  /** Deserialized options, materialized lazily by {@link #getPipelineOptions()}. */
+  private transient PipelineOptions pipelineOptions;
+
+  public SerializedPipelineOptions(PipelineOptions options) {
+    checkNotNull(options, "PipelineOptions must not be null.");
+    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
+      createMapper().writeValue(buffer, options);
+      this.serializedOptions = buffer.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+    }
+  }
+
+  /**
+   * Returns the deserialized options. The call that deserializes them also installs them as
+   * the default options of {@link FileSystems}.
+   */
+  public PipelineOptions getPipelineOptions() {
+    if (pipelineOptions != null) {
+      return pipelineOptions;
+    }
+    try {
+      pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class);
+    } catch (IOException e) {
+      throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+    }
+    FileSystems.setDefaultPipelineOptions(pipelineOptions);
+    return pipelineOptions;
+  }
+
+  /**
+   * Builds an {@link ObjectMapper} with every Jackson {@link Module} found on the class path,
+   * so user-defined types used in {@link PipelineOptions} can be (de)serialized.
+   */
+  private static ObjectMapper createMapper() {
+    ClassLoader loader = ReflectHelpers.findClassLoader();
+    return new ObjectMapper().registerModules(ObjectMapper.findModules(loader));
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java
new file mode 100644
index 0000000..a8fae1b
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * {@link Operation} that materializes input for group by key.
+ */
+public class ShuffleWriteOperation<T> extends Operation<T> {
+
+  private final Coder<Object> keyCoder;
+  private final Coder<Object> valueCoder;
+  /** Set in {@link #start}; transient, so it is not serialized with the operation. */
+  private transient TaskInputOutputContext<Object, Object, Object, Object> taskContext;
+
+  public ShuffleWriteOperation(Coder<Object> keyCoder, Coder<Object> valueCoder) {
+    super(0);
+    this.keyCoder = checkNotNull(keyCoder, "keyCoder");
+    this.valueCoder = checkNotNull(valueCoder, "valueCoder");
+  }
+
+  @Override
+  public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) {
+    this.taskContext = checkNotNull(taskContext, "taskContext");
+  }
+
+  /** Writes the element's key and value, each encoded by its coder, to the task context. */
+  @Override
+  public void process(WindowedValue<T> elem) throws IOException, InterruptedException {
+    KV<?, ?> pair = (KV<?, ?>) elem.getValue();
+    ByteArrayOutputStream encodedKey = new ByteArrayOutputStream();
+    ByteArrayOutputStream encodedValue = new ByteArrayOutputStream();
+    keyCoder.encode(pair.getKey(), encodedKey);
+    valueCoder.encode(pair.getValue(), encodedValue);
+    taskContext.write(new BytesWritable(encodedKey.toByteArray()), encodedValue.toByteArray());
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java
new file mode 100644
index 0000000..55b46a4
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Operation that reads from {@link BoundedSource}.
+ */
+public class SourceReadOperation extends ReadOperation {
+  private final String stepName;
+  private final TaggedSource source;
+
+  SourceReadOperation(String stepName, BoundedSource<?> boundedSource, TupleTag<?> tupleTag) {
+    this.stepName = checkNotNull(stepName, "stepName");
+    BoundedSource<?> src = checkNotNull(boundedSource, "boundedSource");
+    this.source = TaggedSource.of(this.stepName, src, checkNotNull(tupleTag, "tupleTag"));
+  }
+
+  /** Returns the source fixed at construction time; {@code conf} is not consulted. */
+  @Override
+  TaggedSource getTaggedSource(Configuration conf) {
+    return source;
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TransformTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TransformTranslator.java
new file mode 100644
index 0000000..f495372
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TransformTranslator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Interface for classes that translate Beam {@link PTransform}s into MapReduce runner steps.
+ */
+interface TransformTranslator<T extends PTransform<?, ?>> {
+
+  /** Translates {@code transform}, adding the resulting step(s) to {@code context}. */
+  void translateNode(T transform, TranslationContext context);
+
+  /** Returns true if this translator can translate the given transform. */
+  boolean canTranslate(T transform, TranslationContext context);
+
+  /**
+   * Base translator whose {@link #translateNode} does nothing and whose {@link #canTranslate}
+   * accepts every transform. Concrete translators extend it and override {@code translateNode}.
+   */
+  class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
+    @Override
+    public void translateNode(T1 transform, TranslationContext context) {
+      // Intentionally empty: subclasses supply the actual translation.
+    }
+
+    @Override
+    public boolean canTranslate(T1 transform, TranslationContext context) {
+      return true;
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
new file mode 100644
index 0000000..e908e93
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Class that maintains contexts during translation.
+ */
+public class TranslationContext {
+
+  private final UserGraphContext userGraphContext;
+  private final Graph<Graphs.Step, Graphs.Tag> initGraph;
+
+  public TranslationContext(MapReducePipelineOptions options) {
+    this.userGraphContext = new UserGraphContext(options);
+    this.initGraph = new Graph<>();
+  }
+
+  public UserGraphContext getUserGraphContext() {
+    return userGraphContext;
+  }
+
+  /** Adds {@code step} to the initial graph with the given input and output tags. */
+  public void addInitStep(Graphs.Step step, List<Graphs.Tag> inTags, List<Graphs.Tag> outTags) {
+    initGraph.addStep(step, inTags, outTags);
+  }
+
+  /**
+   * Returns the graph of {@link Graphs.Step steps} accumulated through {@link #addInitStep}. Any
+   * ordering of those steps is defined by {@link Graph} itself, not by this context.
+   */
+  public Graph<Graphs.Step, Graphs.Tag> getInitGraph() {
+    return initGraph;
+  }
+
+  /**
+   * Context of user graph: the node most recently passed to {@link #setCurrentNode}, plus the
+   * {@link TupleTag} recorded for every {@link PValue} output by those nodes.
+   */
+  public static class UserGraphContext {
+    private final MapReducePipelineOptions options;
+    private final Map<PValue, TupleTag<?>> pValueToTupleTag;
+    private TransformHierarchy.Node currentNode;
+
+    public UserGraphContext(MapReducePipelineOptions options) {
+      this.options = checkNotNull(options, "options");
+      this.pValueToTupleTag = Maps.newHashMap();
+      this.currentNode = null;
+    }
+
+    public MapReducePipelineOptions getOptions() {
+      return options;
+    }
+
+    /**
+     * Makes {@code node} the current node and records the {@link TupleTag} of each of its
+     * outputs, so that later nodes consuming those outputs can look their tags up.
+     */
+    public void setCurrentNode(TransformHierarchy.Node node) {
+      this.currentNode = node;
+      for (Map.Entry<TupleTag<?>, PValue> entry : node.getOutputs().entrySet()) {
+        pValueToTupleTag.put(entry.getValue(), entry.getKey());
+      }
+      // TODO: this is a hack to get around that ViewAsXYZ.expand() return wrong output PValue.
+      // Register the view itself once, independent of how many outputs the node reports, so
+      // side inputs referring to it can be resolved by getInputTags() and getSideInputTags().
+      if (node.getTransform() instanceof View.CreatePCollectionView) {
+        View.CreatePCollectionView<?, ?> createView =
+            (View.CreatePCollectionView<?, ?>) node.getTransform();
+        PCollectionView<?> view = createView.getView();
+        pValueToTupleTag.put(view, view.getTagInternal());
+      }
+    }
+
+    public String getStepName() {
+      return currentNode.getFullName();
+    }
+
+    public PValue getInput() {
+      return Iterables.get(currentNode.getInputs().values(), 0);
+    }
+
+    public PValue getOutput() {
+      return Iterables.get(currentNode.getOutputs().values(), 0);
+    }
+
+    /**
+     * Returns tags for the current node's inputs: for a {@link ParDo.MultiOutput}, its main input
+     * followed by its side inputs; for any other transform, all inputs of the node.
+     */
+    public List<Graphs.Tag> getInputTags() {
+      Iterable<PValue> inputs;
+      if (currentNode.getTransform() instanceof ParDo.MultiOutput) {
+        ParDo.MultiOutput<?, ?> parDo = (ParDo.MultiOutput<?, ?>) currentNode.getTransform();
+        inputs = ImmutableList.<PValue>builder()
+            .add(getInput()).addAll(parDo.getSideInputs()).build();
+      } else {
+        inputs = currentNode.getInputs().values();
+      }
+      return toTags(inputs);
+    }
+
+    /** Returns tags for the side inputs of a {@link ParDo.MultiOutput}, or an empty list. */
+    public List<Graphs.Tag> getSideInputTags() {
+      if (!(currentNode.getTransform() instanceof ParDo.MultiOutput)) {
+        return ImmutableList.of();
+      }
+      ParDo.MultiOutput<?, ?> parDo = (ParDo.MultiOutput<?, ?>) currentNode.getTransform();
+      return toTags(parDo.getSideInputs());
+    }
+
+    /**
+     * Returns tags for the current node's outputs. For a {@link View.CreatePCollectionView} this
+     * is a single tag describing the view rather than the PCollection the node reports as output.
+     */
+    public List<Graphs.Tag> getOutputTags() {
+      if (currentNode.getTransform() instanceof View.CreatePCollectionView) {
+        View.CreatePCollectionView<?, ?> createView =
+            (View.CreatePCollectionView<?, ?>) currentNode.getTransform();
+        PCollectionView<?> view = createView.getView();
+        return ImmutableList.of(
+            Graphs.Tag.of(view.getName(), view.getTagInternal(), view.getCoderInternal(),
+                view.getWindowingStrategyInternal()));
+      }
+      return FluentIterable.from(currentNode.getOutputs().entrySet())
+          .transform(new Function<Map.Entry<TupleTag<?>, PValue>, Graphs.Tag>() {
+            @Override
+            public Graphs.Tag apply(Map.Entry<TupleTag<?>, PValue> entry) {
+              PCollection<?> pc = (PCollection<?>) entry.getValue();
+              return Graphs.Tag.of(
+                  pc.getName(), entry.getKey(), pc.getCoder(), pc.getWindowingStrategy());
+            }
+          })
+          .toList();
+    }
+
+    public TupleTag<?> getOnlyOutputTag() {
+      return Iterables.getOnlyElement(currentNode.getOutputs().keySet());
+    }
+
+    /** Converts each of {@code pValues} with {@link #toTag}, collecting an immutable list. */
+    private List<Graphs.Tag> toTags(Iterable<? extends PValue> pValues) {
+      return FluentIterable.from(pValues)
+          .transform(new Function<PValue, Graphs.Tag>() {
+            @Override
+            public Graphs.Tag apply(PValue pValue) {
+              return toTag(pValue);
+            }
+          })
+          .toList();
+    }
+
+    /**
+     * Builds the {@link Graphs.Tag} for {@code pValue} from the {@link TupleTag} recorded by
+     * {@link #setCurrentNode} and the value's coder and windowing strategy.
+     *
+     * @throws IllegalStateException if no {@link TupleTag} has been recorded for {@code pValue}
+     * @throws RuntimeException if {@code pValue} is neither a PCollection nor a PCollectionView
+     */
+    private Graphs.Tag toTag(PValue pValue) {
+      // The message is a template, so it is only formatted when the check actually fails.
+      checkState(
+          pValueToTupleTag.containsKey(pValue), "Failed to find TupleTag for pValue: %s.", pValue);
+      TupleTag<?> tupleTag = pValueToTupleTag.get(pValue);
+      if (pValue instanceof PCollection) {
+        PCollection<?> pc = (PCollection<?>) pValue;
+        return Graphs.Tag.of(pc.getName(), tupleTag, pc.getCoder(), pc.getWindowingStrategy());
+      } else if (pValue instanceof PCollectionView) {
+        PCollectionView<?> pView = (PCollectionView<?>) pValue;
+        return Graphs.Tag.of(
+            pView.getName(),
+            tupleTag,
+            pView.getCoderInternal(),
+            pView.getWindowingStrategyInternal());
+      }
+      throw new RuntimeException("Unexpected PValue: " + pValue.getClass());
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java
new file mode 100644
index 0000000..e51d392
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.Window;
+
+/**
+ * Lookup table mapping PTransform types to associated TransformTranslator implementations.
+ */
+public class TranslatorRegistry {
+  /**
+   * Translators keyed by the exact runtime class of the transform they handle. Populated only in
+   * the static initializer below and never modified afterwards.
+   */
+  private static final Map<Class<?>, TransformTranslator<?>> TRANSLATORS = new HashMap<>();
+
+  static {
+    TRANSLATORS.put(Read.Bounded.class, new ReadBoundedTranslator());
+    TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoTranslator());
+    TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>());
+    TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator());
+    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
+    TRANSLATORS.put(View.CreatePCollectionView.class, new ViewTranslator());
+  }
+
+  /**
+   * Returns the translator registered for the exact class of {@code transform}, or {@code null}
+   * if there is none; subclasses of a registered transform class are not matched.
+   */
+  public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
+    return TRANSLATORS.get(transform.getClass());
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
new file mode 100644
index 0000000..d018345
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Translates a {@link View.CreatePCollectionView} to a {@link FileWriteOperation}.
+ */
+public class ViewTranslator extends TransformTranslator.Default<View.CreatePCollectionView<?, ?>> {
+
+  /** Adds a step writing the view's backing PCollection to a file named after its output tag. */
+  @Override
+  public void translateNode(
+      View.CreatePCollectionView<?, ?> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraph = context.getUserGraphContext();
+
+    PCollection<?> viewed = transform.getView().getPCollection();
+    WindowingStrategy<?, ?> strategy = viewed.getWindowingStrategy();
+
+    String fileName = ConfigurationUtils.toFileName(
+        Iterables.getOnlyElement(userGraph.getOutputTags()).getName());
+
+    // The file coder pairs the element coder with the window coder of the view's strategy.
+    FileWriteOperation<?> writer = new FileWriteOperation<>(
+        fileName,
+        WindowedValue.getFullCoder(viewed.getCoder(), strategy.getWindowFn().windowCoder()));
+
+    Graphs.Step step = Graphs.Step.of(userGraph.getStepName(), writer);
+    context.addInitStep(step, userGraph.getInputTags(), userGraph.getOutputTags());
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java
new file mode 100644
index 0000000..3279e11
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+
+/**
+ * {@link Operation} that executes for assigning windows to elements.
+ */
+public class WindowAssignOperation<T, W extends BoundedWindow> extends Operation<T> {
+  private final WindowFn<T, W> windowFn;
+
+  public WindowAssignOperation(WindowFn<T, W> windowFn) {
+    super(1);
+    this.windowFn = checkNotNull(windowFn, "windowFn");
+  }
+
+  /**
+   * Assigns {@code elem} to windows with this operation's {@link WindowFn} and sends one copy per
+   * assigned window, keeping the value, timestamp and pane, to the single output receiver.
+   */
+  @Override
+  public void process(WindowedValue<T> elem) {
+    try {
+      Collection<W> windows = windowFn.assignWindows(new AssignContextInternal<>(windowFn, elem));
+      // The single output receiver is looked up once per element, not once per window.
+      OutputReceiver receiver = Iterables.getOnlyElement(getOutputReceivers());
+      for (W window : windows) {
+        receiver.process(WindowedValue.of(
+            elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
+      }
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // The checked exception is wrapped below; restore the flag so the interrupt is not lost.
+        Thread.currentThread().interrupt();
+      }
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Exposes a {@link WindowedValue} that sits in exactly one window as a
+   * {@link WindowFn.AssignContext}. Static, so it holds no reference to the enclosing operation.
+   */
+  private static class AssignContextInternal<InputT, WindowT extends BoundedWindow>
+      extends WindowFn<InputT, WindowT>.AssignContext {
+    private final WindowedValue<InputT> value;
+
+    AssignContextInternal(WindowFn<InputT, WindowT> fn, WindowedValue<InputT> value) {
+      fn.super();
+      // A message template keeps this per-element check from formatting a string on success.
+      int windowCount = Iterables.size(value.getWindows());
+      checkArgument(
+          windowCount == 1,
+          "%s passed to window assignment must be in a single window, but it was in %s: %s",
+          WindowedValue.class.getSimpleName(),
+          windowCount,
+          value.getWindows());
+      this.value = value;
+    }
+
+    @Override
+    public InputT element() {
+      return value.getValue();
+    }
+
+    @Override
+    public Instant timestamp() {
+      return value.getTimestamp();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return Iterables.getOnlyElement(value.getWindows());
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java
new file mode 100644
index 0000000..3908870
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.transforms.windowing.Window;
+
+/**
+ * Translates a {@link Window.Assign} to a {@link WindowAssignOperation}.
+ */
+public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
+
+  /** Adds a step running a {@link WindowAssignOperation} built from the transform's WindowFn. */
+  @Override
+  public void translateNode(Window.Assign<T> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext current = context.getUserGraphContext();
+
+    WindowAssignOperation<T, ?> assignOp = new WindowAssignOperation<>(transform.getWindowFn());
+    Graphs.Step step = Graphs.Step.of(current.getStepName(), assignOp);
+    // Input and output tags describe the node currently registered in the user-graph context.
+    context.addInitStep(step, current.getInputTags(), current.getOutputTags());
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/package-info.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/package-info.java
new file mode 100644
index 0000000..c9360ac
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation for translating Beam pipelines to MapReduce jobs.
+ */
+package org.apache.beam.runners.mapreduce.translation;
diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java
new file mode 100644
index 0000000..76c8311
--- /dev/null
+++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link GraphConverter}.
+ */
+@RunWith(JUnit4.class)
+public class GraphConverterTest {
+
+  /** Converting a per-key integer Sum over an empty keyed input should yield three steps. */
+  @Test
+  public void testCombine() throws Exception {
+    MapReducePipelineOptions options = PipelineOptionsFactory.as(MapReducePipelineOptions.class);
+    options.setRunner(CrashingRunner.class);
+    Pipeline pipeline = Pipeline.create(options);
+    KvCoder<String, Integer> coder = KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of());
+    PCollection<KV<String, Integer>> input =
+        pipeline.apply(Create.empty(coder)).apply(Sum.<String>integersPerKey());
+
+    TranslationContext context = new TranslationContext(options);
+    pipeline.traverseTopologically(new GraphConverter(context));
+
+    Graph<Graphs.Step, Graphs.Tag> initGraph = context.getInitGraph();
+    assertEquals(3, Iterables.size(initGraph.getSteps()));
+  }
+}
diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
new file mode 100644
index 0000000..fca6131
--- /dev/null
+++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link GraphPlanner}.
+ */
+@RunWith(JUnit4.class)
+public class GraphPlannerTest {
+
+  /** Planning a per-key integer Sum over an empty keyed input should fuse it into one step. */
+  @Test
+  public void testCombine() throws Exception {
+    MapReducePipelineOptions options = PipelineOptionsFactory.as(MapReducePipelineOptions.class);
+    options.setRunner(CrashingRunner.class);
+    Pipeline pipeline = Pipeline.create(options);
+    KvCoder<String, Integer> coder = KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of());
+    PCollection<KV<String, Integer>> input =
+        pipeline.apply(Create.empty(coder)).apply(Sum.<String>integersPerKey());
+
+    TranslationContext context = new TranslationContext(options);
+    pipeline.traverseTopologically(new GraphConverter(context));
+
+    GraphPlanner planner = new GraphPlanner(options);
+    Graphs.FusedGraph planned = planner.plan(new Graphs.FusedGraph(context.getInitGraph()));
+
+    // The whole pipeline should end up in a single fused step that contains three steps.
+    assertEquals(1, Iterables.size(planned.getFusedSteps()));
+    assertEquals(3, Iterables.getOnlyElement(planned.getFusedSteps()).getSteps().size());
+  }
+}
diff --git a/runners/pom.xml b/runners/pom.xml
index 4412ed6..39a9811 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -35,6 +35,7 @@
<modules>
<module>core-construction-java</module>
<module>core-java</module>
+ <module>map-reduce</module>
<module>direct-java</module>
<module>flink</module>
<module>google-cloud-dataflow-java</module>
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 3144193..75b2043 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -37,7 +37,8 @@
and other project configuration to be used in all modules.
<module>build-tools</module> -->
<module>core</module>
- <module>io</module>
+ <!--module>io</module-->
+ <module>io/hadoop-file-system</module>
<module>maven-archetypes</module>
<module>extensions</module>
<!-- javadoc runs directly from the root parent as the last module
diff --git a/sdks/pom.xml b/sdks/pom.xml
index aec8762..c06f764 100644
--- a/sdks/pom.xml
+++ b/sdks/pom.xml
@@ -35,7 +35,7 @@
<modules>
<module>common</module>
<module>java</module>
- <module>python</module>
+ <!--<module>python</module>-->
</modules>
<profiles>