This closes #3705: [BEAM-165] Initial implementation of the MapReduce runner

  mr-runner: Removes WordCountTest, fixes checkstyle and findbugs issues, and addresses review comments.
  mr-runner-hack: disable unrelated modules to shorten build time during development.
  mr-runner: support SourceMetrics, this fixes MetricsTest.testBoundedSourceMetrics().
  mr-runner: introduces duplicateFactor in FlattenOperation, this fixes testFlattenInputMultipleCopies().
  mr-runner: translate empty flatten into EmptySource, this fixes a few empty FlattenTests.
  mr-runner: ensure Operations only start/finish once in diamond-shaped DAGs, this fixes ParDoLifecycleTest.
  mr-runner: Graph.getSteps() returns steps in topological order, this fixes a few CombineTests.
  mr-runner: fail early in the runner when MapReduce job fails.
  mr-runner: use InMemoryStateInternals in ParDoOperation, this fixes ParDoTests that use state.
  mr-runner: use the correct step name in ParDoTranslator, this fixes MetricsTest.testAttemptedCounterMetrics().
  mr-runner: remove the hard-coded GlobalWindow coder, this fixes WindowingTest.
  mr-runner: handle no files case in FileSideInputReader for empty views.
  mr-runner: fix NPE in PipelineTest.testIdentityTransform().
  mr-runner: filter out unsupported features in ValidatesRunner tests.
  mr-runner: setMetricsSupported to run ValidatesRunner tests with TestPipeline.
  mr-runner: fix a bug where steps were attached multiple times in diamond-shaped DAGs.
  [BEAM-2783] support metrics in MapReduceRunner.
  mr-runner: setup file paths for read and write sides of materialization.
  mr-runner: support side inputs by reading in all views' contents.
  mr-runner: support multiple SourceOperations by composing and partitioning.
  mr-runner: support PCollections materialization with multiple MR jobs.
  mr-runner: hack to work around ViewAsXXX.expand() returning the wrong output PValue.
  mr-runner: support graph visualization with dotfiles.
  mr-runner: refactors and creates Graph data structures to handle general Beam pipelines.
  mr-runner: add JarClassInstanceFactory to run ValidatesRunner tests.
  mr-runner: support reduce side ParDos and WordCount.
  core-java: InMemoryTimerInternals exposes getTimers() for timer firings in mr-runner.
  mr-runner: add BeamReducer and support GroupByKey.
  mr-runner: add ParDoOperation and support ParDos chaining.
  mr-runner: add JobPrototype and translate it to a MR job.
  mr-runner: support BoundedSource with BeamInputFormat.
  MapReduceRunner: add unit tests for GraphConverter and GraphPlanner.
  MapReduceRunner: add Graph and its visitors.
  Initial commit for MapReduceRunner.
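
  For reviewers, a minimal usage sketch (a sketch only: MapReduceRunner and
  MapReducePipelineOptions are introduced in this patch; the input path and the
  Count transform are illustrative):

      MapReducePipelineOptions options =
          PipelineOptionsFactory.fromArgs(args).as(MapReducePipelineOptions.class);
      options.setRunner(MapReduceRunner.class);
      options.setFileOutputDir("/tmp/mapreduce/");

      Pipeline pipeline = Pipeline.create(options);
      pipeline.apply(TextIO.read().from("/tmp/input"))
          .apply(Count.<String>perElement());
      pipeline.run().waitUntilFinish();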
diff --git a/pom.xml b/pom.xml
index 80ab6e2..ae86a9c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -185,7 +185,7 @@
     <module>sdks/java/build-tools</module>
     <module>sdks</module>
     <module>runners</module>
-    <module>examples</module>
+    <!--<module>examples</module>-->
     <!-- sdks/java/javadoc builds project-wide Javadoc. It has to run last. -->
     <module>sdks/java/javadoc</module>
   </modules>
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index c29ea19..c1d42d6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -23,6 +23,7 @@
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Table;
 import java.util.NavigableSet;
 import java.util.TreeSet;
@@ -65,6 +66,10 @@
     return outputWatermarkTime;
   }
 
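+  /** Returns a snapshot of the timers that are currently set in the given time domain. */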
+  public Iterable<TimerData> getTimers(TimeDomain domain) {
+    return ImmutableList.copyOf(timersForDomain(domain));
+  }
+
   /**
    * Returns when the next timer in the given time domain will fire, or {@code null}
    * if there are no timers scheduled in that time domain.
diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml
new file mode 100644
index 0000000..90d876b
--- /dev/null
+++ b/runners/map-reduce/pom.xml
@@ -0,0 +1,197 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-runners-parent</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  
+  <artifactId>beam-runners-mapreduce</artifactId>
+
+  <name>Apache Beam :: Runners :: MapReduce</name>
+
+  <packaging>jar</packaging>
+
+  <properties>
+    <mapreduce.version>2.8.1</mapreduce.version>
+  </properties>
+  
+  <profiles>
+    <profile>
+      <!-- This profile adds execution of ValidatesRunner integration tests
+           against a hadoop local cluster. -->
+      <id>local-validates-runner-tests</id>
+      <activation><activeByDefault>true</activeByDefault></activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>validates-runner-tests</id>
+                <phase>integration-test</phase>
+                <goals>
+                  <goal>test</goal>
+                </goals>
+                <configuration>
+                  <groups>
+                    org.apache.beam.sdk.testing.ValidatesRunner
+                  </groups>
+                  <excludes>
+                    <exclude>org/apache/beam/sdk/testing/PAssertTest.java</exclude>
+                  </excludes>
+                  <excludedGroups>
+                    org.apache.beam.sdk.testing.UsesSetState,
+                    org.apache.beam.sdk.testing.UsesSplittableParDo,
+                    org.apache.beam.sdk.testing.UsesDistributionMetrics,
+                    org.apache.beam.sdk.testing.UsesGaugeMetrics,
+                    org.apache.beam.sdk.testing.UsesCommittedMetrics,
+                    org.apache.beam.sdk.testing.LargeKeys$Above10MB,
+                    org.apache.beam.sdk.testing.UsesTimersInParDo,
+                    org.apache.beam.sdk.testing.UsesStatefulParDo,
+                    org.apache.beam.sdk.testing.UsesTestStream
+                  </excludedGroups>
+                  <parallel>none</parallel>
+                  <failIfNoTests>true</failIfNoTests>
+                  <dependenciesToScan>
+                    <dependency>org.apache.beam:beam-sdks-java-core</dependency>
+                  </dependenciesToScan>
+                  <systemPropertyVariables>
+                    <beamTestPipelineOptions>
+                      [
+                        "--runner=org.apache.beam.runners.mapreduce.MapReduceRunner"
+                      ]
+                    </beamTestPipelineOptions>
+                  </systemPropertyVariables>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <dependencies>
+    <!-- MapReduce dependencies -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${mapreduce.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <version>${mapreduce.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${mapreduce.version}</version>
+    </dependency>
+
+    <!-- Beam dependencies -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-construction-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
+    </dependency>
+
+    <!-- Annotation processing dependencies -->
+    <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+    </dependency>
+
+    <!-- Depend on test jar to scan for ValidatesRunner tests -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- transitive test dependencies from beam-sdk-java-core -->
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+        <groupId>junit</groupId>
+        <artifactId>junit</artifactId>
+        <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals><goal>analyze-only</goal></goals>
+            <configuration>
+              <!-- disable for now during runner development -->
+              <failOnWarning>false</failOnWarning>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
new file mode 100644
index 0000000..7cff40d
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterators;
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * {@link PipelineOptions} for {@link MapReduceRunner}.
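+ *
+ * <p>Example of constructing these options (a sketch; the value shown is the default):
+ * <pre>{@code
+ * MapReducePipelineOptions options =
+ *     PipelineOptionsFactory.fromArgs(args).as(MapReducePipelineOptions.class);
+ * options.setFileOutputDir("/tmp/mapreduce/");
+ * }</pre>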
+ */
+public interface MapReducePipelineOptions extends PipelineOptions {
+
+  /** Classes that are used as the boundary in the stack trace to find the caller's class name. */
+  Set<String> PIPELINE_OPTIONS_FACTORY_CLASSES = ImmutableSet.of(
+      PipelineOptionsFactory.class.getName(),
+      PipelineOptionsFactory.Builder.class.getName(),
+      "org.apache.beam.sdk.options.ProxyInvocationHandler");
+
+  @Description("The jar class of the user Beam program.")
+  @Default.InstanceFactory(JarClassInstanceFactory.class)
+  Class<?> getJarClass();
+  void setJarClass(Class<?> jarClass);
+
+  @Description("The directory for files output.")
+  @Default.String("/tmp/mapreduce/")
+  String getFileOutputDir();
+  void setFileOutputDir(String fileOutputDir);
+
+  /**
+   * A {@link DefaultValueFactory} that infers the {@link Class} of the user's Beam program
+   * from the current call stack.
+   */
+  class JarClassInstanceFactory implements DefaultValueFactory<Class<?>> {
+    @Override
+    public Class<?> create(PipelineOptions options) {
+      return findCallersClassName(options);
+    }
+
+    /**
+     * Finds the calling class by walking the current thread's stack trace.
+     */
+    private static Class<?> findCallersClassName(PipelineOptions options) {
+      Iterator<StackTraceElement> elements =
+          Iterators.forArray(Thread.currentThread().getStackTrace());
+      // First find the PipelineOptionsFactory/Builder class in the stack trace.
+      while (elements.hasNext()) {
+        StackTraceElement next = elements.next();
+        if (PIPELINE_OPTIONS_FACTORY_CLASSES.contains(next.getClassName())) {
+          break;
+        }
+      }
+      // Then find the first frame after that which is not a PipelineOptionsFactory/Builder
+      // class, a dynamic proxy, or the runner itself.
+      while (elements.hasNext()) {
+        StackTraceElement next = elements.next();
+        if (!PIPELINE_OPTIONS_FACTORY_CLASSES.contains(next.getClassName())
+            && !next.getClassName().contains("com.sun.proxy.$Proxy")
+            && !next.getClassName().equals(options.getRunner().getName())) {
+          try {
+            return Class.forName(next.getClassName());
+          } catch (ClassNotFoundException e) {
+            break;
+          }
+        }
+      }
+      return null;
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
new file mode 100644
index 0000000..933d8f6
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.runners.mapreduce.translation.MapReduceMetricResults;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.hadoop.mapreduce.Job;
+import org.joda.time.Duration;
+
+/**
+ * A {@link PipelineResult} of executing {@link org.apache.beam.sdk.Pipeline Pipelines} using
+ * {@link MapReduceRunner}.
+ *
+ * <p>It is synchronous (returned only after the pipeline has finished) and is used to
+ * query metrics.
+ */
+public class MapReducePipelineResult implements PipelineResult {
+
+  private final List<Job> jobs;
+  public MapReducePipelineResult(List<Job> jobs) {
+    this.jobs = checkNotNull(jobs, "jobs");
+  }
+
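+  // Jobs are executed synchronously in MapReduceRunner#run, so by the time this
+  // result is returned the pipeline has already finished; every state query is DONE.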
+  @Override
+  public State getState() {
+    return State.DONE;
+  }
+
+  @Override
+  public State cancel() throws IOException {
+    return State.DONE;
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    return State.DONE;
+  }
+
+  @Override
+  public State waitUntilFinish() {
+    return State.DONE;
+  }
+
+  @Override
+  public MetricResults metrics() {
+    return new MapReduceMetricResults(jobs);
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java
new file mode 100644
index 0000000..1029218
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+/**
+ * Registrars for {@link MapReduceRunner}.
+ */
+public class MapReduceRegistrar {
+  private MapReduceRegistrar() {
+  }
+
+  /**
+   * Registers the {@link MapReduceRunner}.
+   */
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class Runner implements PipelineRunnerRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(MapReduceRunner.class);
+    }
+  }
+
+  /**
+   * Registers the {@link MapReducePipelineOptions}.
+   */
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class Options implements PipelineOptionsRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.<Class<? extends PipelineOptions>>of(MapReducePipelineOptions.class);
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
new file mode 100644
index 0000000..85b7d1b
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.runners.mapreduce.translation.DotfileWriter;
+import org.apache.beam.runners.mapreduce.translation.GraphConverter;
+import org.apache.beam.runners.mapreduce.translation.GraphPlanner;
+import org.apache.beam.runners.mapreduce.translation.Graphs;
+import org.apache.beam.runners.mapreduce.translation.JobPrototype;
+import org.apache.beam.runners.mapreduce.translation.TranslationContext;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.log4j.BasicConfigurator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link PipelineRunner} for Hadoop MapReduce.
+ *
+ * <p>It translates a Beam {@link Pipeline} to a series of MapReduce {@link Job jobs}, and executes
+ * them locally or on a Hadoop cluster.
+ */
+public class MapReduceRunner extends PipelineRunner<PipelineResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MapReduceRunner.class);
+
+  /**
+   * Construct a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static MapReduceRunner fromOptions(PipelineOptions options) {
+    return new MapReduceRunner(options.as(MapReducePipelineOptions.class));
+  }
+
+  private final MapReducePipelineOptions options;
+
+  MapReduceRunner(MapReducePipelineOptions options) {
+    this.options = checkNotNull(options, "options");
+  }
+
+  @Override
+  public PipelineResult run(Pipeline pipeline) {
+    BasicConfigurator.configure();
+    MetricsEnvironment.setMetricsSupported(true);
+
+    TranslationContext context = new TranslationContext(options);
+    GraphConverter graphConverter = new GraphConverter(context);
+    pipeline.traverseTopologically(graphConverter);
+
+    LOG.info(graphConverter.getDotfile());
+
+    Graphs.FusedGraph fusedGraph = new Graphs.FusedGraph(context.getInitGraph());
+    LOG.info(DotfileWriter.toDotfile(fusedGraph));
+
+    GraphPlanner planner = new GraphPlanner(options);
+    fusedGraph = planner.plan(fusedGraph);
+
+    LOG.info(DotfileWriter.toDotfile(fusedGraph));
+
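+    // Translate each fused step into one MapReduce job and execute them in topological order.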
+    List<Job> jobs = new ArrayList<>();
+    int stageId = 0;
+    for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      Configuration config = new Configuration();
+      config.set("keep.failed.task.files", "true");
+
+      JobPrototype jobPrototype = JobPrototype.create(stageId, fusedStep, options);
+      LOG.info("Running job-{}.", stageId);
+      stageId++;
+      LOG.info(DotfileWriter.toDotfile(fusedStep));
+      try {
+        Job job = jobPrototype.build(options.getJarClass(), config);
+        job.waitForCompletion(true);
+        if (!job.getStatus().getState().equals(JobStatus.State.SUCCEEDED)) {
+          throw new RuntimeException("MapReduce job failed: " + job.getJobID());
+        }
+        jobs.add(job);
+      } catch (Exception e) {
+        Throwables.throwIfUnchecked(e);
+        throw new RuntimeException(e);
+      }
+    }
+    return new MapReducePipelineResult(jobs);
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java
new file mode 100644
index 0000000..e452d92
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Implementation of the Beam runner for Apache Hadoop MapReduce.
+ */
+package org.apache.beam.runners.mapreduce;
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
new file mode 100644
index 0000000..3d0b8ea
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Function;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Adapter from Beam {@link BoundedSource} to MapReduce {@link InputFormat}.
+ */
+public class BeamInputFormat<T> extends InputFormat {
+
+  public static final String BEAM_SERIALIZED_BOUNDED_SOURCE = "beam-serialized-bounded-source";
+  public static final String BEAM_SERIALIZED_PIPELINE_OPTIONS = "beam-serialized-pipeline-options";
+
+  private static final long DEFAULT_DESIRED_BUNDLE_SIZE_BYTES = 5 * 1000 * 1000;
+
+  private List<ReadOperation.TaggedSource> sources;
+  private SerializedPipelineOptions options;
+
+  public BeamInputFormat() {
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+    String serializedBoundedSource = context.getConfiguration().get(BEAM_SERIALIZED_BOUNDED_SOURCE);
+    String serializedPipelineOptions =
+        context.getConfiguration().get(BEAM_SERIALIZED_PIPELINE_OPTIONS);
+    if (Strings.isNullOrEmpty(serializedBoundedSource)
+        || Strings.isNullOrEmpty(serializedPipelineOptions)) {
+      return ImmutableList.of();
+    }
+    sources = (List<ReadOperation.TaggedSource>) SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(serializedBoundedSource), "TaggedSources");
+    options = ((SerializedPipelineOptions) SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(serializedPipelineOptions), "SerializedPipelineOptions"));
+
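+    // Split every TaggedSource into bundles, then wrap each bundle in a BeamInputSplit
+    // so that MapReduce schedules one map task per bundle.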
+    try {
+
+      return FluentIterable.from(sources)
+          .transformAndConcat(
+              new Function<ReadOperation.TaggedSource, Iterable<ReadOperation.TaggedSource>>() {
+                @Override
+                public Iterable<ReadOperation.TaggedSource> apply(
+                    final ReadOperation.TaggedSource taggedSource) {
+                  try {
+                    return FluentIterable.from(taggedSource.getSource().split(
+                        DEFAULT_DESIRED_BUNDLE_SIZE_BYTES, options.getPipelineOptions()))
+                        .transform(new Function<BoundedSource<?>, ReadOperation.TaggedSource>() {
+                          @Override
+                          public ReadOperation.TaggedSource apply(BoundedSource<?> input) {
+                            return ReadOperation.TaggedSource.of(
+                                taggedSource.getStepName(), input, taggedSource.getTag());
+                          }});
+                  } catch (Exception e) {
+                    Throwables.throwIfUnchecked(e);
+                    throw new RuntimeException(e);
+                  }
+                }
+              })
+          .transform(new Function<ReadOperation.TaggedSource, InputSplit>() {
+            @Override
+            public InputSplit apply(ReadOperation.TaggedSource taggedSource) {
+              return new BeamInputSplit(taggedSource.getStepName(), taggedSource.getSource(),
+                  options, taggedSource.getTag());
+            }})
+          .toList();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public RecordReader createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+    return ((BeamInputSplit) split).createReader();
+  }
+
+  private static class BeamInputSplit<T> extends InputSplit implements Writable {
+    private String stepName;
+    private BoundedSource<T> boundedSource;
+    private SerializedPipelineOptions options;
+    private TupleTag<?> tupleTag;
+
+    public BeamInputSplit() {
+    }
+
+    public BeamInputSplit(
+        String stepName,
+        BoundedSource<T> boundedSource,
+        SerializedPipelineOptions options,
+        TupleTag<?> tupleTag) {
+      this.stepName = checkNotNull(stepName, "stepName");
+      this.boundedSource = checkNotNull(boundedSource, "boundedSources");
+      this.options = checkNotNull(options, "options");
+      this.tupleTag = checkNotNull(tupleTag, "tupleTag");
+    }
+
+    public BeamRecordReader<T> createReader() throws IOException {
+      return new BeamRecordReader<>(
+          stepName, boundedSource.createReader(options.getPipelineOptions()), tupleTag);
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      try {
+        return boundedSource.getEstimatedSizeBytes(options.getPipelineOptions());
+      } catch (Exception e) {
+        Throwables.throwIfUnchecked(e);
+        Throwables.throwIfInstanceOf(e, IOException.class);
+        Throwables.throwIfInstanceOf(e, InterruptedException.class);
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      return new String[0];
+    }
+
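+    // Hadoop serializes splits through the Writable interface; encode the fields with
+    // Beam coders into a single length-prefixed byte array (mirrored in readFields).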
+    @Override
+    public void write(DataOutput out) throws IOException {
+      ByteArrayOutputStream stream = new ByteArrayOutputStream();
+      StringUtf8Coder.of().encode(stepName, stream);
+      SerializableCoder.of(BoundedSource.class).encode(boundedSource, stream);
+      SerializableCoder.of(SerializedPipelineOptions.class).encode(options, stream);
+      SerializableCoder.of(TupleTag.class).encode(tupleTag, stream);
+
+      byte[] bytes = stream.toByteArray();
+      out.writeInt(bytes.length);
+      out.write(bytes);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      int length = in.readInt();
+      byte[] bytes = new byte[length];
+      in.readFully(bytes);
+
+      ByteArrayInputStream inStream = new ByteArrayInputStream(bytes);
+      stepName = StringUtf8Coder.of().decode(inStream);
+      boundedSource = SerializableCoder.of(BoundedSource.class).decode(inStream);
+      options = SerializableCoder.of(SerializedPipelineOptions.class).decode(inStream);
+      tupleTag = SerializableCoder.of(TupleTag.class).decode(inStream);
+    }
+  }
+
+  private static class BeamRecordReader<T> extends RecordReader {
+
+    private final String stepName;
+    private final BoundedSource.BoundedReader<T> reader;
+    private final TupleTag<?> tupleTag;
+    private MetricsReporter metricsReporter;
+    private boolean started;
+
+    public BeamRecordReader(
+        String stepName, BoundedSource.BoundedReader<T> reader, TupleTag<?> tupleTag) {
+      this.stepName = checkNotNull(stepName, "stepName");
+      this.reader = checkNotNull(reader, "reader");
+      this.tupleTag = checkNotNull(tupleTag, "tupleTag");
+      this.started = false;
+    }
+
+    @Override
+    public void initialize(
+        InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+      this.metricsReporter = new MetricsReporter(context);
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+          metricsReporter.getMetricsContainer(stepName))) {
+        if (!started) {
+          started = true;
+          return reader.start();
+        } else {
+          return reader.advance();
+        }
+      }
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException, InterruptedException {
+      return tupleTag;
+    }
+
+    @Override
+    public Object getCurrentValue() throws IOException, InterruptedException {
+      // TODO: this is a hack to handle the fact that reads from materialized
+      // PCollections already return WindowedValue.
+      if (reader.getCurrent() instanceof WindowedValue) {
+        return reader.getCurrent();
+      } else {
+        return WindowedValue.timestampedValueInGlobalWindow(
+            reader.getCurrent(), reader.getCurrentTimestamp());
+      }
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      Double progress = reader.getFractionConsumed();
+      if (progress != null) {
+        return progress.floatValue();
+      } else {
+        return 0;
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+      metricsReporter.updateMetrics();
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
new file mode 100644
index 0000000..46c74c0
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Adapter for executing {@link Operation operations} in {@link Mapper}.
+ */
+public class BeamMapper<ValueInT, ValueOutT>
+    extends Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>> {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamMapper.class);
+
+  public static final String BEAM_PAR_DO_OPERATION_MAPPER = "beam-par-do-op-mapper";
+
+  private Operation operation;
+
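+  // The fused Operation chain is deserialized from the job configuration in setup(),
+  // so it is started and finished exactly once per map task.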
+  @Override
+  protected void setup(
+      Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context) {
+    String serializedParDo = checkNotNull(
+        context.getConfiguration().get(BEAM_PAR_DO_OPERATION_MAPPER),
+        BEAM_PAR_DO_OPERATION_MAPPER);
+    operation = (Operation) SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(serializedParDo), "Operation");
+    operation.start((TaskInputOutputContext) context);
+  }
+
+  @Override
+  protected void map(
+      Object key,
+      WindowedValue<ValueInT> value,
+      Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context)
+      throws IOException, InterruptedException {
+    LOG.info("key: {} value: {}.", key, value);
+    // Only needs to pass KV to the following PartitionOperation. However, we have to wrap it in a
+    // global window because of the method signature.
+    operation.process(WindowedValue.valueInGlobalWindow(KV.of(key, value)));
+  }
+
+  @Override
+  protected void cleanup(
+      Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context) {
+    operation.finish();
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
new file mode 100644
index 0000000..b69be32
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Adapter for executing {@link Operation operations} in {@link Reducer}.
+ */
+public class BeamReducer<ValueInT, ValueOutT>
+    extends Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>> {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamReducer.class);
+
+  public static final String BEAM_REDUCER_KV_CODER = "beam-reducer-kv-coder";
+  public static final String BEAM_PAR_DO_OPERATION_REDUCER = "beam-par-do-op-reducer";
+
+  private Coder<Object> keyCoder;
+  private Coder<Object> valueCoder;
+  private Operation operation;
+
+  @Override
+  protected void setup(
+      Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>>.Context context) {
+    String serializedKvCoder = checkNotNull(
+        context.getConfiguration().get(BEAM_REDUCER_KV_CODER),
+        BEAM_REDUCER_KV_CODER);
+    String serializedParDo = checkNotNull(
+        context.getConfiguration().get(BEAM_PAR_DO_OPERATION_REDUCER),
+        BEAM_PAR_DO_OPERATION_REDUCER);
+    KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) SerializableUtils
+        .deserializeFromByteArray(Base64.decodeBase64(serializedValueCoder), "Coder");
+    keyCoder = kvCoder.getKeyCoder();
+    valueCoder = kvCoder.getValueCoder();
+    operation = (Operation) SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(serializedParDo), "Operation");
+    operation.start((TaskInputOutputContext) context);
+  }
+
+  @Override
+  protected void reduce(
+      BytesWritable key,
+      Iterable<byte[]> values,
+      Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>>.Context context)
+      throws InterruptedException, IOException {
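+    // Values arrive from the shuffle as coder-encoded bytes; decode the key and values
+    // before handing them to the downstream operation.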
+    List<Object> decodedValues = Lists.newArrayList(FluentIterable.from(values)
+        .transform(new Function<byte[], Object>() {
+          @Override
+          public Object apply(byte[] input) {
+            ByteArrayInputStream inStream = new ByteArrayInputStream(input);
+            try {
+              return valueCoder.decode(inStream);
+            } catch (IOException e) {
+              Throwables.throwIfUnchecked(e);
+              throw new RuntimeException(e);
+            }
+          }}));
+    Object decodedKey = keyCoder.decode(new ByteArrayInputStream(key.getBytes()));
+    LOG.info("key: {} value: {}.", decodedKey, decodedValues);
+    // Only needs to pass KV to the following GABW operation. However, we have to wrap it in a
+    // global window because of the method signature.
+    operation.process(
+        WindowedValue.valueInGlobalWindow(KV.of(decodedKey, decodedValues)));
+  }
+
+  @Override
+  protected void cleanup(
+      Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>>.Context context) {
+    operation.finish();
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
new file mode 100644
index 0000000..a905d29
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Utilities to handle {@link Configuration}.
+ */
+public class ConfigurationUtils {
+
+  private final MapReducePipelineOptions options;
+
+  public ConfigurationUtils(MapReducePipelineOptions options) {
+    this.options = checkNotNull(options, "options");
+  }
+
+  public static ResourceId getResourceIdForOutput(String fileName, Configuration conf) {
+    ResourceId outDir = FileSystems.matchNewResource(conf.get(FileOutputFormat.OUTDIR), true);
+    return outDir.resolve(fileName, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+  }
+
+  public String getFileOutputDir(int stageId) {
+    String fileOutputDir = options.getFileOutputDir();
+    if (fileOutputDir.endsWith("/")) {
+      return String.format("%s%s/stage-%d", fileOutputDir, options.getJobName(), stageId);
+    } else {
+      return String.format("%s/%s/stage-%d", fileOutputDir, options.getJobName(), stageId);
+    }
+  }
+
+  public String getFileOutputPath(int stageId, String fileName) {
+    return String.format("%s/%s", getFileOutputDir(stageId), fileName);
+  }
+
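+  /**
+   * Sanitizes a tag name for use as a file name by replacing every non-alphanumeric
+   * character with '0'.
+   */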
+  public static String toFileName(String tagName) {
+    return tagName.replaceAll("[^A-Za-z0-9]", "0");
+  }
+
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java
new file mode 100644
index 0000000..12cc03c
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class that writes a {@link Graph} out as a dot file.
+ */
+public class DotfileWriter {
+
+  public static String toDotfile(Graphs.FusedGraph fusedGraph) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("\ndigraph G {\n");
+
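+    // Render each fused step as a dot cluster; steps are drawn as boxes and tags as ellipses.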
+    Map<Graphs.FusedStep, String> fusedStepToId = Maps.newHashMap();
+    int i = 0;
+    for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      String clusterId = String.format("cluster_%d", i++);
+      sb.append(String.format("  subgraph \"%s\" {%n", clusterId));
+      sb.append(String.format("    \"%s\" [shape=point style=invis];%n", clusterId));
+      fusedStepToId.put(fusedStep, clusterId);
+
+      Set<String> nodeDefines = Sets.newHashSet();
+      for (Graphs.Step step : fusedStep.getSteps()) {
+        nodeDefines.add(String.format("    \"%s\" [shape=box];%n", step.getFullName()));
+        for (Graph.AbstractTag inTag : fusedStep.getInputTags(step)) {
+          nodeDefines.add(String.format("    \"%s\" [shape=ellipse];%n", inTag));
+        }
+        for (Graph.AbstractTag outTag : fusedStep.getOutputTags(step)) {
+          nodeDefines.add(String.format("    \"%s\" [shape=ellipse];%n", outTag));
+        }
+      }
+      for (String str : nodeDefines) {
+        sb.append(str);
+      }
+      sb.append(String.format("  }"));
+    }
+    for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      // Edges within fused steps.
+      for (Graphs.Step step : fusedStep.getSteps()) {
+        for (Graph.AbstractTag inTag : fusedStep.getInputTags(step)) {
+          sb.append(String.format("  \"%s\" -> \"%s\";%n", inTag, step));
+        }
+        for (Graph.AbstractTag outTag : fusedStep.getOutputTags(step)) {
+          sb.append(String.format("  \"%s\" -> \"%s\";%n", step, outTag));
+        }
+      }
+
+      // Edges between sub-graphs.
+      for (Graphs.Tag inTag : fusedGraph.getInputTags(fusedStep)) {
+        sb.append(String.format("  \"%s\" -> \"%s\";%n", inTag, fusedStepToId.get(fusedStep)));
+      }
+    }
+    sb.append("}\n");
+    return sb.toString();
+  }
+
+  public static String toDotfile(Graphs.FusedStep fusedStep) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("\ndigraph G {\n");
+    for (Graphs.Step step : fusedStep.getSteps()) {
+      sb.append(String.format("  \"%s\" [shape=box];%n", step.getFullName()));
+      for (Graph.AbstractTag inTag : fusedStep.getInputTags(step)) {
+        sb.append(String.format("  \"%s\" -> \"%s\";%n", inTag, step));
+      }
+      for (Graph.AbstractTag outTag : fusedStep.getOutputTags(step)) {
+        sb.append(String.format("  \"%s\" -> \"%s\";%n", step, outTag));
+      }
+    }
+    sb.append("}\n");
+    return sb.toString();
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
new file mode 100644
index 0000000..eb5bef4
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+
+/**
+ * Operation that reads from files.
+ */
+public class FileReadOperation<T> extends ReadOperation<WindowedValue<T>> {
+
+  private final String stepName;
+  private final String fileName;
+  private final Coder<WindowedValue<T>> coder;
+  private final TupleTag<?> tupleTag;
+
+  public FileReadOperation(
+      String stepName,
+      String fileName,
+      Coder<WindowedValue<T>> coder,
+      TupleTag<?> tupleTag) {
+    super();
+    this.stepName = checkNotNull(stepName, "stepName");
+    this.fileName = checkNotNull(fileName, "fileName");
+    this.coder = checkNotNull(coder, "coder");
+    this.tupleTag = checkNotNull(tupleTag, "tupleTag");
+  }
+
+  @Override
+  TaggedSource getTaggedSource(Configuration conf) {
+    return TaggedSource.of(
+        stepName,
+        new FileBoundedSource<>(fileName, coder, new SerializableConfiguration(conf)),
+        tupleTag);
+  }
+
+  private static class FileBoundedSource<T> extends BoundedSource<WindowedValue<T>> {
+
+    private final String fileName;
+    private final Coder<WindowedValue<T>> coder;
+    private final SerializableConfiguration conf;
+
+    FileBoundedSource(
+        String fileName, Coder<WindowedValue<T>> coder, SerializableConfiguration conf) {
+      this.fileName = checkNotNull(fileName, "fileName");
+      this.coder = checkNotNull(coder, "coder");
+      this.conf = checkNotNull(conf, "conf");
+    }
+
+    @Override
+    public List<? extends BoundedSource<WindowedValue<T>>> split(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      // TODO: support split.
+      return ImmutableList.of(this);
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      return 0;
+    }
+
+    @Override
+    public BoundedReader<WindowedValue<T>> createReader(PipelineOptions options)
+        throws IOException {
+      Path pattern = new Path(
+          ConfigurationUtils.getResourceIdForOutput(fileName, conf.getConf()) + "*");
+
+      FileSystem fs = pattern.getFileSystem(conf.getConf());
+      FileStatus[] files = fs.globStatus(pattern);
+
+      Queue<SequenceFile.Reader> readers = new LinkedList<>();
+      for (FileStatus f : files) {
+        readers.add(new SequenceFile.Reader(fs, f.getPath(), conf.getConf()));
+      }
+      return new Reader<>(this, readers, coder);
+    }
+
+    @Override
+    public void validate() {
+    }
+
+    @Override
+    public Coder<WindowedValue<T>> getDefaultOutputCoder() {
+      return coder;
+    }
+
+    private static class Reader<T> extends BoundedReader<WindowedValue<T>> {
+
+      private final BoundedSource<WindowedValue<T>> boundedSource;
+      private final Queue<SequenceFile.Reader> readers;
+      private final Coder<WindowedValue<T>> coder;
+      private final BytesWritable value = new BytesWritable();
+
+      Reader(
+          BoundedSource<WindowedValue<T>> boundedSource,
+          Queue<SequenceFile.Reader> readers,
+          Coder<WindowedValue<T>> coder) {
+        this.boundedSource = checkNotNull(boundedSource, "boundedSource");
+        this.readers = checkNotNull(readers, "readers");
+        this.coder = checkNotNull(coder, "coder");
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        return advance();
+      }
+
+      @Override
+      public boolean advance() throws IOException {
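+        // Drain the head reader; once it is exhausted, close it, drop it from the
+        // queue, and recurse into the next part file (if any).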
+        SequenceFile.Reader reader = readers.peek();
+        if (reader == null) {
+          return false;
+        }
+        boolean hasNext = reader.next(NullWritable.get(), value);
+        if (hasNext) {
+          return true;
+        } else {
+          reader.close();
+          readers.remove(reader);
+          return advance();
+        }
+      }
+
+      @Override
+      public WindowedValue<T> getCurrent() throws NoSuchElementException {
+        // BytesWritable's backing buffer may be longer than the record; bound the
+        // stream to getLength() to avoid decoding trailing garbage.
+        ByteArrayInputStream inStream =
+            new ByteArrayInputStream(value.getBytes(), 0, value.getLength());
+        try {
+          return coder.decode(inStream);
+        } catch (IOException e) {
+          Throwables.throwIfUnchecked(e);
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public void close() throws IOException {
+      }
+
+      @Override
+      public BoundedSource<WindowedValue<T>> getCurrentSource() {
+        return boundedSource;
+      }
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
new file mode 100644
index 0000000..403de4e
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+
+/**
+ * File-based {@link SideInputReader} that loads materialized view contents from
+ * SequenceFiles and filters them to the matching side-input window.
+ */
+public class FileSideInputReader implements SideInputReader {
+
+  private final Map<TupleTag<?>, String> tupleTagToFilePath;
+  private final Map<TupleTag<?>, Coder<?>> tupleTagToCoder;
+  private final Configuration conf;
+
+  public FileSideInputReader(
+      Map<TupleTag<?>, String> tupleTagToFilePath,
+      Map<TupleTag<?>, Coder<?>> tupleTagToCoder,
+      Configuration conf) {
+    this.tupleTagToFilePath = checkNotNull(tupleTagToFilePath, "tupleTagToFilePath");
+    this.tupleTagToCoder = checkNotNull(tupleTagToCoder, "tupleTagToCoder");
+    this.conf = checkNotNull(conf, "conf");
+  }
+
+  @Nullable
+  @Override
+  public <T> T get(PCollectionView<T> view, BoundedWindow window) {
+    String filePath = tupleTagToFilePath.get(view.getTagInternal());
+    IterableCoder<WindowedValue<?>> coder =
+        (IterableCoder<WindowedValue<?>>) tupleTagToCoder.get(view.getTagInternal());
+    Coder<WindowedValue<?>> elemCoder = coder.getElemCoder();
+
+    final BoundedWindow sideInputWindow =
+        view.getWindowMappingFn().getSideInputWindow(window);
+
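+    // Glob every part file materialized for this view; elements are decoded eagerly
+    // and then filtered down to the mapped side-input window below.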
+    Path pattern = new Path(filePath + "*");
+    try {
+      FileSystem fs = pattern.getFileSystem(conf);
+      FileStatus[] files = fs.globStatus(pattern);
+
+      List<WindowedValue<?>> availableSideInputs = new ArrayList<>();
+      // Read every matched part file; a view may be materialized across several files.
+      for (FileStatus file : files) {
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf);
+        try {
+          BytesWritable value = new BytesWritable();
+          while (reader.next(NullWritable.get(), value)) {
+            // Bound the stream to getLength(); the backing buffer may be longer.
+            ByteArrayInputStream inStream =
+                new ByteArrayInputStream(value.getBytes(), 0, value.getLength());
+            availableSideInputs.add(elemCoder.decode(inStream));
+          }
+        } finally {
+          reader.close();
+        }
+      }
+      Iterable<WindowedValue<?>> sideInputForWindow =
+          Iterables.filter(availableSideInputs, new Predicate<WindowedValue<?>>() {
+            @Override
+            public boolean apply(@Nullable WindowedValue<?> sideInputCandidate) {
+              if (sideInputCandidate == null) {
+                return false;
+              }
+              // The first candidate carrying the mapped side-input window is good enough.
+              for (BoundedWindow sideInputCandidateWindow: sideInputCandidate.getWindows()) {
+                if (sideInputCandidateWindow.equals(sideInputWindow)) {
+                  return true;
+                }
+              }
+              // no match found.
+              return false;
+            }
+          });
+      return view.getViewFn().apply(sideInputForWindow);
+    } catch (IOException e) {
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public <T> boolean contains(PCollectionView<T> view) {
+    return tupleTagToFilePath.containsKey(view.getTagInternal());
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return tupleTagToFilePath.isEmpty();
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java
new file mode 100644
index 0000000..af2e134
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+
+/**
+ * Operation that materializes a PCollection to SequenceFiles, writing each element as a
+ * coder-encoded {@link BytesWritable} value under a {@link NullWritable} key.
+ */
+public class FileWriteOperation<T> extends Operation<T> {
+
+  private final String fileName;
+  private final Coder<WindowedValue<T>> coder;
+  private transient MultipleOutputs mos;
+
+  public FileWriteOperation(String fileName, Coder<WindowedValue<T>> coder) {
+    super(0);
+    this.fileName = checkNotNull(fileName, "fileName");
+    this.coder = checkNotNull(coder, "coder");
+  }
+
+  @Override
+  public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) {
+    this.mos = new MultipleOutputs(taskContext);
+  }
+
+  @Override
+  public void process(WindowedValue<T> elem) throws IOException, InterruptedException {
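+    // Encode the full WindowedValue to bytes and emit it under the logical output named
+    // by fileName; MultipleOutputs routes it into this step's own output file.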
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    coder.encode(elem, stream);
+
+    mos.write(fileName, NullWritable.get(), new BytesWritable(stream.toByteArray()));
+  }
+
+  @Override
+  public void finish() {
+    try {
+      mos.close();
+    } catch (Exception e) {
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  public String getFileName() {
+    return fileName;
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java
new file mode 100644
index 0000000..3c5ac95
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * Flatten operation that forwards each element to every receiver {@code duplicateFactor}
+ * times; a factor greater than one arises when the same input PCollection appears more
+ * than once in the Flatten.
+ */
+public class FlattenOperation<T> extends Operation<T> {
+
+  private final int duplicateFactor;
+
+  public FlattenOperation(int duplicateFactor) {
+    super(1);
+    this.duplicateFactor = duplicateFactor;
+  }
+
+  @Override
+  public void process(WindowedValue<T> elem) {
+    for (OutputReceiver receiver : getOutputReceivers()) {
+      for (int i = 0; i < duplicateFactor; ++i) {
+        receiver.process(elem);
+      }
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
new file mode 100644
index 0000000..06ad367
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Translates a {@link Flatten} to a {@link FlattenOperation}.
+ */
+public class FlattenTranslator<T> extends TransformTranslator.Default<Flatten.PCollections<T>> {
+  @Override
+  public void translateNode(Flatten.PCollections<T> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+
+    Map<Graphs.Tag, Integer> inputTagToCount = Maps.newHashMap();
+    boolean containsDuplicates = false;
+    for (Graphs.Tag inputTag : userGraphContext.getInputTags()) {
+      Integer count = inputTagToCount.get(inputTag);
+      count = (count == null) ? 1 : count + 1;
+      inputTagToCount.put(inputTag, count);
+      if (count > 1) {
+        containsDuplicates = true;
+      }
+    }
+
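+    // Three cases: no inputs become an EmptySource read; all-distinct inputs become a
+    // single pass-through flatten; duplicated inputs each get a Dup step so an element
+    // is emitted once per occurrence before the final flatten.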
+    String stepName = userGraphContext.getStepName();
+    if (inputTagToCount.isEmpty()) {
+      // Create an empty source for a Flatten with no inputs.
+      Operation<?> operation =
+          new SourceReadOperation(
+              stepName, new EmptySource(), userGraphContext.getOnlyOutputTag());
+      context.addInitStep(
+          Graphs.Step.of(stepName, operation),
+          userGraphContext.getInputTags(),
+          userGraphContext.getOutputTags());
+    } else if (!containsDuplicates) {
+      Operation<?> operation = new FlattenOperation(1);
+      context.addInitStep(
+          Graphs.Step.of(stepName, operation),
+          userGraphContext.getInputTags(),
+          userGraphContext.getOutputTags());
+    } else {
+      List<Graphs.Tag> intermediateTags = new ArrayList<>();
+      for (Map.Entry<Graphs.Tag, Integer> entry : inputTagToCount.entrySet()) {
+        Integer dupFactor = entry.getValue();
+        Graphs.Tag inTag = entry.getKey();
+        checkState(
+            dupFactor > 0, "dupFactor should be positive, but was: " + dupFactor);
+        if (dupFactor == 1) {
+          intermediateTags.add(inTag);
+        } else {
+          String dupStepName = stepName + "/Dup-" + dupFactor;
+          Graphs.Tag outTag = Graphs.Tag.of(
+              dupStepName + ".out",
+              new TupleTag<T>(),
+              inTag.getCoder(),
+              inTag.getWindowingStrategy());
+          context.addInitStep(
+              Graphs.Step.of(dupStepName, new FlattenOperation(dupFactor)),
+              ImmutableList.of(inTag),
+              ImmutableList.of(outTag));
+          intermediateTags.add(outTag);
+        }
+      }
+      context.addInitStep(
+          Graphs.Step.of(stepName, new FlattenOperation(1)),
+          intermediateTags,
+          userGraphContext.getOutputTags());
+    }
+  }
+
+  private static class EmptySource extends BoundedSource<Void> {
+    @Override
+    public List<? extends BoundedSource<Void>> split(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      return 0;
+    }
+
+    @Override
+    public BoundedReader<Void> createReader(PipelineOptions options) throws IOException {
+      return new BoundedReader<Void>() {
+        @Override
+        public BoundedSource<Void> getCurrentSource() {
+          return EmptySource.this;
+        }
+
+        @Override
+        public boolean start() throws IOException {
+          return false;
+        }
+
+        @Override
+        public boolean advance() throws IOException {
+          return false;
+        }
+
+        @Override
+        public Void getCurrent() throws NoSuchElementException {
+          throw new NoSuchElementException();
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+      };
+    }
+
+    @Override
+    public void validate() {
+    }
+
+    @Override
+    public Coder<Void> getDefaultOutputCoder() {
+      return VoidCoder.of();
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
new file mode 100644
index 0000000..b4549d3
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.graph.ElementOrder;
+import com.google.common.graph.GraphBuilder;
+import com.google.common.graph.MutableGraph;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Graph that represents a Beam DAG.
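+ *
+ * <p>Vertices alternate between {@link AbstractStep}s and the {@link AbstractTag}s they
+ * consume and produce. A minimal usage sketch (step and tag names are illustrative):
+ *
+ * <pre>{@code
+ * Graph<Graphs.Step, Graphs.Tag> graph = new Graph<>();
+ * graph.addStep(read, ImmutableList.<Graphs.Tag>of(), ImmutableList.of(tagA));
+ * graph.addStep(parDo, ImmutableList.of(tagA), ImmutableList.of(tagB));
+ * graph.getSteps();  // returns [read, parDo] in topological order
+ * }</pre>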
+ */
+public class Graph<StepT extends Graph.AbstractStep, TagT extends Graph.AbstractTag> {
+
+  public final MutableGraph<Vertex> graph;
+
+  public Graph() {
+    this.graph = GraphBuilder.directed()
+        .allowsSelfLoops(false)
+        .nodeOrder(ElementOrder.insertion())
+        .build();
+  }
+
+  /**
+   * Adds {@code step} with its input and output tags to this {@link Graph}.
+   */
+  public void addStep(StepT step, List<TagT> inTags, List<TagT> outTags) {
+    graph.addNode(step);
+    Set<Vertex> nodes = graph.nodes();
+    for (TagT tag : inTags) {
+      if (!nodes.contains(tag)) {
+        graph.addNode(tag);
+      }
+      graph.putEdge(tag, step);
+    }
+    for (TagT tag : outTags) {
+      if (!nodes.contains(tag)) {
+        graph.addNode(tag);
+      }
+      graph.putEdge(step, tag);
+    }
+  }
+
+  public void removeStep(StepT step) {
+    graph.removeNode(step);
+  }
+
+  public void removeTag(TagT tag) {
+    graph.removeNode(tag);
+  }
+
+  public void addEdge(TagT inTag, StepT step) {
+    graph.putEdge(inTag, step);
+  }
+
+  public void addEdge(StepT step, TagT outTag) {
+    graph.putEdge(step, outTag);
+  }
+
+  public void removeEdge(TagT inTag, StepT step) {
+    graph.removeEdge(inTag, step);
+  }
+
+  public void removeEdge(StepT step, TagT outTag) {
+    graph.removeEdge(step, outTag);
+  }
+
+  public List<StepT> getSteps() {
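+    // Kahn-style topological sort: repeatedly emit vertices whose pending predecessors
+    // are exhausted; the checkState below guards against cycles.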
+    List<Vertex> ret = new ArrayList<>();
+
+    Set<Vertex> pendingNodes = Sets.newHashSet(graph.nodes());
+    while (!pendingNodes.isEmpty()) {
+      List<Vertex> readyNodes = new ArrayList<>();
+      for (Vertex v : pendingNodes) {
+        if (Sets.intersection(pendingNodes, graph.predecessors(v)).isEmpty()) {
+          readyNodes.add(v);
+        }
+      }
+      checkState(
+          !readyNodes.isEmpty(),
+          "No ready nodes found, there are cycles in graph: " + graph);
+      ret.addAll(readyNodes);
+      pendingNodes.removeAll(readyNodes);
+    }
+    return castToStepList(FluentIterable.from(ret)
+        .filter(new Predicate<Vertex>() {
+          @Override
+          public boolean apply(Vertex input) {
+            return input instanceof AbstractStep;
+          }}))
+        .toList();
+  }
+
+  public List<StepT> getStartSteps() {
+    return castToStepList(FluentIterable.from(graph.nodes())
+        .filter(new Predicate<Vertex>() {
+          @Override
+          public boolean apply(Vertex input) {
+            return input instanceof AbstractStep && graph.inDegree(input) == 0;
+          }}))
+        .toList();
+  }
+
+  public StepT getProducer(TagT tag) {
+    if (contains(tag)) {
+      return (StepT) Iterables.getOnlyElement(graph.predecessors(tag));
+    } else {
+      return null;
+    }
+  }
+
+  public List<StepT> getConsumers(TagT tag) {
+    if (contains(tag)) {
+      return castToStepList(graph.successors(tag)).toList();
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  public List<TagT> getInputTags(StepT step) {
+    if (contains(step)) {
+      return castToTagList(graph.predecessors(step)).toList();
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  public List<TagT> getOutputTags(StepT step) {
+    if (contains(step)) {
+      return castToTagList(graph.successors(step)).toList();
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  private boolean contains(Vertex node) {
+    return graph.nodes().contains(node);
+  }
+
+  private FluentIterable<StepT> castToStepList(Iterable<Vertex> vertices) {
+    return FluentIterable.from(vertices)
+        .transform(new Function<Vertex, StepT>() {
+          @Override
+          public StepT apply(Vertex input) {
+            return (StepT) input;
+          }});
+  }
+
+  private FluentIterable<TagT> castToTagList(Iterable<Vertex> vertices) {
+    return FluentIterable.from(vertices)
+        .transform(new Function<Vertex, TagT>() {
+          @Override
+          public TagT apply(Vertex input) {
+            return (TagT) input;
+          }});
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj instanceof Graph) {
+      Graph other = (Graph) obj;
+      return com.google.common.graph.Graphs.equivalent(this.graph, other.graph);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(this.getClass(), graph.nodes());
+  }
+
+  /**
+   * Vertex interface of this Graph.
+   */
+  interface Vertex {
+  }
+
+  /**
+   * Step {@link Vertex}.
+   */
+  public abstract static class AbstractStep implements Vertex {
+  }
+
+  /**
+   * Tag {@link Vertex}.
+   */
+  public abstract static class AbstractTag implements Vertex {
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
new file mode 100644
index 0000000..1a4988b
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import java.util.Stack;
+import org.apache.beam.runners.mapreduce.MapReduceRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+
+/**
+ * Pipeline visitor for {@link MapReduceRunner} that translates each supported transform
+ * into {@link Operation Operations} and records a Graphviz dotfile of the user graph.
+ */
+public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
+
+  private final TranslationContext context;
+  private final Stack<StringBuilder> dotfileNodesBuilders;
+  private final Map<TransformHierarchy.Node, Integer> enclosedTransformCounts;
+  private final StringBuilder dotfileEdgesBuilder;
+
+  private int indent;
+
+  public GraphConverter(TranslationContext context) {
+    this.context = checkNotNull(context, "context");
+    this.enclosedTransformCounts = Maps.newHashMap();
+    this.dotfileNodesBuilders = new Stack<>();
+    this.dotfileEdgesBuilder = new StringBuilder();
+    this.indent = 0;
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    // Check whether the current composite transform can be translated as a whole.
+    // If not, enter it; its sub transforms are translated in visitPrimitiveTransform.
+    PTransform<?, ?> transform = node.getTransform();
+    dotfileNodesBuilders.push(new StringBuilder());
+    if (transform != null) {
+      markEnclosedTransformCounts(node);
+      TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+
+      // Increment before the early return so that leaveCompositeTransform's matching
+      // decrement stays balanced even when the composite is translated directly.
+      indent += 2;
+      if (translator != null && applyCanTranslate(transform, node, translator)) {
+        applyTransform(transform, node, translator);
+        return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+      }
+    }
+    return CompositeBehavior.ENTER_TRANSFORM;
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
+    if (node.getTransform() != null) {
+      Integer enclosedTransformCount = enclosedTransformCounts.get(node);
+      if (enclosedTransformCount != null && enclosedTransformCount > 1) {
+        dotfileNodesBuilders.peek().insert(0, new StringBuilder()
+            .append(getIndent()).append(
+                String.format("subgraph \"cluster_%s\" {", node.getFullName()))
+            .append('\n')
+            .append(getIndent()).append(
+                String.format("  label=\"%s\";", node.getFullName()))
+            .append('\n')
+            .toString());
+        dotfileNodesBuilders.peek().append(new StringBuilder()
+            .append(getIndent()).append("}").append('\n')
+            .toString());
+      }
+      StringBuilder top = dotfileNodesBuilders.pop();
+      dotfileNodesBuilders.peek().append(top.toString());
+      indent -= 2;
+    }
+  }
+
+  @Override
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    if (!node.isRootNode()) {
+      markEnclosedTransformCounts(node);
+
+      PTransform<?, ?> transform = node.getTransform();
+      TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+      if (translator == null || !applyCanTranslate(transform, node, translator)) {
+        throw new UnsupportedOperationException(
+            "The transform " + transform + " is currently not supported.");
+      }
+      applyTransform(transform, node, translator);
+    }
+  }
+
+  @Override
+  public void visitValue(PValue value, TransformHierarchy.Node producer) {
+    dotfileNodesBuilders.peek().append(getIndent())
+        .append(String.format("\"%s\" [shape=ellipse];", value.getName()))
+        .append('\n');
+  }
+
+  private void markEnclosedTransformCounts(TransformHierarchy.Node node) {
+    TransformHierarchy.Node parent = node.getEnclosingNode();
+    Integer primitiveCount = enclosedTransformCounts.get(parent);
+    if (primitiveCount == null) {
+      primitiveCount = 0;
+    }
+    enclosedTransformCounts.put(parent, primitiveCount + 1);
+  }
+
+  public String getDotfile() {
+    return String.format(
+        "%ndigraph G {%n%s%s}%n",
+        dotfileNodesBuilders.peek().toString(),
+        dotfileEdgesBuilder.toString());
+  }
+
+  private <T extends PTransform<?, ?>> void applyTransform(
+      PTransform<?, ?> transform,
+      TransformHierarchy.Node node,
+      TransformTranslator<?> translator) {
+    dotfileNodesBuilders.peek()
+        .append(getIndent())
+        .append(String.format("\"%s\" [shape=box];", node.getFullName()))
+        .append('\n');
+    for (PValue input : node.getInputs().values()) {
+      dotfileEdgesBuilder
+          .append(String.format("  \"%s\" -> \"%s\";", input.getName(), node.getFullName()))
+          .append('\n');
+    }
+    for (PValue output : node.getOutputs().values()) {
+      dotfileEdgesBuilder
+          .append(String.format("  \"%s\" -> \"%s\";", node.getFullName(), output.getName()))
+          .append('\n');
+    }
+
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) transform;
+    @SuppressWarnings("unchecked")
+    TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
+    context.getUserGraphContext().setCurrentNode(node);
+    typedTranslator.translateNode(typedTransform, context);
+  }
+
+  private <T extends PTransform<?, ?>> boolean applyCanTranslate(
+      PTransform<?, ?> transform,
+      TransformHierarchy.Node node,
+      TransformTranslator<?> translator) {
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) transform;
+    @SuppressWarnings("unchecked")
+    TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
+    context.getUserGraphContext().setCurrentNode(node);
+    return typedTranslator.canTranslate(typedTransform, context);
+  }
+
+  private String getIndent() {
+    StringBuilder ret = new StringBuilder();
+    for (int i = 0; i < indent; ++i) {
+      ret.append(' ');
+    }
+    return ret.toString();
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
new file mode 100644
index 0000000..bc360fb
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Planner that finalizes a fused graph for execution: it materializes fusion boundaries
+ * with file writes/reads, merges each stage's root reads into one PartitionOperation,
+ * and wires side inputs to their materialized files.
+ */
+public class GraphPlanner {
+
+  private final ConfigurationUtils configUtils;
+
+  public GraphPlanner(MapReducePipelineOptions options) {
+    checkNotNull(options, "options");
+    this.configUtils = new ConfigurationUtils(options);
+  }
+
+  public Graphs.FusedGraph plan(Graphs.FusedGraph fusedGraph) {
+    // Attach writes/reads on fusion boundaries.
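+    // Each tag produced in one stage and consumed in another is persisted with a
+    // FileWriteOperation in the producer stage and re-read with a FileReadOperation in
+    // every consumer stage.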
+    for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      for (Graphs.Tag tag : fusedGraph.getOutputTags(fusedStep)) {
+        List<Graphs.FusedStep> consumers = fusedGraph.getConsumers(tag);
+        if (consumers.isEmpty()) {
+          continue;
+        }
+        Graphs.Step producer = fusedStep.getProducer(tag);
+        if (producer.getOperation() instanceof FileWriteOperation) {
+          continue;
+        }
+        String tagName = tag.getName();
+        String fileName = ConfigurationUtils.toFileName(tagName);
+
+        WindowedValue.WindowedValueCoder<?> writeValueCoder = WindowedValue.getFullCoder(
+            tag.getCoder(), tag.getWindowingStrategy().getWindowFn().windowCoder());
+
+        fusedStep.addStep(
+            Graphs.Step.of(
+                tagName + "/Write",
+                new FileWriteOperation(fileName, writeValueCoder)),
+            ImmutableList.of(tag),
+            ImmutableList.<Graphs.Tag>of());
+
+        String readStepName = tagName + "/Read";
+        Graphs.Tag readOutput = Graphs.Tag.of(
+            readStepName + ".out", tag.getTupleTag(), tag.getCoder(), tag.getWindowingStrategy());
+        for (Graphs.FusedStep consumer : consumers) {
+          // Re-direct tag to readOutput.
+          List<Graphs.Step> receivers = consumer.getConsumers(tag);
+          for (Graphs.Step step : receivers) {
+            consumer.addEdge(readOutput, step);
+          }
+          consumer.removeTag(tag);
+
+          String filePath = configUtils.getFileOutputPath(fusedStep.getStageId(), fileName);
+          consumer.addStep(
+              Graphs.Step.of(
+                  readStepName,
+                  new FileReadOperation(
+                      readStepName, filePath, writeValueCoder, tag.getTupleTag())),
+              ImmutableList.<Graphs.Tag>of(),
+              ImmutableList.of(readOutput));
+        }
+      }
+    }
+
+    // Insert PartitionOperation
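+    // The root ReadOperations of each stage are collapsed into a single
+    // PartitionOperation so one MapReduce job can multiplex several sources.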
+    for (final Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      List<Graphs.Step> readSteps = fusedStep.getStartSteps();
+
+      List<ReadOperation> readOperations = new ArrayList<>();
+      List<Graphs.Tag> readOutTags = new ArrayList<>();
+      List<TupleTag<?>> readOutTupleTags = new ArrayList<>();
+      StringBuilder partitionStepName = new StringBuilder();
+      for (Graphs.Step step : readSteps) {
+        checkState(step.getOperation() instanceof ReadOperation);
+        readOperations.add(((ReadOperation) step.getOperation()));
+        Graphs.Tag tag = Iterables.getOnlyElement(fusedStep.getOutputTags(step));
+        readOutTags.add(tag);
+        readOutTupleTags.add(tag.getTupleTag());
+        // Join source step names with '|'; the trailing separator is removed below.
+        partitionStepName.append(step.getFullName()).append('|');
+
+        fusedStep.removeStep(step);
+      }
+      if (partitionStepName.length() > 0) {
+        partitionStepName.deleteCharAt(partitionStepName.length() - 1);
+      }
+
+      Graphs.Step partitionStep = Graphs.Step.of(
+          partitionStepName.toString(), new PartitionOperation(readOperations, readOutTupleTags));
+      fusedStep.addStep(partitionStep, ImmutableList.<Graphs.Tag>of(), readOutTags);
+    }
+
+    // Setup side inputs
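+    // Point each ParDo that consumes side inputs at the file paths where its views were
+    // materialized, so they can be loaded at runtime with a FileSideInputReader.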
+    for (final Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      for (Graphs.Step step : fusedStep.getSteps()) {
+        if (!(step.getOperation() instanceof ParDoOperation)) {
+          continue;
+        }
+        ParDoOperation parDo = (ParDoOperation) step.getOperation();
+        List<Graphs.Tag> sideInputTags = parDo.getSideInputTags();
+        if (sideInputTags.isEmpty()) {
+          continue;
+        }
+        Map<TupleTag<?>, String> tupleTagToFilePath = Maps.newHashMap();
+        for (Graphs.Tag sideInTag : sideInputTags) {
+          tupleTagToFilePath.put(
+              sideInTag.getTupleTag(),
+              configUtils.getFileOutputPath(
+                  fusedGraph.getProducer(sideInTag).getStageId(),
+                  ConfigurationUtils.toFileName(sideInTag.getName())));
+        }
+        parDo.setupSideInput(tupleTagToFilePath);
+      }
+    }
+    return fusedGraph;
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
new file mode 100644
index 0000000..f23e572
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Container for the concrete graph types used during translation: {@link Step} and
+ * {@link Tag} vertices, plus the stage-level {@link FusedStep} and {@link FusedGraph}.
+ */
+public class Graphs {
+
+  private Graphs() {}
+
+  /**
+   * Class that represents an optimized graph.
+   */
+  public static class FusedGraph {
+    private final Graph<FusedStep, Tag> graph;
+    private int stageId = 0;
+
+    public FusedGraph() {
+      this.graph = new Graph<>();
+    }
+
+    public FusedGraph(Graph<Graphs.Step, Tag> initGraph) {
+      this.graph = new Graph<>();
+
+      // Fuse the initial graph's steps, visiting them in reverse topological order so
+      // each step's consumers are already placed when it is considered.
+      for (Graphs.Step step : Lists.reverse(initGraph.getSteps())) {
+        tryFuse(step, initGraph.getInputTags(step), initGraph.getOutputTags(step));
+      }
+      // Remove unused external tags.
+      for (FusedStep fusedStep : graph.getSteps()) {
+        for (Tag outTag : graph.getOutputTags(fusedStep)) {
+          if (graph.getConsumers(outTag).isEmpty()) {
+            graph.removeTag(outTag);
+          }
+        }
+      }
+    }
+
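+    /**
+     * Fuses {@code step} into its single downstream {@link FusedStep} when possible;
+     * otherwise starts a new stage (one MapReduce job) for it.
+     */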
+    public void tryFuse(
+        Graphs.Step step,
+        List<Graphs.Tag> inTags,
+        List<Graphs.Tag> outTags) {
+      if (canFuse(step, inTags, outTags)) {
+        Graphs.Tag outTag = Iterables.getOnlyElement(outTags);
+        Graphs.FusedStep consumer = Iterables.getOnlyElement(graph.getConsumers(outTag));
+        consumer.addStep(step, inTags, outTags);
+        for (Graphs.Tag in : inTags) {
+          graph.addEdge(in, consumer);
+        }
+        graph.removeTag(outTag);
+        graph.addEdge(consumer, outTag);
+      } else {
+        Graphs.FusedStep newFusedStep = new Graphs.FusedStep(stageId++);
+        newFusedStep.addStep(step, inTags, outTags);
+        graph.addStep(newFusedStep, inTags, outTags);
+      }
+    }
+
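+    // A step can be fused only if it is not a materialization boundary
+    // (FileWriteOperation), has exactly one output with exactly one consuming stage,
+    // and fusing would not place two GroupByKeys into the same MapReduce job.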
+    private boolean canFuse(
+        Graphs.Step step,
+        List<Graphs.Tag> inTags,
+        List<Graphs.Tag> outTags) {
+      if (step.getOperation() instanceof FileWriteOperation) {
+        return false;
+      }
+      if (outTags.size() != 1) {
+        return false;
+      }
+      Graphs.Tag outTag = Iterables.getOnlyElement(outTags);
+      if (graph.getConsumers(outTag).size() != 1) {
+        return false;
+      }
+      Graphs.FusedStep consumer = Iterables.getOnlyElement(graph.getConsumers(outTag));
+      if (consumer.containsGroupByKey() && step.getOperation() instanceof GroupByKeyOperation) {
+        return false;
+      }
+      return true;
+    }
+
+    public FusedStep getProducer(Tag tag) {
+      return graph.getProducer(tag);
+    }
+
+    public List<FusedStep> getConsumers(Tag tag) {
+      return graph.getConsumers(tag);
+    }
+
+    public List<FusedStep> getFusedSteps() {
+      return graph.getSteps();
+    }
+
+    public List<Tag> getInputTags(FusedStep fusedStep) {
+      return graph.getInputTags(fusedStep);
+    }
+
+    public List<Tag> getOutputTags(FusedStep fusedStep) {
+      return graph.getOutputTags(fusedStep);
+    }
+  }
+
+  /**
+   * An {@link Graph.AbstractStep} that represents an optimized sub-graph that can be executed
+   * in one MapReduce job.
+   */
+  public static class FusedStep extends Graph.AbstractStep {
+    private final int stageId;
+    private final Graph<Step, Tag> steps;
+    private Step groupByKeyStep;
+
+    public FusedStep(int stageId) {
+      this.stageId = stageId;
+      this.steps = new Graph<>();
+      this.groupByKeyStep = null;
+    }
+
+    public int getStageId() {
+      return stageId;
+    }
+
+    public List<Tag> getInputTags(Step step) {
+      return steps.getInputTags(step);
+    }
+
+    public List<Tag> getOutputTags(Step step) {
+      return steps.getOutputTags(step);
+    }
+
+    public void addStep(Step step, List<Tag> inTags, List<Tag> outTags) {
+      steps.addStep(step, inTags, outTags);
+      if (step.getOperation() instanceof GroupByKeyOperation) {
+        groupByKeyStep = step;
+      }
+    }
+
+    public void removeStep(Step step) {
+      steps.removeStep(step);
+    }
+
+    public void removeTag(Tag tag) {
+      steps.removeTag(tag);
+    }
+
+    public void addEdge(Tag inTag, Step step) {
+      steps.addEdge(inTag, step);
+    }
+
+    public void addEdge(Step step, Tag outTag) {
+      steps.addEdge(step, outTag);
+    }
+
+    public void removeEdge(Tag inTag, Step step) {
+      steps.removeEdge(inTag, step);
+    }
+
+    public void removeEdge(Step step, Tag outTag) {
+      steps.removeEdge(step, outTag);
+    }
+
+    public Step getProducer(Tag tag) {
+      return steps.getProducer(tag);
+    }
+
+    public List<Step> getConsumers(Tag tag) {
+      return steps.getConsumers(tag);
+    }
+
+    public List<Step> getSteps() {
+      return steps.getSteps();
+    }
+
+    public List<Step> getStartSteps() {
+      return steps.getStartSteps();
+    }
+
+    public boolean containsGroupByKey() {
+      return groupByKeyStep != null;
+    }
+
+    @Nullable
+    public Step getGroupByKeyStep() {
+      return groupByKeyStep;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      for (Step step : steps.getSteps()) {
+        sb.append(step.getFullName()).append('|');
+      }
+      if (sb.length() > 0) {
+        sb.deleteCharAt(sb.length() - 1);
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * An {@link Graph.AbstractStep} that represents one {@link Operation}.
+   */
+  @AutoValue
+  public abstract static class Step extends Graph.AbstractStep {
+    abstract String getFullName();
+    // TODO: remove public
+    public abstract Operation getOperation();
+
+    public static Step of(String fullName, Operation operation) {
+      return new org.apache.beam.runners.mapreduce.translation.AutoValue_Graphs_Step(
+          fullName, operation);
+    }
+
+    @Override
+    public String toString() {
+      return getFullName();
+    }
+  }
+
+  /**
+   * An {@link Graph.AbstractTag} that contains information about input/output data.
+   */
+  @AutoValue
+  public abstract static class Tag extends Graph.AbstractTag implements Serializable {
+    abstract String getName();
+    abstract TupleTag<?> getTupleTag();
+    abstract Coder<?> getCoder();
+    abstract WindowingStrategy<?, ?> getWindowingStrategy();
+
+    @Override
+    public String toString() {
+      return getName();
+    }
+
+    public static Tag of(
+        String name,
+        TupleTag<?> tupleTag,
+        Coder<?> coder,
+        WindowingStrategy<?, ?> windowingStrategy) {
+      return new org.apache.beam.runners.mapreduce.translation.AutoValue_Graphs_Tag(
+          name, tupleTag, coder, windowingStrategy);
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
new file mode 100644
index 0000000..14e3a29
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * {@link Operation} that executes a {@link GroupAlsoByWindowsViaOutputBufferDoFn}.
+ */
+public class GroupAlsoByWindowsParDoOperation extends ParDoOperation {
+
+  private final Coder<?> inputCoder;
+
+  public GroupAlsoByWindowsParDoOperation(
+      String stepName,
+      PipelineOptions options,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Coder<?> inputCoder,
+      Graphs.Tag outTag) {
+    super(stepName, options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(),
+        ImmutableList.<Graphs.Tag>of(), windowingStrategy);
+    this.inputCoder = checkNotNull(inputCoder, "inputCoder");
+  }
+
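+  // The grouped values for each key are replayed through a ReduceFnRunner inside
+  // GroupAlsoByWindowsViaOutputBufferDoFn to apply windowing and triggers.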
+  @Override
+  DoFn<Object, Object> getDoFn() {
+    return new GroupAlsoByWindowsViaOutputBufferDoFn(
+        windowingStrategy,
+        SystemReduceFn.buffering(inputCoder),
+        mainOutputTag,
+        createOutputManager());
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
new file mode 100644
index 0000000..5ac23a5
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.InMemoryStateInternals;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.ReduceFnRunner;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.construction.TriggerTranslation;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Instant;
+
+/**
+ * The default batch implementation, if no specialized "fast path" implementation is applicable.
+ */
+public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow>
+    extends DoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
+
+  private final WindowingStrategy<Object, W> windowingStrategy;
+  private final SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
+  private final TupleTag<KV<K, OutputT>> mainTag;
+  private transient DoFnRunners.OutputManager outputManager;
+
+  public GroupAlsoByWindowsViaOutputBufferDoFn(
+      WindowingStrategy<Object, W> windowingStrategy,
+      SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
+      TupleTag<KV<K, OutputT>> mainTag,
+      DoFnRunners.OutputManager outputManager) {
+    this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy");
+    this.reduceFn = checkNotNull(reduceFn, "reduceFn");
+    this.mainTag = checkNotNull(mainTag, "mainTag");
+    this.outputManager = checkNotNull(outputManager, "outputManager");
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c) throws Exception {
+    K key = c.element().getKey();
+    // Used with Batch, we know that all the data is available for this key. We can't use the
+    // timer manager from the context because it doesn't exist. So we create one and emulate the
+    // watermark, knowing that we have all data and it is in timestamp order.
+    InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+    ReduceFnRunner<K, InputT, OutputT, W> runner = new ReduceFnRunner<>(
+        key,
+        windowingStrategy,
+        ExecutableTriggerStateMachine.create(
+            TriggerStateMachines.stateMachineForTrigger(
+                TriggerTranslation.toProto(windowingStrategy.getTrigger()))),
+        InMemoryStateInternals.forKey(key),
+        timerInternals,
+        outputWindowedValue(),
+        NullSideInputReader.empty(),
+        reduceFn,
+        c.getPipelineOptions());
+
+    Iterable<List<WindowedValue<InputT>>> chunks =
+        Iterables.partition(c.element().getValue(), 1000);
+    for (Iterable<WindowedValue<InputT>> chunk : chunks) {
+      // Process the chunk of elements.
+      runner.processElements(chunk);
+
+      // Then, since elements are sorted by their timestamp, advance the input watermark
+      // to the first element, and fire any timers that may have been scheduled.
+      // TODO: re-enable once elements are sorted.
+      // timerInternals.advanceInputWatermark(chunk.iterator().next().getTimestamp());
+
+      // Fire any processing timers that need to fire
+      timerInternals.advanceProcessingTime(Instant.now());
+
+      // Leave the output watermark undefined. Since there's no late data in batch mode
+      // there's really no need to track it as we do for streaming.
+    }
+
+    // Finish any pending windows by advancing the input watermark to infinity.
+    timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    // Finally, advance the processing time to infinity to fire any timers.
+    timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    runner.onTimers(timerInternals.getTimers(TimeDomain.EVENT_TIME));
+    runner.onTimers(timerInternals.getTimers(TimeDomain.PROCESSING_TIME));
+    runner.onTimers(timerInternals.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
+
+    runner.persist();
+  }
+
+  private OutputWindowedValue<KV<K, OutputT>> outputWindowedValue() {
+    return new OutputWindowedValue<KV<K, OutputT>>() {
+      @Override
+      public void outputWindowedValue(
+          KV<K, OutputT> output,
+          Instant timestamp,
+          Collection<? extends BoundedWindow> windows,
+          PaneInfo pane) {
+        outputManager.output(mainTag,
+            WindowedValue.of(output, timestamp, windows, pane));
+      }
+
+      @Override
+      public <AdditionalOutputT> void outputWindowedValue(
+          TupleTag<AdditionalOutputT> tag,
+          AdditionalOutputT output,
+          Instant timestamp,
+          Collection<? extends BoundedWindow> windows,
+          PaneInfo pane) {
+        outputManager.output(tag,
+            WindowedValue.of(output, timestamp, windows, pane));
+      }
+    };
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyOperation.java
new file mode 100644
index 0000000..b0be494
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyOperation.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * A GroupByKey placeholder {@link Operation} used during pipeline translation.
+ */
+public class GroupByKeyOperation<K, V> extends Operation<KV<K, V>> {
+
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  private final KvCoder<K, V> kvCoder;
+
+  public GroupByKeyOperation(WindowingStrategy<?, ?> windowingStrategy, KvCoder<K, V> kvCoder) {
+    super(1);
+    this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy");
+    this.kvCoder = checkNotNull(kvCoder, "kvCoder");
+  }
+
+  @Override
+  public void process(WindowedValue elem) {
+    throw new IllegalStateException(
+        String.format("%s should not in execution graph.", this.getClass().getSimpleName()));
+  }
+
+  public WindowingStrategy<?, ?> getWindowingStrategy() {
+    return windowingStrategy;
+  }
+
+  public KvCoder<K, V> getKvCoder() {
+    return kvCoder;
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java
new file mode 100644
index 0000000..4c627d7
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Translates a {@link GroupByKey} to {@link Operation Operations}.
+ */
+class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> {
+  @Override
+  public void translateNode(GroupByKey<K, V> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+
+    PCollection<?> inPCollection = (PCollection<?>) userGraphContext.getInput();
+    WindowingStrategy<?, ?> windowingStrategy = inPCollection.getWindowingStrategy();
+    Coder<?> inCoder = inPCollection.getCoder();
+
+    GroupByKeyOperation<K, V> groupByKeyOperation =
+        new GroupByKeyOperation<>(windowingStrategy, (KvCoder<K, V>) inCoder);
+    context.addInitStep(
+        Graphs.Step.of(userGraphContext.getStepName(), groupByKeyOperation),
+        userGraphContext.getInputTags(),
+        userGraphContext.getOutputTags());
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
new file mode 100644
index 0000000..e8e6eab
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.counters.Limits;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+/**
+ * Class that translates a {@link Graphs.FusedStep} to a MapReduce job.
+ */
+public class JobPrototype {
+
+  public static JobPrototype create(
+      int stageId, Graphs.FusedStep fusedStep, MapReducePipelineOptions options) {
+    return new JobPrototype(stageId, fusedStep, options);
+  }
+
+  private final int stageId;
+  private final Graphs.FusedStep fusedStep;
+  private final MapReducePipelineOptions options;
+  private final ConfigurationUtils configUtils;
+
+  private JobPrototype(int stageId, Graphs.FusedStep fusedStep, MapReducePipelineOptions options) {
+    this.stageId = stageId;
+    this.fusedStep = checkNotNull(fusedStep, "fusedStep");
+    this.options = checkNotNull(options, "options");
+    this.configUtils = new ConfigurationUtils(options);
+  }
+
+  public Job build(Class<?> jarClass, Configuration initConf) throws IOException {
+    Job job = new Job(initConf);
+    final Configuration conf = job.getConfiguration();
+    job.setJarByClass(jarClass);
+    conf.set(
+        "io.serializations",
+        "org.apache.hadoop.io.serializer.WritableSerialization,"
+            + "org.apache.hadoop.io.serializer.JavaSerialization");
+    conf.set("mapreduce.job.counters.group.name.max", "512");
+    Limits.init(conf);
+
+    conf.set(
+        FileOutputFormat.OUTDIR,
+        configUtils.getFileOutputDir(fusedStep.getStageId()));
+
+    // Set up BoundedSources in BeamInputFormat.
+    Graphs.Step startStep = Iterables.getOnlyElement(fusedStep.getStartSteps());
+    checkState(startStep.getOperation() instanceof PartitionOperation);
+    PartitionOperation partitionOperation = (PartitionOperation) startStep.getOperation();
+
+    ArrayList<ReadOperation.TaggedSource> taggedSources = new ArrayList<>();
+    taggedSources.addAll(FluentIterable.from(partitionOperation
+        .getReadOperations())
+        .transform(new Function<ReadOperation, ReadOperation.TaggedSource>() {
+          @Override
+          public ReadOperation.TaggedSource apply(ReadOperation operation) {
+            return operation.getTaggedSource(conf);
+          }})
+        .toList());
+    conf.set(
+        BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE,
+        Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
+            taggedSources)));
+    conf.set(
+        BeamInputFormat.BEAM_SERIALIZED_PIPELINE_OPTIONS,
+        Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
+            new SerializedPipelineOptions(options))));
+    job.setInputFormatClass(BeamInputFormat.class);
+
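+    // A GroupByKey is expanded to match the MapReduce shuffle: a Reify step and a
+    // ShuffleWriteOperation run on the map side, and a GroupAlsoByWindows step
+    // consumes the grouped values on the reduce side.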
+    if (fusedStep.containsGroupByKey()) {
+      Graphs.Step groupByKey = fusedStep.getGroupByKeyStep();
+      Graphs.Tag gbkOutTag = Iterables.getOnlyElement(fusedStep.getOutputTags(groupByKey));
+      GroupByKeyOperation operation = (GroupByKeyOperation) groupByKey.getOperation();
+      WindowingStrategy<?, ?> windowingStrategy = operation.getWindowingStrategy();
+      KvCoder<?, ?> kvCoder = operation.getKvCoder();
+
+      String reifyStepName = groupByKey.getFullName() + "-Reify";
+      Coder<?> reifyValueCoder = getReifyValueCoder(kvCoder.getValueCoder(), windowingStrategy);
+      Graphs.Tag reifyOutputTag = Graphs.Tag.of(
+          reifyStepName + ".out", new TupleTag<>(), reifyValueCoder, windowingStrategy);
+      Graphs.Step reifyStep = Graphs.Step.of(
+          reifyStepName,
+          new ReifyTimestampAndWindowsParDoOperation(
+              reifyStepName, options, operation.getWindowingStrategy(), reifyOutputTag));
+
+      Graphs.Step writeStep = Graphs.Step.of(
+          groupByKey.getFullName() + "-Write",
+          new ShuffleWriteOperation(kvCoder.getKeyCoder(), reifyValueCoder));
+
+      String gabwStepName = groupByKey.getFullName() + "-GroupAlsoByWindows";
+      Graphs.Step gabwStep = Graphs.Step.of(
+          gabwStepName,
+          new GroupAlsoByWindowsParDoOperation(
+              gabwStepName, options, windowingStrategy, kvCoder, gbkOutTag));
+
+      fusedStep.addStep(
+          reifyStep, fusedStep.getInputTags(groupByKey), ImmutableList.of(reifyOutputTag));
+      fusedStep.addStep(
+          writeStep, ImmutableList.of(reifyOutputTag), Collections.<Graphs.Tag>emptyList());
+      fusedStep.addStep(
+          gabwStep, Collections.<Graphs.Tag>emptyList(), ImmutableList.of(gbkOutTag));
+      fusedStep.removeStep(groupByKey);
+
+      // Set up the BeamReducer.
+      Graphs.Step reducerStartStep = gabwStep;
+      chainOperations(reducerStartStep, fusedStep, Sets.<Graphs.Step>newHashSet());
+      conf.set(
+          BeamReducer.BEAM_REDUCER_KV_CODER,
+          Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
+              KvCoder.of(kvCoder.getKeyCoder(), reifyValueCoder))));
+      conf.set(
+          BeamReducer.BEAM_PAR_DO_OPERATION_REDUCER,
+          Base64.encodeBase64String(
+              SerializableUtils.serializeToByteArray(reducerStartStep.getOperation())));
+      job.setReducerClass(BeamReducer.class);
+    }
+
+    // Set up DoFns in BeamMapper.
+    chainOperations(startStep, fusedStep, Sets.<Graphs.Step>newHashSet());
+
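+    // Map outputs cross the shuffle as encoded bytes: keys as BytesWritable so
+    // Hadoop can compare them, values as byte[] via JavaSerialization.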
+    job.setMapOutputKeyClass(BytesWritable.class);
+    job.setMapOutputValueClass(byte[].class);
+    conf.set(
+        BeamMapper.BEAM_PAR_DO_OPERATION_MAPPER,
+        Base64.encodeBase64String(
+            SerializableUtils.serializeToByteArray(startStep.getOperation())));
+    job.setMapperClass(BeamMapper.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    for (Graphs.Step step : fusedStep.getSteps()) {
+      if (step.getOperation() instanceof FileWriteOperation) {
+        FileWriteOperation writeOperation = (FileWriteOperation) step.getOperation();
+        MultipleOutputs.addNamedOutput(
+            job,
+            writeOperation.getFileName(),
+            SequenceFileOutputFormat.class,
+            NullWritable.class, BytesWritable.class);
+      }
+    }
+    return job;
+  }
+
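+  /**
+   * Recursively attaches each step's operation to the operations that consume its
+   * outputs, so elements flow directly between fused operations at execution time.
+   */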
+  private void chainOperations(
+      Graphs.Step current, Graphs.FusedStep fusedStep, Set<Graphs.Step> visited) {
+    Operation<?> operation = current.getOperation();
+    List<Graphs.Tag> outputTags = fusedStep.getOutputTags(current);
+    for (Graphs.Tag outTag : outputTags) {
+      for (Graphs.Step consumer : fusedStep.getConsumers(outTag)) {
+        operation.attachConsumer(outTag.getTupleTag(), consumer.getOperation());
+      }
+    }
+    visited.add(current);
+    for (Graphs.Tag outTag : outputTags) {
+      for (Graphs.Step consumer : fusedStep.getConsumers(outTag)) {
+        if (!visited.contains(consumer)) {
+          chainOperations(consumer, fusedStep, visited);
+        }
+      }
+    }
+  }
+
+  private Coder<Object> getReifyValueCoder(
+      Coder<?> valueCoder, WindowingStrategy<?, ?> windowingStrategy) {
+    // TODO: do we need the full coder to encode windows?
+    return (Coder) WindowedValue.getFullCoder(
+        valueCoder, windowingStrategy.getWindowFn().windowCoder());
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MapReduceMetricResults.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MapReduceMetricResults.java
new file mode 100644
index 0000000..1d1c9ff
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MapReduceMetricResults.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.metrics.MetricFiltering;
+import org.apache.beam.runners.core.construction.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Implementation of {@link MetricResults} for the MapReduce Runner.
+ */
+public class MapReduceMetricResults extends MetricResults {
+
+  private final List<Job> jobs;
+
+  public MapReduceMetricResults(List<Job> jobs) {
+    this.jobs = checkNotNull(jobs, "jobs");
+  }
+
+  @Override
+  public MetricQueryResults queryMetrics(MetricsFilter filter) {
+    List<MetricResult<Long>> counters = new ArrayList<>();
+    for (Job job : jobs) {
+      Iterable<CounterGroup> groups;
+      try {
+        groups = job.getCounters();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      for (CounterGroup group : groups) {
+        String groupName = group.getName();
+        for (Counter counter : group) {
+          MetricKey metricKey = MetricsReporter.toMetricKey(groupName, counter.getName());
+          if (!MetricFiltering.matches(filter, metricKey)) {
+            continue;
+          }
+          counters.add(
+              MapReduceMetricResult.create(
+                  metricKey.metricName(),
+                  metricKey.stepName(),
+                  counter.getValue()));
+        }
+      }
+    }
+    return MapReduceMetricQueryResults.create(counters);
+  }
+
+  @AutoValue
+  abstract static class MapReduceMetricQueryResults implements MetricQueryResults {
+
+    public abstract @Nullable Iterable<MetricResult<DistributionResult>> distributions();
+    public abstract @Nullable Iterable<MetricResult<GaugeResult>> gauges();
+
+    public static MetricQueryResults create(Iterable<MetricResult<Long>> counters) {
+      return new AutoValue_MapReduceMetricResults_MapReduceMetricQueryResults(
+          counters, null, null);
+    }
+  }
+
+  @AutoValue
+  abstract static class MapReduceMetricResult<T> implements MetricResult<T> {
+    // These must be declared in this order so that the generated AutoValue
+    // constructor is usable and consistent.
+    public abstract MetricName name();
+    public abstract String step();
+    public abstract @Nullable T committed();
+    public abstract T attempted();
+
+    public static <T> MetricResult<T> create(MetricName name, String step, T attempted) {
+      return new AutoValue_MapReduceMetricResults_MapReduceMetricResult<T>(
+          name, step, null, attempted);
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MetricsReporter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MetricsReporter.java
new file mode 100644
index 0000000..9fe139d
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MetricsReporter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.metrics.MetricKey;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Class that holds a {@link MetricsContainerStepMap} and reports metrics to the MapReduce framework.
+ */
+public class MetricsReporter {
+
+  private static final String METRIC_KEY_SEPARATOR = "__";
+  private static final String METRIC_PREFIX = "__metrics";
+
+  private final TaskAttemptContext context;
+  private final MetricsContainerStepMap metricsContainers;
+  private final Map<String, Long> reportedCounters = Maps.newHashMap();
+
+  MetricsReporter(TaskAttemptContext context) {
+    this.context = checkNotNull(context, "context");
+    this.metricsContainers = new MetricsContainerStepMap();
+  }
+
+  public MetricsContainer getMetricsContainer(String stepName) {
+    return metricsContainers.getContainer(stepName);
+  }
+
+  public void updateMetrics() {
+    MetricResults metricResults = asAttemptedOnlyMetricResults(metricsContainers);
+    MetricQueryResults metricQueryResults =
+        metricResults.queryMetrics(MetricsFilter.builder().build());
+    updateCounters(metricQueryResults.counters());
+  }
+
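+  /**
+   * Reports counter updates to the MapReduce framework. Hadoop counters are
+   * cumulative, so only the delta since the last reported value is added.
+   */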
+  private void updateCounters(Iterable<MetricResult<Long>> counters) {
+    for (MetricResult<Long> metricResult : counters) {
+      String reportedCounterKey = reportedCounterKey(metricResult);
+      Long updateValue = metricResult.attempted();
+      Long oldValue = reportedCounters.get(reportedCounterKey);
+
+      if (oldValue == null || oldValue < updateValue) {
+        Long incValue = (oldValue == null ? updateValue : updateValue - oldValue);
+        context.getCounter(groupName(metricResult), metricResult.name().name())
+            .increment(incValue);
+        reportedCounters.put(reportedCounterKey, updateValue);
+      }
+    }
+  }
+
+  private String groupName(MetricResult<?> metricResult) {
+    return METRIC_PREFIX
+        + METRIC_KEY_SEPARATOR + metricResult.step()
+        + METRIC_KEY_SEPARATOR + metricResult.name().namespace();
+  }
+
+  private String reportedCounterKey(MetricResult<?> metricResult) {
+    return metricResult.step()
+        + METRIC_KEY_SEPARATOR + metricResult.name().namespace()
+        + METRIC_KEY_SEPARATOR + metricResult.name().name();
+  }
+
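+  /**
+   * Recovers the {@link MetricKey} from the counter group and counter names
+   * produced by this reporter, by splitting on the key separator.
+   */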
+  public static MetricKey toMetricKey(String groupName, String counterName) {
+    String[] nameSplits = groupName.split(METRIC_KEY_SEPARATOR);
+    int length = nameSplits.length;
+    String stepName = length > 1 ? nameSplits[length - 2] : "";
+    String namespace = length > 0 ? nameSplits[length - 1] : "";
+    return MetricKey.create(stepName, MetricName.named(namespace, counterName));
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
new file mode 100644
index 0000000..8b730ff
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * {@link Operation} that executes a {@link DoFn}.
+ */
+public class NormalParDoOperation<InputT, OutputT> extends ParDoOperation<InputT, OutputT> {
+
+  private final DoFn<InputT, OutputT> doFn;
+
+  public NormalParDoOperation(
+      String stepName,
+      DoFn<InputT, OutputT> doFn,
+      PipelineOptions options,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      List<Graphs.Tag> sideInputTags,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    super(stepName, options, mainOutputTag, sideOutputTags, sideInputTags, windowingStrategy);
+    this.doFn = checkNotNull(doFn, "doFn");
+  }
+
+  @Override
+  DoFn<InputT, OutputT> getDoFn() {
+    return doFn;
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
new file mode 100644
index 0000000..a96806d
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Class that processes elements and forwards outputs to consumers.
+ */
+public abstract class Operation<T> implements Serializable {
+  private final OutputReceiver[] receivers;
+  private SerializableConfiguration conf;
+  private boolean started;
+  private boolean finished;
+
+  public Operation(int numOutputs) {
+    this.receivers = new OutputReceiver[numOutputs];
+    for (int i = 0; i < numOutputs; ++i) {
+      receivers[i] = new OutputReceiver();
+    }
+    this.started = false;
+    this.finished = false;
+  }
+
+  /**
+   * Starts this Operation's execution.
+   *
+   * <p>Called after all consuming (successor) operations have been started.
+   */
+  public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) {
+    if (started) {
+      return;
+    }
+    started = true;
+    conf = new SerializableConfiguration(taskContext.getConfiguration());
+    for (OutputReceiver receiver : receivers) {
+      if (receiver == null) {
+        continue;
+      }
+      for (Operation operation : receiver.getReceivingOperations()) {
+        operation.start(taskContext);
+      }
+    }
+  }
+
+  /**
+   * Processes the element.
+   */
+  public abstract void process(WindowedValue<T> elem) throws IOException, InterruptedException;
+
+  /**
+   * Finishes this Operation's execution.
+   *
+   * <p>Called after all producing (predecessor) operations have finished.
+   */
+  public void finish() {
+    if (finished) {
+      return;
+    }
+    finished = true;
+    for (OutputReceiver receiver : receivers) {
+      if (receiver == null) {
+        continue;
+      }
+      for (Operation operation : receiver.getReceivingOperations()) {
+        operation.finish();
+      }
+    }
+  }
+
+  public SerializableConfiguration getConf() {
+    return conf;
+  }
+
+  public List<OutputReceiver> getOutputReceivers() {
+    // TODO: avoid allocating objects for each output emit.
+    return ImmutableList.copyOf(receivers);
+  }
+
+  /**
+   * Attaches a consumer to the output identified by the given {@link TupleTag}.
+   */
+  public void attachConsumer(TupleTag<?> tupleTag, Operation consumer) {
+    int outputIndex = getOutputIndex(tupleTag);
+    OutputReceiver fanOut = receivers[outputIndex];
+    fanOut.addOutput(consumer);
+  }
+
+  protected int getOutputIndex(TupleTag<?> tupleTag) {
+    return 0;
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
new file mode 100644
index 0000000..b2f1b6d
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * OutputReceiver that forwards each input it receives to each of a list of downstream operations.
+ */
+public class OutputReceiver implements Serializable {
+  private final List<Operation> receivingOperations = new ArrayList<>();
+
+  /**
+   * Adds a new receiver that this OutputReceiver forwards to.
+   */
+  public void addOutput(Operation receiver) {
+    receivingOperations.add(receiver);
+  }
+
+  public List<Operation> getReceivingOperations() {
+    return ImmutableList.copyOf(receivingOperations);
+  }
+
+  /**
+   * Processes the element.
+   */
+  public void process(WindowedValue<?> elem) {
+    for (Operation out : receivingOperations) {
+      if (out != null) {
+        try {
+          out.process(elem);
+        } catch (IOException | InterruptedException e) {
+          Throwables.throwIfUnchecked(e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
new file mode 100644
index 0000000..ef83e72
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Maps;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.InMemoryStateInternals;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StepContext;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Base {@link Operation} for ParDo execution.
+ */
+public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> {
+  private final String stepName;
+  protected final SerializedPipelineOptions options;
+  protected final TupleTag<OutputT> mainOutputTag;
+  private final List<TupleTag<?>> sideOutputTags;
+  protected final WindowingStrategy<?, ?> windowingStrategy;
+  private final List<Graphs.Tag> sideInputTags;
+  private Map<TupleTag<?>, String> tupleTagToFilePath;
+
+  private MetricsReporter metricsReporter;
+  protected DoFnInvoker<InputT, OutputT> doFnInvoker;
+  private DoFnRunner<InputT, OutputT> fnRunner;
+
+  public ParDoOperation(
+      String stepName,
+      PipelineOptions options,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      List<Graphs.Tag> sideInputTags,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    super(1 + sideOutputTags.size());
+    this.stepName = checkNotNull(stepName, "stepName");
+    this.options = new SerializedPipelineOptions(checkNotNull(options, "options"));
+    this.mainOutputTag = checkNotNull(mainOutputTag, "mainOutputTag");
+    this.sideOutputTags = checkNotNull(sideOutputTags, "sideOutputTags");
+    this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy");
+    this.sideInputTags = checkNotNull(sideInputTags, "sideInputTags");
+  }
+
+  /**
+   * Returns a {@link DoFn} for processing inputs.
+   */
+  abstract DoFn<InputT, OutputT> getDoFn();
+
+  @Override
+  public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) {
+    super.start(taskContext);
+    this.metricsReporter = new MetricsReporter(taskContext);
+
+    DoFn<InputT, OutputT> doFn = getDoFn();
+    // Invoke the user's setup before processing any elements.
+    doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    doFnInvoker.invokeSetup();
+
+    Map<TupleTag<?>, Coder<?>> tupleTagToCoder = Maps.newHashMap();
+    for (Graphs.Tag tag : sideInputTags) {
+      tupleTagToCoder.put(tag.getTupleTag(), tag.getCoder());
+    }
+
+    final StateInternals stateInternals;
+    try {
+      stateInternals = InMemoryStateInternals.forKey(taskContext.getCurrentKey());
+    } catch (IOException | InterruptedException e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      throw new RuntimeException(e);
+    }
+    final TimerInternals timerInternals = new InMemoryTimerInternals();
+
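+    // Build the DoFnRunner: side inputs are read from materialized files when
+    // present, and state and timers are backed by in-memory implementations.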
+    fnRunner = DoFnRunners.simpleRunner(
+        options.getPipelineOptions(),
+        getDoFn(),
+        sideInputTags.isEmpty()
+            ? NullSideInputReader.empty()
+            : new FileSideInputReader(tupleTagToFilePath, tupleTagToCoder, getConf().getConf()),
+        createOutputManager(),
+        mainOutputTag,
+        sideOutputTags,
+        new StepContext() {
+          @Override
+          public StateInternals stateInternals() {
+            return stateInternals;
+          }
+
+          @Override
+          public TimerInternals timerInternals() {
+            return timerInternals;
+          }
+        },
+        windowingStrategy);
+
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
+      fnRunner.startBundle();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Processes the element.
+   */
+  @Override
+  public void process(WindowedValue<InputT> elem) {
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
+      fnRunner.processElement(elem);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void finish() {
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
+      fnRunner.finishBundle();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    metricsReporter.updateMetrics();
+    doFnInvoker.invokeTeardown();
+    super.finish();
+  }
+
+  public void setupSideInput(Map<TupleTag<?>, String> tupleTagToFilePath) {
+    this.tupleTagToFilePath = checkNotNull(tupleTagToFilePath, "tupleTagToFilePath");
+  }
+
+  public List<Graphs.Tag> getSideInputTags() {
+    return sideInputTags;
+  }
+
+  @Override
+  protected int getOutputIndex(TupleTag<?> tupleTag) {
+    if (tupleTag == mainOutputTag) {
+      return 0;
+    } else {
+      int sideIndex = sideOutputTags.indexOf(tupleTag);
+      checkState(
+          sideIndex >= 0,
+          String.format("Cannot find index for tuple tag: %s.", tupleTag));
+      return sideIndex + 1;
+    }
+  }
+
+  protected DoFnRunners.OutputManager createOutputManager() {
+    return new ParDoOutputManager();
+  }
+
+  private class ParDoOutputManager implements DoFnRunners.OutputManager {
+
+    @Nullable
+    private OutputReceiver getReceiverOrNull(TupleTag<?> tupleTag) {
+      List<OutputReceiver> receivers = getOutputReceivers();
+      int outputIndex = getOutputIndex(tupleTag);
+      return receivers.get(outputIndex);
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
+      OutputReceiver receiver = getReceiverOrNull(tupleTag);
+      if (receiver != null) {
+        receiver.process(windowedValue);
+      }
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
new file mode 100644
index 0000000..f8f1a02
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Translates a {@link ParDo} to an {@link Operation}.
+ */
+class ParDoTranslator<InputT, OutputT>
+    extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> {
+
+  @Override
+  public void translateNode(
+      ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    NormalParDoOperation operation = new NormalParDoOperation(
+        userGraphContext.getStepName(),
+        transform.getFn(),
+        userGraphContext.getOptions(),
+        transform.getMainOutputTag(),
+        transform.getAdditionalOutputTags().getAll(),
+        userGraphContext.getSideInputTags(),
+        ((PCollection) userGraphContext.getInput()).getWindowingStrategy());
+    context.addInitStep(
+        Graphs.Step.of(userGraphContext.getStepName(), operation),
+        userGraphContext.getInputTags(),
+        userGraphContext.getOutputTags());
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java
new file mode 100644
index 0000000..dc0f81a
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * {@link Operation} that routes input elements to outputs based on their {@link TupleTag} keys.
+ */
+public class PartitionOperation extends Operation<KV<TupleTag<?>, Object>> {
+
+  private final List<ReadOperation> readOperations;
+  private final List<TupleTag<?>> tupleTags;
+
+  public PartitionOperation(List<ReadOperation> readOperations, List<TupleTag<?>> tupleTags) {
+    super(readOperations.size());
+    this.readOperations = checkNotNull(readOperations, "readOperations");
+    this.tupleTags = checkNotNull(tupleTags, "tupleTags");
+  }
+
+  public List<ReadOperation> getReadOperations() {
+    return readOperations;
+  }
+
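+  /**
+   * Routes each element to the output receiver matching its {@link TupleTag},
+   * forwarding the unwrapped value.
+   */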
+  @Override
+  public void process(WindowedValue<KV<TupleTag<?>, Object>> elem) throws IOException,
+      InterruptedException {
+    TupleTag<?> tupleTag = elem.getValue().getKey();
+    int outputIndex = getOutputIndex(tupleTag);
+    OutputReceiver receiver = getOutputReceivers().get(outputIndex);
+    receiver.process((WindowedValue<?>) elem.getValue().getValue());
+  }
+
+  @Override
+  protected int getOutputIndex(TupleTag<?> tupleTag) {
+    int index = tupleTags.indexOf(tupleTag);
+    checkState(
+        index >= 0,
+        String.format("Cannot find index for tuple tag: %s.", tupleTag));
+    return index;
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
new file mode 100644
index 0000000..5e5c99b
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.io.Read;
+
+/**
+ * Translates a {@link Read.Bounded} to a {@link ReadOperation}.
+ */
+class ReadBoundedTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> {
+  @Override
+  public void translateNode(Read.Bounded transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+
+    String stepName = userGraphContext.getStepName();
+    ReadOperation operation = new SourceReadOperation(
+        stepName, transform.getSource(), userGraphContext.getOnlyOutputTag());
+    context.addInitStep(
+        Graphs.Step.of(stepName, operation),
+        userGraphContext.getInputTags(),
+        userGraphContext.getOutputTags());
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
new file mode 100644
index 0000000..a3e1d77
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A Read.Bounded placeholder {@link Operation} used during pipeline translation.
+ */
+abstract class ReadOperation<T> extends Operation<T> {
+
+  public ReadOperation() {
+    super(1);
+  }
+
+  @Override
+  public void process(WindowedValue elem) {
+    throw new IllegalStateException(
+        String.format("%s should not in execution graph.", this.getClass().getSimpleName()));
+  }
+
+  /**
+   * Returns a {@link TaggedSource} during pipeline construction.
+   */
+  abstract TaggedSource getTaggedSource(Configuration conf);
+
+  @AutoValue
+  abstract static class TaggedSource implements Serializable {
+    abstract String getStepName();
+    abstract BoundedSource<?> getSource();
+    abstract TupleTag<?> getTag();
+
+    static TaggedSource of(String stepName, BoundedSource<?> boundedSource, TupleTag<?> tupleTag) {
+      return new org.apache.beam.runners.mapreduce.translation
+          .AutoValue_ReadOperation_TaggedSource(stepName, boundedSource, tupleTag);
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
new file mode 100644
index 0000000..0e02bbb
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * {@link Operation} that executes {@link ReifyTimestampAndWindowsDoFn}.
+ */
+public class ReifyTimestampAndWindowsParDoOperation extends ParDoOperation {
+
+  public ReifyTimestampAndWindowsParDoOperation(
+      String stepName,
+      PipelineOptions options,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Graphs.Tag outTag) {
+    super(stepName, options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(),
+        ImmutableList.<Graphs.Tag>of(), windowingStrategy);
+  }
+
+  @Override
+  DoFn<Object, Object> getDoFn() {
+    return (DoFn) new ReifyTimestampAndWindowsDoFn<>();
+  }
+
+  private static class ReifyTimestampAndWindowsDoFn<K, V>
+      extends DoFn<KV<K, V>, KV<K, WindowedValue<V>>> {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+      KV<K, V> kv = c.element();
+      K key = kv.getKey();
+      V value = kv.getValue();
+      c.output(KV.of(
+          key,
+          WindowedValue.of(
+              value,
+              c.timestamp(),
+              window,
+              c.pane())));
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializableConfiguration.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializableConfiguration.java
new file mode 100644
index 0000000..7af595c
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializableConfiguration.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A {@link Serializable} {@link Configuration}.
+ */
+class SerializableConfiguration implements Serializable {
+
+  private transient Configuration conf;
+
+  SerializableConfiguration(Configuration conf) {
+    this.conf = checkNotNull(conf, "conf");
+  }
+
+  Configuration getConf() {
+    return conf;
+  }
+
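+  // Configuration is not Serializable; delegate to its Writable-style
+  // write/readFields methods for Java serialization.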
+  private void writeObject(ObjectOutputStream out) throws IOException {
+    out.defaultWriteObject();
+    conf.write(out);
+  }
+
+  private void readObject(ObjectInputStream in) throws IOException {
+    conf = new Configuration();
+    conf.readFields(in);
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java
new file mode 100644
index 0000000..5c37b7c
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+
+/**
+ * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
+ */
+public class SerializedPipelineOptions implements Serializable {
+
+  private final byte[] serializedOptions;
+
+  /** Lazily initialized copy of deserialized options. */
+  private transient PipelineOptions pipelineOptions;
+
+  public SerializedPipelineOptions(PipelineOptions options) {
+    checkNotNull(options, "PipelineOptions must not be null.");
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      createMapper().writeValue(baos, options);
+      this.serializedOptions = baos.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+    }
+
+  }
+
+  public PipelineOptions getPipelineOptions() {
+    if (pipelineOptions == null) {
+      try {
+        pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class);
+
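+        // Register file systems on the worker with the deserialized options.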
+        FileSystems.setDefaultPipelineOptions(pipelineOptions);
+      } catch (IOException e) {
+        throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+      }
+    }
+
+    return pipelineOptions;
+  }
+
+  /**
+   * Uses an {@link ObjectMapper} configured with any {@link Module}s on the classpath, allowing
+   * user-specified configuration to be injected into the ObjectMapper. This supports custom user
+   * types on {@link PipelineOptions}.
+   */
+  private static ObjectMapper createMapper() {
+    return new ObjectMapper().registerModules(
+        ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java
new file mode 100644
index 0000000..a8fae1b
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * {@link Operation} that materializes its input for a group-by-key by writing encoded
+ * key/value pairs to the MapReduce shuffle.
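+ *
+ * <p>Conceptually, each element is handled as in this sketch ({@code encode} is an assumed
+ * helper that runs a {@link Coder} over a value):
+ *
+ * <pre>{@code
+ * KV<?, ?> kv = (KV<?, ?>) elem.getValue();
+ * taskContext.write(
+ *     new BytesWritable(encode(keyCoder, kv.getKey())),
+ *     new BytesWritable(encode(valueCoder, kv.getValue())));
+ * }</pre>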
+ */
+public class ShuffleWriteOperation<T> extends Operation<T> {
+
+  private final Coder<Object> keyCoder;
+  private final Coder<Object> valueCoder;
+
+  private transient TaskInputOutputContext<Object, Object, Object, Object> taskContext;
+
+  public ShuffleWriteOperation(Coder<Object> keyCoder, Coder<Object> valueCoder) {
+    super(0);
+    this.keyCoder = checkNotNull(keyCoder, "keyCoder");
+    this.valueCoder = checkNotNull(valueCoder, "valueCoder");
+  }
+
+  @Override
+  public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) {
+    this.taskContext = checkNotNull(taskContext, "taskContext");
+  }
+
+  @Override
+  public void process(WindowedValue<T> elem) throws IOException, InterruptedException {
+    KV<?, ?> kv = (KV<?, ?>) elem.getValue();
+    ByteArrayOutputStream keyStream = new ByteArrayOutputStream();
+    keyCoder.encode(kv.getKey(), keyStream);
+
+    ByteArrayOutputStream valueStream = new ByteArrayOutputStream();
+    valueCoder.encode(kv.getValue(), valueStream);
+    // Wrap both key and value as Writables; a raw byte[] is not a valid Hadoop output value.
+    taskContext.write(
+        new BytesWritable(keyStream.toByteArray()),
+        new BytesWritable(valueStream.toByteArray()));
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java
new file mode 100644
index 0000000..55b46a4
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * {@link ReadOperation} that reads from a {@link BoundedSource}.
+ */
+public class SourceReadOperation extends ReadOperation {
+  private final String stepName;
+  private final TaggedSource source;
+
+  SourceReadOperation(String stepName, BoundedSource<?> boundedSource, TupleTag<?> tupleTag) {
+    this.stepName = checkNotNull(stepName, "stepName");
+    checkNotNull(boundedSource, "boundedSource");
+    checkNotNull(tupleTag, "tupleTag");
+    this.source = TaggedSource.of(stepName, boundedSource, tupleTag);
+  }
+
+  @Override
+  TaggedSource getTaggedSource(Configuration conf) {
+    return source;
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TransformTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TransformTranslator.java
new file mode 100644
index 0000000..f495372
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TransformTranslator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Interface for classes capable of translating Beam PTransforms into MapReduce primitives.
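+ *
+ * <p>An implementation sketch ({@code MyTransform} is a hypothetical transform type):
+ *
+ * <pre>{@code
+ * class MyTranslator implements TransformTranslator<MyTransform> {
+ *   public void translateNode(MyTransform transform, TranslationContext context) {
+ *     // Record the translated Operation via context.addInitStep(...).
+ *   }
+ *
+ *   public boolean canTranslate(MyTransform transform, TranslationContext context) {
+ *     return true;
+ *   }
+ * }
+ * }</pre>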
+ */
+interface TransformTranslator<T extends PTransform<?, ?>> {
+
+  void translateNode(T transform, TranslationContext context);
+
+  /**
+   * Returns true if this translator can translate the given transform.
+   */
+  boolean canTranslate(T transform, TranslationContext context);
+
+  /**
+   * Default no-op translator that claims to translate every transform.
+   */
+  class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
+    @Override
+    public void translateNode(T1 transform, TranslationContext context) {
+      // No-op by default; subclasses override this to emit translation steps.
+    }
+
+    @Override
+    public boolean canTranslate(T1 transform, TranslationContext context) {
+      return true;
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
new file mode 100644
index 0000000..e908e93
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Maintains state during translation: the user-graph context and the {@link Graph} of
+ * translated steps.
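+ *
+ * <p>Translators record their translated steps through this context, e.g.:
+ *
+ * <pre>{@code
+ * TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+ * context.addInitStep(
+ *     Graphs.Step.of(userGraphContext.getStepName(), operation),
+ *     userGraphContext.getInputTags(),
+ *     userGraphContext.getOutputTags());
+ * }</pre>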
+ */
+public class TranslationContext {
+
+  private final UserGraphContext userGraphContext;
+  private final Graph<Graphs.Step, Graphs.Tag> initGraph;
+
+  public TranslationContext(MapReducePipelineOptions options) {
+    this.userGraphContext = new UserGraphContext(options);
+    this.initGraph = new Graph<>();
+  }
+
+  public UserGraphContext getUserGraphContext() {
+    return userGraphContext;
+  }
+
+  public void addInitStep(Graphs.Step step, List<Graphs.Tag> inTags, List<Graphs.Tag> outTags) {
+    initGraph.addStep(step, inTags, outTags);
+  }
+
+  /**
+   * Returns the {@link Graph} of translated {@link Graphs.Step steps}.
+   */
+  public Graph<Graphs.Step, Graphs.Tag> getInitGraph() {
+    return initGraph;
+  }
+
+  /**
+   * Context of user graph.
+   */
+  public static class UserGraphContext {
+    private final MapReducePipelineOptions options;
+    private final Map<PValue, TupleTag<?>> pValueToTupleTag;
+    private TransformHierarchy.Node currentNode;
+
+    public UserGraphContext(MapReducePipelineOptions options) {
+      this.options = checkNotNull(options, "options");
+      this.pValueToTupleTag = Maps.newHashMap();
+      this.currentNode = null;
+    }
+
+    public MapReducePipelineOptions getOptions() {
+      return options;
+    }
+
+    public void setCurrentNode(TransformHierarchy.Node node) {
+      this.currentNode = node;
+      for (Map.Entry<TupleTag<?>, PValue> entry : currentNode.getOutputs().entrySet()) {
+        pValueToTupleTag.put(entry.getValue(), entry.getKey());
+        // TODO: this is a hack to work around ViewAsXYZ.expand() returning the wrong
+        // output PValue.
+        if (node.getTransform() instanceof View.CreatePCollectionView) {
+          View.CreatePCollectionView view = (View.CreatePCollectionView) node.getTransform();
+          pValueToTupleTag.put(view.getView(), view.getView().getTagInternal());
+        }
+      }
+    }
+
+    public String getStepName() {
+      return currentNode.getFullName();
+    }
+
+    public PValue getInput() {
+      return Iterables.get(currentNode.getInputs().values(), 0);
+    }
+
+    public PValue getOutput() {
+      return Iterables.get(currentNode.getOutputs().values(), 0);
+    }
+
+    public List<Graphs.Tag> getInputTags() {
+      Iterable<PValue> inputs;
+      if (currentNode.getTransform() instanceof ParDo.MultiOutput) {
+        ParDo.MultiOutput parDo = (ParDo.MultiOutput) currentNode.getTransform();
+        inputs = ImmutableList.<PValue>builder()
+            .add(getInput()).addAll(parDo.getSideInputs()).build();
+      } else {
+        inputs = currentNode.getInputs().values();
+      }
+      return FluentIterable.from(inputs)
+          .transform(new Function<PValue, Graphs.Tag>() {
+            @Override
+            public Graphs.Tag apply(PValue pValue) {
+              checkState(
+                  pValueToTupleTag.containsKey(pValue),
+                  String.format("Failed to find TupleTag for pValue: %s.", pValue));
+              if (pValue instanceof PCollection) {
+                PCollection<?> pc = (PCollection<?>) pValue;
+                return Graphs.Tag.of(
+                    pc.getName(),
+                    pValueToTupleTag.get(pValue),
+                    pc.getCoder(),
+                    pc.getWindowingStrategy());
+              } else if (pValue instanceof PCollectionView) {
+                PCollectionView pView = (PCollectionView) pValue;
+                return Graphs.Tag.of(
+                    pValue.getName(),
+                    pValueToTupleTag.get(pValue),
+                    pView.getCoderInternal(),
+                    pView.getWindowingStrategyInternal());
+              } else {
+                throw new RuntimeException("Unexpected PValue: " + pValue.getClass());
+              }
+            }})
+          .toList();
+    }
+
+    public List<Graphs.Tag> getSideInputTags() {
+      if (!(currentNode.getTransform() instanceof ParDo.MultiOutput)) {
+        return ImmutableList.of();
+      }
+      return FluentIterable.from(((ParDo.MultiOutput) currentNode.getTransform()).getSideInputs())
+          .transform(new Function<PValue, Graphs.Tag>() {
+            @Override
+            public Graphs.Tag apply(PValue pValue) {
+              checkState(
+                  pValueToTupleTag.containsKey(pValue),
+                  String.format("Failed to find TupleTag for pValue: %s.", pValue));
+              if (pValue instanceof PCollection) {
+                PCollection<?> pc = (PCollection<?>) pValue;
+                return Graphs.Tag.of(
+                    pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder(),
+                    pc.getWindowingStrategy());
+              } else if (pValue instanceof PCollectionView) {
+                PCollectionView pView = (PCollectionView) pValue;
+                return Graphs.Tag.of(
+                    pValue.getName(),
+                    pValueToTupleTag.get(pValue),
+                    pView.getCoderInternal(),
+                    pView.getWindowingStrategyInternal());
+              } else {
+                throw new RuntimeException("Unexpected PValue: " + pValue.getClass());
+              }
+            }})
+          .toList();
+    }
+
+    public List<Graphs.Tag> getOutputTags() {
+      if (currentNode.getTransform() instanceof View.CreatePCollectionView) {
+        PCollectionView view = ((View.CreatePCollectionView) currentNode.getTransform()).getView();
+        return ImmutableList.of(
+            Graphs.Tag.of(view.getName(), view.getTagInternal(), view.getCoderInternal(),
+                view.getWindowingStrategyInternal()));
+      } else {
+        return FluentIterable.from(currentNode.getOutputs().entrySet())
+            .transform(new Function<Map.Entry<TupleTag<?>, PValue>, Graphs.Tag>() {
+              @Override
+              public Graphs.Tag apply(Map.Entry<TupleTag<?>, PValue> entry) {
+                PCollection<?> pc = (PCollection<?>) entry.getValue();
+                return Graphs.Tag.of(
+                    pc.getName(), entry.getKey(), pc.getCoder(), pc.getWindowingStrategy());
+              }})
+            .toList();
+      }
+    }
+
+    public TupleTag<?> getOnlyOutputTag() {
+      return Iterables.getOnlyElement(currentNode.getOutputs().keySet());
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java
new file mode 100644
index 0000000..e51d392
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.Window;
+
+/**
+ * Lookup table mapping PTransform types to associated TransformTranslator implementations.
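+ *
+ * <p>Lookup sketch, as used during pipeline traversal:
+ *
+ * <pre>{@code
+ * TransformTranslator<?> translator = TranslatorRegistry.getTranslator(transform);
+ * if (translator == null) {
+ *   throw new UnsupportedOperationException("Unsupported transform: " + transform);
+ * }
+ * }</pre>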
+ */
+public class TranslatorRegistry {
+  private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS =
+      new HashMap<>();
+
+  static {
+    TRANSLATORS.put(Read.Bounded.class, new ReadBoundedTranslator());
+    TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoTranslator());
+    TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>());
+    TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator());
+    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
+    TRANSLATORS.put(View.CreatePCollectionView.class, new ViewTranslator());
+  }
+
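+  /**
+   * Returns the registered {@link TransformTranslator} for the given transform, or
+   * {@code null} if the transform has no registered translator.
+   */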
+  public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
+    return TRANSLATORS.get(transform.getClass());
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
new file mode 100644
index 0000000..d018345
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Translates a {@link View.CreatePCollectionView} to a {@link FileWriteOperation}.
+ */
+public class ViewTranslator extends TransformTranslator.Default<View.CreatePCollectionView<?, ?>> {
+
+  @Override
+  public void translateNode(
+      View.CreatePCollectionView<?, ?> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+
+    PCollection<?> inPCollection = transform.getView().getPCollection();
+    WindowingStrategy<?, ?> windowingStrategy = inPCollection.getWindowingStrategy();
+
+    Graphs.Tag outTag = Iterables.getOnlyElement(userGraphContext.getOutputTags());
+    String fileName = ConfigurationUtils.toFileName(outTag.getName());
+
+    FileWriteOperation<?> operation = new FileWriteOperation<>(
+        fileName,
+        WindowedValue.getFullCoder(
+            inPCollection.getCoder(), windowingStrategy.getWindowFn().windowCoder()));
+    context.addInitStep(
+        Graphs.Step.of(userGraphContext.getStepName(), operation),
+        userGraphContext.getInputTags(),
+        userGraphContext.getOutputTags());
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java
new file mode 100644
index 0000000..3279e11
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+
+/**
+ * {@link Operation} that assigns windows to elements using a {@link WindowFn}.
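+ *
+ * <p>A construction sketch with a fixed-windows {@link WindowFn} (imports assumed):
+ *
+ * <pre>{@code
+ * WindowAssignOperation<Object, IntervalWindow> operation =
+ *     new WindowAssignOperation<>(FixedWindows.of(Duration.standardMinutes(1)));
+ * }</pre>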
+ */
+public class WindowAssignOperation<T, W extends BoundedWindow> extends Operation<T> {
+  private final WindowFn<T, W> windowFn;
+
+  public WindowAssignOperation(WindowFn<T, W> windowFn) {
+    super(1);
+    this.windowFn = checkNotNull(windowFn, "windowFn");
+  }
+
+  @Override
+  public void process(WindowedValue<T> elem) {
+    try {
+      Collection<W> windows = windowFn.assignWindows(new AssignContextInternal<>(windowFn, elem));
+      // Fetch the single output receiver once, then emit the element into each window.
+      OutputReceiver receiver = Iterables.getOnlyElement(getOutputReceivers());
+      for (W window : windows) {
+        receiver.process(WindowedValue.of(
+            elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
+      }
+    } catch (Exception e) {
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private class AssignContextInternal<InputT, W extends BoundedWindow>
+      extends WindowFn<InputT, W>.AssignContext {
+    private final WindowedValue<InputT> value;
+
+    AssignContextInternal(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
+      fn.super();
+      checkArgument(
+          Iterables.size(value.getWindows()) == 1,
+          String.format(
+              "%s passed to window assignment must be in a single window, but it was in %s: %s",
+              WindowedValue.class.getSimpleName(),
+              Iterables.size(value.getWindows()),
+              value.getWindows()));
+      this.value = value;
+    }
+
+    @Override
+    public InputT element() {
+      return value.getValue();
+    }
+
+    @Override
+    public Instant timestamp() {
+      return value.getTimestamp();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return Iterables.getOnlyElement(value.getWindows());
+    }
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java
new file mode 100644
index 0000000..3908870
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.transforms.windowing.Window;
+
+/**
+ * Translates a {@link Window.Assign} to a {@link WindowAssignOperation}.
+ */
+public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
+
+  @Override
+  public void translateNode(Window.Assign<T> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+
+    WindowAssignOperation<T, ?> operation = new WindowAssignOperation<>(transform.getWindowFn());
+    context.addInitStep(
+        Graphs.Step.of(userGraphContext.getStepName(), operation),
+        userGraphContext.getInputTags(),
+        userGraphContext.getOutputTags());
+  }
+}
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/package-info.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/package-info.java
new file mode 100644
index 0000000..c9360ac
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation for translating Beam pipelines to MapReduce jobs.
+ */
+package org.apache.beam.runners.mapreduce.translation;
diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java
new file mode 100644
index 0000000..76c8311
--- /dev/null
+++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link GraphConverter}.
+ */
+@RunWith(JUnit4.class)
+public class GraphConverterTest {
+
+  @Test
+  public void testCombine() throws Exception {
+    MapReducePipelineOptions options = PipelineOptionsFactory.as(MapReducePipelineOptions.class);
+    options.setRunner(CrashingRunner.class);
+    Pipeline p = Pipeline.create(options);
+    PCollection<KV<String, Integer>> input = p
+        .apply(Create.empty(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
+        .apply(Sum.<String>integersPerKey());
+    TranslationContext context = new TranslationContext(options);
+    GraphConverter graphConverter = new GraphConverter(context);
+    p.traverseTopologically(graphConverter);
+
+    Graph<Graphs.Step, Graphs.Tag> initGraph = context.getInitGraph();
+
+    assertEquals(3, Iterables.size(initGraph.getSteps()));
+  }
+}
diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
new file mode 100644
index 0000000..fca6131
--- /dev/null
+++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link GraphPlanner}.
+ */
+@RunWith(JUnit4.class)
+public class GraphPlannerTest {
+
+  @Test
+  public void testCombine() throws Exception {
+    MapReducePipelineOptions options = PipelineOptionsFactory.as(MapReducePipelineOptions.class);
+    options.setRunner(CrashingRunner.class);
+    Pipeline p = Pipeline.create(options);
+    PCollection<KV<String, Integer>> input = p
+        .apply(Create.empty(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
+        .apply(Sum.<String>integersPerKey());
+
+    TranslationContext context = new TranslationContext(options);
+    GraphConverter graphConverter = new GraphConverter(context);
+    p.traverseTopologically(graphConverter);
+
+    GraphPlanner planner = new GraphPlanner(options);
+    Graphs.FusedGraph fusedGraph = new Graphs.FusedGraph(context.getInitGraph());
+    fusedGraph = planner.plan(fusedGraph);
+
+    assertEquals(1, Iterables.size(fusedGraph.getFusedSteps()));
+    assertEquals(3, Iterables.getOnlyElement(fusedGraph.getFusedSteps()).getSteps().size());
+  }
+}
diff --git a/runners/pom.xml b/runners/pom.xml
index 4412ed6..39a9811 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -35,6 +35,7 @@
   <modules>
     <module>core-construction-java</module>
     <module>core-java</module>
+    <module>map-reduce</module>
     <module>direct-java</module>
     <module>flink</module>
     <module>google-cloud-dataflow-java</module>
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 3144193..75b2043 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -37,7 +37,8 @@
          and other project configuration to be used in all modules.
     <module>build-tools</module> -->
     <module>core</module>
-    <module>io</module>
+    <!--module>io</module-->
+    <module>io/hadoop-file-system</module>
     <module>maven-archetypes</module>
     <module>extensions</module>
     <!-- javadoc runs directly from the root parent as the last module
diff --git a/sdks/pom.xml b/sdks/pom.xml
index aec8762..c06f764 100644
--- a/sdks/pom.xml
+++ b/sdks/pom.xml
@@ -35,7 +35,7 @@
   <modules>
     <module>common</module>
     <module>java</module>
-    <module>python</module>
+    <!--<module>python</module>-->
   </modules>
 
   <profiles>