Merge master
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java
index b415cf8..23e8625 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java
@@ -27,6 +27,9 @@
  */
 @InterfaceStability.Unstable
 public interface StreamGraphBuilder {
+  /** Config key ({@code job.graph.builder.class}) for the graph builder class. */
+  String BUILDER_CLASS_CONFIG = "job.graph.builder.class";
+
   /**
    * Users are required to implement this abstract method to initialize the processing logic of the application, in terms
    * of a DAG of {@link org.apache.samza.operators.MessageStream}s and operators
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index 32ab47a..f81266b 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -27,12 +27,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.JavaConversions;
-import scala.runtime.AbstractFunction0;
 import scala.runtime.AbstractFunction1;
 
 import java.util.concurrent.ExecutorService;
 
 import static org.apache.samza.util.Util.asScalaClock;
+import static org.apache.samza.util.ScalaToJavaUtils.defaultValue;
 
 /**
  * Factory class to create runloop for a Samza task, based on the type
@@ -109,18 +109,4 @@
     }
   }
 
-  /**
-   * Returns a default value object for scala option.getOrDefault() to use
-   * @param value default value
-   * @param <T> value type
-   * @return object containing default value
-   */
-  public static <T> AbstractFunction0<T> defaultValue(final T value) {
-    return new AbstractFunction0<T>() {
-      @Override
-      public T apply() {
-        return value;
-      }
-    };
-  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 830e4a5..77cb3fc 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -163,10 +163,12 @@
 
   @Override
   public <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor) {
-    MessageStreamImpl<M> intStream = this.graph.createIntStream(parKeyExtractor);
+    int opId = graph.getNextOpId(); // reused as the intermediate stream id suffix and the PARTITION_BY operator id
+    String streamId = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name(), opId);
+    MessageStreamImpl<M> intStream = this.graph.createIntStream(streamId, parKeyExtractor);
     OutputStream<M> outputStream = this.graph.getOutputStream(intStream);
     this.registeredOperatorSpecs.add(OperatorSpecs.createPartitionOperatorSpec(outputStream.getSinkFunction(),
-        this.graph, outputStream));
+        this.graph, outputStream, opId));
     return intStream;
   }
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 8ca8157..f3fd176 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -18,20 +18,20 @@
  */
 package org.apache.samza.operators;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.function.Function;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.ExecutionEnvironment;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * The implementation of {@link StreamGraph} interface. This class provides implementation of methods to allow users to
  * create system input/output/intermediate streams.
@@ -129,9 +129,14 @@
    */
   private final Map<String, MessageStream> inStreams = new HashMap<>();
   private final Map<String, OutputStream> outStreams = new HashMap<>();
+  private final ExecutionEnvironment executionEnvironment;
 
   private ContextManager contextManager = new ContextManager() { };
 
+  public StreamGraphImpl(ExecutionEnvironment executionEnvironment) {
+    this.executionEnvironment = executionEnvironment; // resolves intermediate StreamSpecs in createIntStream
+  }
+
   @Override
   public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
     if (!this.inStreams.containsKey(streamSpec.getId())) {
@@ -182,7 +187,12 @@
 
   @Override public Map<StreamSpec, OutputStream> getOutStreams() {
     Map<StreamSpec, OutputStream> outStreamMap = new HashMap<>();
-    this.outStreams.forEach((ss, entry) -> outStreamMap.put(((OutputStreamImpl) entry).getSpec(), entry));
+    this.outStreams.values().forEach(outStream -> {
+        StreamSpec spec = outStream instanceof IntermediateStreamImpl
+            ? ((IntermediateStreamImpl) outStream).getSpec()
+            : ((OutputStreamImpl) outStream).getSpec();
+        outStreamMap.put(spec, outStream);
+      });
     return Collections.unmodifiableMap(outStreamMap);
   }
 
@@ -208,8 +218,8 @@
    */
   public MessageStreamImpl getInputStream(SystemStream sstream) {
     for (MessageStream entry: this.inStreams.values()) {
-      if (((InputStreamImpl) entry).getSpec().getSystemName() == sstream.getSystem() &&
-          ((InputStreamImpl) entry).getSpec().getPhysicalName() == sstream.getStream()) {
+      if (((InputStreamImpl) entry).getSpec().getSystemName().equals(sstream.getSystem()) &&
+          ((InputStreamImpl) entry).getSpec().getPhysicalName().equals(sstream.getStream())) {
         return (MessageStreamImpl) entry;
       }
     }
@@ -231,9 +241,9 @@
    * @param <M>  the type of input message
    * @return  the {@link OutputStream} object for the re-partitioned stream
    */
-  <PK, M> MessageStreamImpl<M> createIntStream(Function<M, PK> parKeyFn) {
+  <PK, M> MessageStreamImpl<M> createIntStream(String streamId, Function<M, PK> parKeyFn) {
     // TODO: placeholder to auto-generate intermediate streams via {@link StreamSpec}
-    StreamSpec streamSpec = this.createIntStreamSpec();
+    StreamSpec streamSpec = executionEnvironment.streamFromConfig(streamId);
 
     if (!this.inStreams.containsKey(streamSpec.getId())) {
       this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
@@ -244,10 +254,4 @@
     }
     return intStream;
   }
-
-  private StreamSpec createIntStreamSpec() {
-    // TODO: placeholder to generate the intermediate stream's {@link StreamSpec} automatically
-    return null;
-  }
-
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index d626852..7b14b9c 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -152,8 +152,9 @@
+   * @param opId  the id to assign to the partition operator spec
    * @param <M>  type of input message
    * @return  the {@link SinkOperatorSpec}
    */
-  public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
-    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, graph.getNextOpId(), stream);
+  public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream, int opId) {
+    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, opId, stream);
   }
 
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
new file mode 100644
index 0000000..757034e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The ExecutionPlanner creates the physical execution graph for the StreamGraph, and
+ * the intermediate topics needed for the execution.
+ */
+public class ExecutionPlanner {
+  private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);
+
+  private final Config config;
+
+  public ExecutionPlanner(Config config) {
+    this.config = config;
+  }
+
+  /**
+   * Builds the {@link ProcessorGraph} for the given {@link StreamGraph}. When the graph has
+   * intermediate streams, their partition counts are calculated and the streams are created
+   * through the {@link SystemAdmin} of their system.
+   *
+   * @param streamGraph the logical graph of streams and operators to plan
+   * @return the physical processor graph
+   */
+  public ProcessorGraph plan(StreamGraph streamGraph) throws Exception {
+    Map<String, SystemAdmin> sysAdmins = getSystemAdmins(config);
+
+    // create physical processors based on stream graph
+    ProcessorGraph processorGraph = createProcessorGraph(streamGraph);
+
+    if (!processorGraph.getInternalStreams().isEmpty()) {
+      // figure out the partitions for internal streams
+      calculatePartitions(streamGraph, processorGraph, sysAdmins);
+
+      // create the streams
+      createStreams(processorGraph, sysAdmins);
+    }
+
+    return processorGraph;
+  }
+
+  /**
+   * Create the physical graph from StreamGraph.
+   * For this phase the whole DAG is assigned to a single processor whose id is the job name;
+   * streams that the graph both consumes and produces become intermediate edges of that processor.
+   * Package private for testing
+   */
+  ProcessorGraph createProcessorGraph(StreamGraph streamGraph) {
+    // For this phase, we are going to create a processor for the whole dag
+    String processorId = config.get(JobConfig.JOB_NAME()); // only one processor, use the job name
+
+    ProcessorGraph processorGraph = new ProcessorGraph(config);
+    Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInStreams().keySet());
+    Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutStreams().keySet());
+    Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
+    intStreams.retainAll(sinkStreams);
+    sourceStreams.removeAll(intStreams);
+    sinkStreams.removeAll(intStreams);
+
+    // add sources
+    sourceStreams.forEach(spec -> processorGraph.addSource(spec, processorId));
+
+    // add sinks
+    sinkStreams.forEach(spec -> processorGraph.addSink(spec, processorId));
+
+    // add intermediate streams
+    intStreams.forEach(spec -> processorGraph.addEdge(spec, processorId, processorId));
+
+    processorGraph.validate();
+
+    return processorGraph;
+  }
+
+  /**
+   * Figure out the number of partitions of intermediate streams
+   * Package private for testing
+   */
+  void calculatePartitions(StreamGraph streamGraph, ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
+    // fetch the external streams partition info
+    fetchExistingStreamPartitions(processorGraph, sysAdmins);
+
+    // calculate the partitions for the input streams of join operators
+    calculateJoinInputPartitions(streamGraph, processorGraph);
+
+    // calculate the partitions for the rest of intermediate streams
+    calculateIntStreamPartitions(processorGraph, config);
+  }
+
+  /**
+   * Fetches the partition counts of the source and sink streams from their systems and records
+   * them on the corresponding {@link StreamEdge}s. Package private for testing.
+   *
+   * @throws SamzaException if a stream's system has no entry in {@code sysAdmins}
+   */
+  static void fetchExistingStreamPartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
+    Set<StreamEdge> existingStreams = new HashSet<>();
+    existingStreams.addAll(processorGraph.getSources());
+    existingStreams.addAll(processorGraph.getSinks());
+
+    // group the edges by system so that each SystemAdmin is queried once
+    Multimap<String, StreamEdge> existingStreamsMap = HashMultimap.create();
+    existingStreams.forEach(streamEdge -> {
+        SystemStream systemStream = streamEdge.getSystemStream();
+        existingStreamsMap.put(systemStream.getSystem(), streamEdge);
+      });
+    for (Map.Entry<String, Collection<StreamEdge>> entry : existingStreamsMap.asMap().entrySet()) {
+      String systemName = entry.getKey();
+      Collection<StreamEdge> streamEdges = entry.getValue();
+      Map<String, StreamEdge> streamToEdge = new HashMap<>();
+      streamEdges.forEach(streamEdge -> streamToEdge.put(streamEdge.getSystemStream().getStream(), streamEdge));
+      SystemAdmin systemAdmin = getSystemAdmin(sysAdmins, systemName);
+      Map<String, SystemStreamMetadata> metadata = systemAdmin.getSystemStreamMetadata(streamToEdge.keySet());
+      metadata.forEach((stream, data) -> {
+          int partitions = data.getSystemStreamPartitionMetadata().size();
+          streamToEdge.get(stream).setPartitions(partitions);
+          log.debug("Partition count is {} for stream {}", partitions, stream);
+        });
+    }
+  }
+
+  /**
+   * Calculate the partitions for the input streams of join operators.
+   * All inputs of a join must share one partition count: joins with an input of known count are
+   * processed from a queue, their unknown inputs receive that count, and the other joins reading
+   * those inputs are then enqueued in turn.
+   * Package private for testing
+   *
+   * @throws SamzaException if two inputs of the same join already have different partition counts
+   */
+  static void calculateJoinInputPartitions(StreamGraph streamGraph, ProcessorGraph processorGraph) {
+    // get join operators with input streams
+    Multimap<OperatorSpec, StreamEdge> joinToStreamMap = HashMultimap.create();
+    Multimap<StreamEdge, OperatorSpec> streamToJoinMap = HashMultimap.create();
+    Map<MessageStream, OperatorSpec> outputToJoinMap = new HashMap<>();
+    Queue<OperatorSpec> joinQ = new LinkedList<>(); // a queue of joins with known input partitions
+    Set<OperatorSpec> visited = new HashSet<>();
+    streamGraph.getInStreams().entrySet().forEach(entry -> {
+        StreamEdge streamEdge = processorGraph.getEdge(entry.getKey());
+        getJoins(entry.getValue(), streamEdge, joinToStreamMap, streamToJoinMap, outputToJoinMap, joinQ, visited);
+      });
+    // calculate join input partition count
+    while (!joinQ.isEmpty()) {
+      OperatorSpec join = joinQ.poll();
+      int partitions = -1;
+      // loop through the input streams to the join and find the partition count;
+      // only positive counts are treated as known, consistent with the checks below and in getJoins
+      for (StreamEdge edge : joinToStreamMap.get(join)) {
+        int edgePartitions = edge.getPartitions();
+        if (edgePartitions > 0) {
+          if (partitions == -1) {
+            //if the partition is not assigned
+            partitions = edgePartitions;
+          } else if (partitions != edgePartitions) {
+            throw  new SamzaException(String.format("Unable to resolve input partitions of stream %s for join",
+                edge.getSystemStream().toString()));
+          }
+        }
+      }
+      // assign the partition count
+      for (StreamEdge edge : joinToStreamMap.get(join)) {
+        if (edge.getPartitions() <= 0) {
+          edge.setPartitions(partitions);
+
+          // find other joins can be inferred by setting this edge
+          for (OperatorSpec op : streamToJoinMap.get(edge)) {
+            if (!visited.contains(op)) {
+              joinQ.add(op);
+              visited.add(op);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Recursively walks the operators registered on {@code messageStream} and on its downstream streams,
+   * recording {@code streamEdge} as an input of every join that is reached.
+   * <p>
+   * Each join is represented by two {@link PartialJoinOperatorSpec}s that share an output stream;
+   * the first one reached becomes the representative key for that join.
+   *
+   * @param messageStream the stream whose registered operators are traversed
+   * @param streamEdge the input edge that {@code messageStream} descends from
+   * @param joinToStreamMap populated with join to input edges
+   * @param streamToJoinMap populated with input edge to joins
+   * @param outputToJoinMap populated with join output stream to its representative join spec
+   * @param joinQ receives the joins that have an input with a known (positive) partition count
+   * @param visited joins that have already been enqueued
+   */
+  static void getJoins(MessageStream messageStream,
+      StreamEdge streamEdge,
+      Multimap<OperatorSpec, StreamEdge> joinToStreamMap,
+      Multimap<StreamEdge, OperatorSpec> streamToJoinMap,
+      Map<MessageStream, OperatorSpec> outputToJoinMap,
+      Queue<OperatorSpec> joinQ,
+      Set<OperatorSpec> visited) {
+    Collection<OperatorSpec> specs = ((MessageStreamImpl) messageStream).getRegisteredOperatorSpecs();
+    for (OperatorSpec spec : specs) {
+      if (spec instanceof PartialJoinOperatorSpec) {
+        // every join will have two partial join operators
+        // we will choose one of them in order to consolidate the inputs
+        // the first one who registered with the outputToJoinMap will win
+        MessageStream output = spec.getNextStream();
+        OperatorSpec joinSpec = outputToJoinMap.get(output);
+        if (joinSpec == null) {
+          joinSpec = spec;
+          outputToJoinMap.put(output, joinSpec);
+        }
+
+        joinToStreamMap.put(joinSpec, streamEdge);
+        streamToJoinMap.put(streamEdge, joinSpec);
+
+        if (!visited.contains(joinSpec) && streamEdge.getPartitions() > 0) {
+          // put the joins with known input partitions into the queue
+          joinQ.add(joinSpec);
+          visited.add(joinSpec);
+        }
+      }
+
+      if (spec.getNextStream() != null) {
+        getJoins(spec.getNextStream(), streamEdge, joinToStreamMap, streamToJoinMap, outputToJoinMap, joinQ, visited);
+      }
+    }
+  }
+
+  /**
+   * Assigns a partition count to every intermediate stream that does not have one yet.
+   * The count comes from {@code JobConfig.JOB_DEFAULT_PARTITIONS()} when it is configured; otherwise
+   * it is the maximum partition count across the source and sink streams.
+   * Package private for testing
+   * @throws SamzaException if an intermediate stream needs a count but no positive count can be determined
+   */
+  static void calculateIntStreamPartitions(ProcessorGraph processorGraph, Config config) {
+    int partitions = config.getInt(JobConfig.JOB_DEFAULT_PARTITIONS(), -1);
+    if (partitions < 0) {
+      // use the following simple algo to figure out the partitions
+      // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions))
+      int maxInPartitions = maxPartition(processorGraph.getSources());
+      int maxOutPartitions = maxPartition(processorGraph.getSinks());
+      partitions = Math.max(maxInPartitions, maxOutPartitions);
+    }
+    for (StreamEdge edge : processorGraph.getInternalStreams()) {
+      if (edge.getPartitions() <= 0) {
+        if (partitions <= 0) {
+          throw new SamzaException(String.format("Unable to determine partition count for intermediate stream %s.",
+              edge.getFormattedSystemStream()));
+        }
+        edge.setPartitions(partitions);
+      }
+    }
+  }
+
+  /**
+   * Creates every intermediate stream of {@code graph} with the partition count recorded on its edge,
+   * using the {@link SystemAdmin} of the stream's system.
+   */
+  private static void createStreams(ProcessorGraph graph, Map<String, SystemAdmin> sysAdmins) {
+    Multimap<String, StreamSpec> streamsToCreate = HashMultimap.create();
+    graph.getInternalStreams().forEach(edge -> {
+        StreamSpec streamSpec = createStreamSpec(edge);
+        streamsToCreate.put(edge.getSystemStream().getSystem(), streamSpec);
+      });
+
+    for (Map.Entry<String, Collection<StreamSpec>> entry : streamsToCreate.asMap().entrySet()) {
+      String systemName = entry.getKey();
+      SystemAdmin systemAdmin = getSystemAdmin(sysAdmins, systemName);
+
+      for (StreamSpec stream : entry.getValue()) {
+        log.info("Creating stream {} with partitions {} on system {}",
+            new Object[]{stream.getPhysicalName(), stream.getPartitionCount(), systemName});
+        systemAdmin.createStream(stream);
+      }
+    }
+  }
+
+  /**
+   * Returns the largest partition count among {@code edges}, or -1 if {@code edges} is empty.
+   */
+  private static int maxPartition(Collection<StreamEdge> edges) {
+    return edges.stream().mapToInt(StreamEdge::getPartitions).max().orElse(-1);
+  }
+
+  /**
+   * Copies the edge's {@link StreamSpec} with the partition count assigned to the edge.
+   */
+  private static StreamSpec createStreamSpec(StreamEdge edge) {
+    StreamSpec orgSpec = edge.getStreamSpec();
+    return orgSpec.copyWithPartitionCount(edge.getPartitions());
+  }
+
+  /**
+   * Looks up the {@link SystemAdmin} for {@code systemName}, failing with a descriptive
+   * {@link SamzaException} rather than a NullPointerException when the system has no admin.
+   */
+  private static SystemAdmin getSystemAdmin(Map<String, SystemAdmin> sysAdmins, String systemName) {
+    SystemAdmin systemAdmin = sysAdmins.get(systemName);
+    if (systemAdmin == null) {
+      throw new SamzaException(String.format("No SystemAdmin found for system %s.", systemName));
+    }
+    return systemAdmin;
+  }
+
+  /**
+   * Creates a {@link SystemAdmin} for every system configured in {@code config}, keyed by system name.
+   */
+  private static Map<String, SystemAdmin> getSystemAdmins(Config config) {
+    return getSystemFactories(config).entrySet()
+        .stream()
+        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().getAdmin(entry.getKey(), config)));
+  }
+
+  /**
+   * Instantiates the {@link SystemFactory} configured for every system in {@code config}.
+   *
+   * @throws SamzaException if a system has no factory class configured
+   */
+  private static Map<String, SystemFactory> getSystemFactories(Config config) {
+    Map<String, SystemFactory> systemFactories =
+        getSystemNames(config).stream().collect(Collectors.toMap(systemName -> systemName, systemName -> {
+            String systemFactoryClassName = new JavaSystemConfig(config).getSystemFactory(systemName);
+            if (systemFactoryClassName == null) {
+              throw new SamzaException(
+                  String.format("A stream uses system %s, which is missing from the configuration.", systemName));
+            }
+            return Util.getObj(systemFactoryClassName);
+          }));
+
+    return systemFactories;
+  }
+
+  private static Collection<String> getSystemNames(Config config) {
+    return new JavaSystemConfig(config).getSystemNames();
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java
new file mode 100644
index 0000000..5ee4d29
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.StreamSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The ProcessorGraph is the physical execution graph for a multi-stage Samza application.
+ * It contains the topology of execution processors connected by source/sink/intermediate streams.
+ * High level APIs are transformed into ProcessorGraph for planning, validation and execution.
+ */
+public class ProcessorGraph {
+  private static final Logger log = LoggerFactory.getLogger(ProcessorGraph.class);
+
+  private final Map<String, ProcessorNode> nodes = new HashMap<>();
+  private final Map<String, StreamEdge> edges = new HashMap<>();
+  private final Set<StreamEdge> sources = new HashSet<>();
+  private final Set<StreamEdge> sinks = new HashSet<>();
+  private final Set<StreamEdge> internalStreams = new HashSet<>();
+  private final Config config;
+
+  ProcessorGraph(Config config) {
+    this.config = config;
+  }
+
+  void addSource(StreamSpec input, String targetProcessorId) {
+    ProcessorNode node = getNode(targetProcessorId);
+    StreamEdge edge = getEdge(input);
+    edge.addTargetNode(node);
+    node.addInEdge(edge);
+    sources.add(edge);
+  }
+
+  void addSink(StreamSpec output, String sourceProcessorId) {
+    ProcessorNode node = getNode(sourceProcessorId);
+    StreamEdge edge = getEdge(output);
+    edge.addSourceNode(node);
+    node.addOutEdge(edge);
+    sinks.add(edge);
+  }
+
+  void addEdge(StreamSpec streamSpec, String sourceProcessorId, String targetProcessorId) {
+    ProcessorNode sourceNode = getNode(sourceProcessorId);
+    ProcessorNode targetNode = getNode(targetProcessorId);
+    StreamEdge edge = getEdge(streamSpec);
+    edge.addSourceNode(sourceNode);
+    edge.addTargetNode(targetNode);
+    sourceNode.addOutEdge(edge);
+    targetNode.addInEdge(edge);
+    internalStreams.add(edge);
+  }
+
+  ProcessorNode getNode(String processorId) {
+    ProcessorNode node = nodes.get(processorId);
+    if (node == null) {
+      node = new ProcessorNode(processorId, config);
+      nodes.put(processorId, node);
+    }
+    return node;
+  }
+
+  StreamEdge getEdge(StreamSpec streamSpec) {
+    String streamId = streamSpec.getId();
+    StreamEdge edge = edges.get(streamId);
+    if (edge == null) {
+      edge = new StreamEdge(streamSpec, config);
+      edges.put(streamId, edge);
+    }
+    return edge;
+  }
+
+  /**
+   * Returns the processors in topological order, i.e. the order in which they are to be executed.
+   * @return unmodifiable list of {@link ProcessorNode}s
+   */
+  public List<ProcessorNode> getProcessors() {
+    List<ProcessorNode> sortedNodes = topologicalSort();
+    return Collections.unmodifiableList(sortedNodes);
+  }
+
+  public Set<StreamEdge> getSources() {
+    return Collections.unmodifiableSet(sources);
+  }
+
+  public Set<StreamEdge> getSinks() {
+    return Collections.unmodifiableSet(sinks);
+  }
+
+  public Set<StreamEdge> getInternalStreams() {
+    return Collections.unmodifiableSet(internalStreams);
+  }
+
+
+  /**
+   * Validates the graph, throwing IllegalArgumentException if any check fails.
+   */
+  public void validate() {
+    validateSources();
+    validateSinks();
+    validateInternalStreams();
+    validateReachability();
+  }
+
+  /**
+   * Validates that every source stream has no producers (in-degree 0) and at least one consumer.
+   */
+  private void validateSources() {
+    sources.forEach(edge -> {
+        if (!edge.getSourceNodes().isEmpty()) {
+          throw new IllegalArgumentException(
+              String.format("Source stream %s should not have producers.", edge.getFormattedSystemStream()));
+        }
+        if (edge.getTargetNodes().isEmpty()) {
+          throw new IllegalArgumentException(
+              String.format("Source stream %s should have consumers.", edge.getFormattedSystemStream()));
+        }
+      });
+  }
+
+  /**
+   * Validates that every sink stream has no consumers (out-degree 0) and at least one producer.
+   */
+  private void validateSinks() {
+    sinks.forEach(edge -> {
+        if (!edge.getTargetNodes().isEmpty()) {
+          throw new IllegalArgumentException(
+              String.format("Sink stream %s should not have consumers", edge.getFormattedSystemStream()));
+        }
+        if (edge.getSourceNodes().isEmpty()) {
+          throw new IllegalArgumentException(
+              String.format("Sink stream %s should have producers", edge.getFormattedSystemStream()));
+        }
+      });
+  }
+
+  /**
+   * Validates that every internal stream has both producers and consumers.
+   */
+  private void validateInternalStreams() {
+    Set<StreamEdge> internalEdges = new HashSet<>(edges.values());
+    internalEdges.removeAll(sources);
+    internalEdges.removeAll(sinks);
+
+    internalEdges.forEach(edge -> {
+        if (edge.getSourceNodes().isEmpty() || edge.getTargetNodes().isEmpty()) {
+          throw new IllegalArgumentException(
+              String.format("Internal stream %s should have both producers and consumers", edge.getFormattedSystemStream()));
+        }
+      });
+  }
+
+  /**
+   * Validates that every processor is reachable from the source streams.
+   */
+  private void validateReachability() {
+    // validate all nodes are reachable from the sources
+    final Set<ProcessorNode> reachable = findReachable();
+    if (reachable.size() != nodes.size()) {
+      Set<ProcessorNode> unreachable = new HashSet<>(nodes.values());
+      unreachable.removeAll(reachable);
+      throw new IllegalArgumentException(String.format("Processors %s cannot be reached from Sources.",
+          String.join(", ", unreachable.stream().map(ProcessorNode::getId).collect(Collectors.toList()))));
+    }
+  }
+
+  /**
+   * Find the reachable set of nodes using BFS.
+   * Package private for test.
+   * @return reachable set of nodes
+   */
+  Set<ProcessorNode> findReachable() {
+    Queue<ProcessorNode> queue = new ArrayDeque<>();
+    Set<ProcessorNode> visited = new HashSet<>();
+
+    sources.forEach(source -> {
+        List<ProcessorNode> next = source.getTargetNodes();
+        queue.addAll(next);
+        visited.addAll(next);
+      });
+
+    while (!queue.isEmpty()) {
+      ProcessorNode node = queue.poll();
+      node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(target -> {
+          if (!visited.contains(target)) {
+            visited.add(target);
+            queue.offer(target);
+          }
+        });
+    }
+
+    return visited;
+  }
+
+  /**
+   * A variation of Kahn's algorithm for topological sorting.
+   * Cycles are broken by restarting from a reachable processor with the fewest remaining input edges.
+   * Each processor appears exactly once in the result.
+   * Package private for test.
+   * @return topologically sorted ProcessorNode(s)
+   */
+  List<ProcessorNode> topologicalSort() {
+    Collection<ProcessorNode> pnodes = nodes.values();
+    Queue<ProcessorNode> q = new ArrayDeque<>();
+    Map<String, Long> indegree = new HashMap<>();
+    Set<ProcessorNode> visited = new HashSet<>();
+    pnodes.forEach(node -> {
+        String nid = node.getId();
+        //only count the degrees of intermediate streams since sources have degree 0
+        long degree = node.getInEdges().stream().filter(e -> !sources.contains(e)).count();
+        indegree.put(nid, degree);
+
+        if (degree == 0L) {
+          // start from the nodes that only consume from sources
+          q.add(node);
+          visited.add(node);
+        }
+      });
+
+    List<ProcessorNode> sortedNodes = new ArrayList<>();
+    Set<ProcessorNode> reachable = new HashSet<>();
+    while (sortedNodes.size() < pnodes.size()) {
+      while (!q.isEmpty()) {
+        ProcessorNode node = q.poll();
+        sortedNodes.add(node);
+        node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> {
+            String nid = n.getId();
+            Long degree = indegree.get(nid) - 1;
+            indegree.put(nid, degree);
+            if (degree == 0L && !visited.contains(n)) {
+              q.add(n);
+              visited.add(n);
+            }
+            reachable.add(n);
+          });
+      }
+
+      if (sortedNodes.size() < pnodes.size()) {
+        // The remaining nodes have cycles
+        // use the following approach to break the cycles
+        // start from the nodes that are reachable from previous traverse
+        reachable.removeAll(sortedNodes);
+        if (!reachable.isEmpty()) {
+          //find out the nodes with minimal input edge
+          long min = Long.MAX_VALUE;
+          ProcessorNode minNode = null;
+          for (ProcessorNode node : reachable) {
+            Long degree = indegree.get(node.getId());
+            if (degree < min) {
+              min = degree;
+              minNode = node;
+            }
+          }
+          // start from the node with minimal input edge again
+          // mark it visited so it is not enqueued (and sorted) a second time once its in-degree reaches 0
+          q.add(minNode);
+          visited.add(minNode);
+        } else {
+          // all the remaining nodes should be reachable from sources
+          // start from sources again to find the next node that hasn't been visited
+          ProcessorNode nextNode = sources.stream().flatMap(source -> source.getTargetNodes().stream())
+              .filter(node -> !visited.contains(node))
+              .findAny()
+              .orElseThrow(() -> new IllegalStateException(
+                  "Unable to sort processors: the remaining ones cannot be reached from sources."));
+          q.add(nextNode);
+          visited.add(nextNode);
+        }
+      }
+    }
+
+    return sortedNodes;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java
new file mode 100644
index 0000000..08d94b4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.util.ConfigInheritence;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A ProcessorNode is a physical unit of execution in the processor graph.
+ *
+ * <p>The {@code RemoteExecutionEnvironment} submits every node as its own job. A node records the
+ * {@link StreamEdge}s it consumes from and produces to, and derives its job config from the full
+ * application config in {@link #generateConfig()}.
+ */
+public class ProcessorNode {
+  private static final Logger log = LoggerFactory.getLogger(ProcessorNode.class);
+
+  /** Format string for this processor's config scope: {@code "processors.<id>."}. */
+  private static final String CONFIG_PROCESSOR_PREFIX = "processors.%s.";
+
+  private final String id;
+  private final Config config;
+  private final List<StreamEdge> inEdges = new ArrayList<>();
+  private final List<StreamEdge> outEdges = new ArrayList<>();
+
+  /**
+   * @param id name of this processor; also used as its job name
+   * @param config the full application config
+   */
+  ProcessorNode(String id, Config config) {
+    this.id = id;
+    this.config = config;
+  }
+
+  /** @return the identifier of this processor, which is also its job name */
+  public String getId() {
+    return id;
+  }
+
+  /** Registers a stream this processor consumes from. */
+  void addInEdge(StreamEdge in) {
+    inEdges.add(in);
+  }
+
+  /** Registers a stream this processor produces to. */
+  void addOutEdge(StreamEdge out) {
+    outEdges.add(out);
+  }
+
+  /** @return the live, mutable list of input edges */
+  List<StreamEdge> getInEdges() {
+    return inEdges;
+  }
+
+  /** @return the live, mutable list of output edges */
+  List<StreamEdge> getOutEdges() {
+    return outEdges;
+  }
+
+  /**
+   * Builds the job config for this processor.
+   *
+   * <p>The generated entries (job name and comma-separated input streams) are layered over the full
+   * application config. Entries scoped under {@code processors.<id>.} take precedence over both, and empty
+   * values never override earlier ones. The merged config is then run through any configured config
+   * rewriters.
+   *
+   * @return the config to submit for this processor
+   */
+  public Config generateConfig() {
+    List<String> inputs = inEdges.stream()
+        .map(StreamEdge::getFormattedSystemStream)
+        .collect(Collectors.toList());
+
+    Map<String, String> generated = new HashMap<>();
+    generated.put(JobConfig.JOB_NAME(), id);
+    generated.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
+    log.info("Processor {} has generated configs {}", id, generated);
+
+    // TODO: Disallow user specifying processor inputs/outputs. This info comes strictly from the pipeline.
+    String scopePrefix = String.format(CONFIG_PROCESSOR_PREFIX, id);
+    Config scoped = ConfigInheritence.extractScopedConfig(config, new MapConfig(generated), scopePrefix);
+    return Util.rewriteConfig(scoped);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java
new file mode 100644
index 0000000..664a458
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.Util;
+
+
+/**
+ * A stream connecting the {@link ProcessorNode}s that produce to it with the
+ * {@link ProcessorNode}s that consume from it.
+ *
+ * <p>A source edge (an input of the whole graph) has no source nodes. A sink edge (an output of the
+ * whole graph) has no target nodes.
+ */
+public class StreamEdge {
+  private final StreamSpec streamSpec;
+  // NOTE(review): stored but not read anywhere in this class.
+  private final Config config;
+  private final List<ProcessorNode> sourceNodes = new ArrayList<>();
+  private final List<ProcessorNode> targetNodes = new ArrayList<>();
+
+  // Starts as the formatted system stream name; may be replaced via setName().
+  private String name;
+  // Starts at -1 (presumably "not yet determined") until setPartitions() is called.
+  private int partitions = -1;
+
+  /**
+   * @param streamSpec the stream this edge represents
+   * @param config the full application config
+   */
+  StreamEdge(StreamSpec streamSpec, Config config) {
+    this.streamSpec = streamSpec;
+    this.config = config;
+    this.name = getFormattedSystemStream();
+  }
+
+  /** Records a processor that writes to this stream. */
+  void addSourceNode(ProcessorNode sourceNode) {
+    sourceNodes.add(sourceNode);
+  }
+
+  /** Records a processor that reads from this stream. */
+  void addTargetNode(ProcessorNode targetNode) {
+    targetNodes.add(targetNode);
+  }
+
+  StreamSpec getStreamSpec() {
+    return streamSpec;
+  }
+
+  /** @return a new {@link SystemStream} made from the spec's system name and physical name */
+  SystemStream getSystemStream() {
+    return new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
+  }
+
+  /** @return the system stream rendered by {@code Util.getNameFromSystemStream} */
+  String getFormattedSystemStream() {
+    SystemStream systemStream = getSystemStream();
+    return Util.getNameFromSystemStream(systemStream);
+  }
+
+  List<ProcessorNode> getSourceNodes() {
+    return sourceNodes;
+  }
+
+  List<ProcessorNode> getTargetNodes() {
+    return targetNodes;
+  }
+
+  int getPartitions() {
+    return partitions;
+  }
+
+  void setPartitions(int partitions) {
+    this.partitions = partitions;
+  }
+
+  String getName() {
+    return name;
+  }
+
+  void setName(String name) {
+    this.name = name;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
index e592e66..3288c5c 100644
--- a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
+++ b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
@@ -18,13 +18,24 @@
  */
 package org.apache.samza.system;
 
+import org.apache.samza.SamzaException;
+import org.apache.samza.job.JobRunner;
+import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.StreamGraphBuilder;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.processorgraph.ExecutionPlanner;
+import org.apache.samza.processorgraph.ProcessorGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment
  */
 public class RemoteExecutionEnvironment extends AbstractExecutionEnvironment {
+  
+  private static final Logger log = LoggerFactory.getLogger(RemoteExecutionEnvironment.class);
 
   public RemoteExecutionEnvironment(Config config) {
     super(config);
@@ -33,9 +44,35 @@
+  /**
+   * Builds the stream graph for {@code app}, turns it into a {@link ProcessorGraph} with the
+   * {@link ExecutionPlanner}, and submits one job per processor through {@link JobRunner}.
+   *
+   * @param app the application that populates the stream graph
+   * @param config the full application config
+   * @throws SamzaException wrapping any exception raised while building, planning or submitting
+   */
   @Override public void run(StreamGraphBuilder app, Config config) {
     // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph}
     // TODO: actually instantiate the tasks and run the job, i.e.
-    // 1. create all input/output/intermediate topics
-    // 2. create the single job configuration
-    // 3. execute JobRunner to submit the single job for the whole graph
-  }
+    try {
+      // 1. build the logical stream graph from the application
+      StreamGraph streamGraph = new StreamGraphImpl(this);
+      app.init(streamGraph, config);
 
+      // 2. create the physical execution plan
+      ExecutionPlanner planner = new ExecutionPlanner(config);
+      ProcessorGraph processorGraph = planner.plan(streamGraph);
+
+      // 3. submit one job per processor, each with its own generated config
+      processorGraph.getProcessors().forEach(processor -> {
+          Config processorConfig = processor.generateConfig();
+          String processorId = processor.getId();
+          // Log the per-processor config that is actually submitted, not the application-wide one.
+          log.info("Starting processor {} with config {}", processorId, processorConfig);
+
+          JobRunner runner = new JobRunner(processorConfig);
+          runner.run(true);
+        });
+    } catch (Exception e) {
+      throw new SamzaException("fail to run graph", e);
+    }
+  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
index 71d60ef..b88e356 100644
--- a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
+++ b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
@@ -36,7 +36,7 @@
 
   // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}
   StreamGraph createGraph(StreamGraphBuilder app, Config config) {
-    StreamGraphImpl graph = new StreamGraphImpl();
+    StreamGraphImpl graph = new StreamGraphImpl(this); // the graph is constructed with a reference to this execution environment
     app.init(graph, config);
     return graph;
   }
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index b007e3c..6032f4d 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -25,6 +25,7 @@
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.data.InputMessageEnvelope;
 import org.apache.samza.operators.impl.OperatorGraph;
+import org.apache.samza.system.ExecutionEnvironment;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 
@@ -77,8 +78,11 @@
 
   @Override
   public final void init(Config config, TaskContext context) throws Exception {
+    // for now, we need to create the execution env again
+    // in the future if we decide to serialize the dag, this can be clean up
+    ExecutionEnvironment executionEnvironment = ExecutionEnvironment.fromConfig(config);
     // create the MessageStreamsImpl object and initialize app-specific logic DAG within the task
-    StreamGraphImpl streams = new StreamGraphImpl();
+    StreamGraphImpl streams = new StreamGraphImpl(executionEnvironment);
     this.graphBuilder.init(streams, config);
     // get the context manager of the {@link StreamGraph} and initialize the task-specific context
     this.contextManager = streams.getContextManager();
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskFactories.java b/samza-core/src/main/java/org/apache/samza/task/TaskFactories.java
new file mode 100644
index 0000000..0bfa8c4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactories.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.TaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.util.ScalaToJavaUtils.defaultValue;
+
+/**
+ * Creates the task factory for a job from its config.
+ *
+ * <p>If {@link StreamGraphBuilder#BUILDER_CLASS_CONFIG} names a class implementing {@link StreamGraphBuilder},
+ * every task is a {@link StreamOperatorTask} wrapping a new instance of that builder. Otherwise the class returned
+ * by {@code TaskConfig.getTaskClass()} is instantiated reflectively for every task.
+ */
+public class TaskFactories {
+  private static final Logger log = LoggerFactory.getLogger(TaskFactories.class);
+
+  /**
+   * Builds the task factory described by {@code config}.
+   *
+   * @param config the job config
+   * @return an {@link AsyncStreamTaskFactory} if the task class implements {@link AsyncStreamTask},
+   *         otherwise a {@link StreamTaskFactory}
+   * @throws ConfigException if neither a graph builder nor a task class is configured
+   * @throws ClassNotFoundException if the task class cannot be loaded
+   */
+  public static Object fromTaskClassConfig(Config config) throws ClassNotFoundException {
+    // A valid StreamGraphBuilder in the config takes precedence over the task class.
+    final boolean useStreamOperatorTask = isStreamOperatorTask(config);
+    final String taskClassName;
+    if (useStreamOperatorTask) {
+      taskClassName = StreamOperatorTask.class.getName();
+    } else {
+      taskClassName = new TaskConfig(config).getTaskClass().getOrElse(defaultValue(null));
+    }
+
+    if (taskClassName == null) {
+      throw new ConfigException("There is no task class defined in the configuration. Failed to create a valid TaskFactory");
+    }
+    log.info("Got task class name: {}", taskClassName);
+
+    // Load the class once; the factories below only instantiate it.
+    final Class<?> taskClass = Class.forName(taskClassName);
+
+    if (AsyncStreamTask.class.isAssignableFrom(taskClass)) {
+      return new AsyncStreamTaskFactory() {
+        @Override
+        public AsyncStreamTask createInstance() {
+          try {
+            return (AsyncStreamTask) taskClass.newInstance();
+          } catch (Exception e) {
+            // Passing the exception as the trailing argument makes SLF4J log its stack trace.
+            log.error("Error loading AsyncStreamTask class: {}", taskClassName, e);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+    }
+
+    return new StreamTaskFactory() {
+      @Override
+      public StreamTask createInstance() {
+        try {
+          // Branch on the precomputed flag instead of comparing class-name Strings with ==.
+          return useStreamOperatorTask ? createStreamOperatorTask(config) : (StreamTask) taskClass.newInstance();
+        } catch (Exception e) {
+          log.error("Error loading StreamTask class: {}", taskClassName, e);
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  /**
+   * Instantiates the configured {@link StreamGraphBuilder} and wraps it in a new {@link StreamOperatorTask}.
+   */
+  private static StreamTask createStreamOperatorTask(Config config) throws Exception {
+    String builderClassName = config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG);
+    StreamGraphBuilder graphBuilder = (StreamGraphBuilder) Class.forName(builderClassName).newInstance();
+    return new StreamOperatorTask(graphBuilder);
+  }
+
+  /**
+   * Returns whether {@link StreamGraphBuilder#BUILDER_CLASS_CONFIG} is set to a loadable class implementing
+   * {@link StreamGraphBuilder}. Exceptions raised while checking are logged and treated as {@code false}.
+   */
+  private static boolean isStreamOperatorTask(Config config) {
+    String builderClassName = null;
+    try {
+      builderClassName = config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG);
+      // isEmpty() compares contents; != "" would only compare references.
+      return builderClassName != null && !builderClassName.isEmpty()
+          && StreamGraphBuilder.class.isAssignableFrom(Class.forName(builderClassName));
+    } catch (Exception e) {
+      log.error("Failed to validate StreamGraphBuilder class from the config. {}={}",
+          StreamGraphBuilder.BUILDER_CLASS_CONFIG, builderClassName, e);
+      return false;
+    }
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java b/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java
new file mode 100644
index 0000000..e4fb32f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Derives a scoped config (for example, the config of one processor) from the full application config.
+ */
+public class ConfigInheritence {
+  private static final Logger log = LoggerFactory.getLogger(ConfigInheritence.class);
+  // When true, the full config is the lowest-precedence layer, so scoped configs inherit every root entry.
+  private static final boolean INHERIT_ROOT_CONFIGS = true;
+
+  /**
+   * Merges several config layers into one flat config.
+   *
+   * <p>Layers, from lowest to highest precedence: {@code fullConfig} (only when root inheritance is enabled),
+   * {@code generatedConfig}, and the subset of {@code fullConfig} under {@code configPrefix}. A later layer
+   * wins for a shared key, except that null or empty values are skipped, so they never override an earlier value.
+   *
+   * @param fullConfig the complete application config
+   * @param generatedConfig entries generated for this scope
+   * @param configPrefix prefix selecting the scoped entries of {@code fullConfig}, e.g. {@code "processors.<id>."}
+   * @return a new config holding the merged entries
+   */
+  public static Config extractScopedConfig(Config fullConfig, Config generatedConfig, String configPrefix) {
+    Config scopedConfig = fullConfig.subset(configPrefix);
+    log.debug("Prefix '{}' has extracted config {}", configPrefix, scopedConfig);
+    log.debug("Prefix '{}' has generated config {}", configPrefix, generatedConfig);
+
+    Config[] layers = INHERIT_ROOT_CONFIGS
+        ? new Config[] {fullConfig, generatedConfig, scopedConfig}
+        : new Config[] {generatedConfig, scopedConfig};
+
+    Map<String, String> merged = new HashMap<>();
+    for (Config layer : layers) {
+      for (Map.Entry<String, String> entry : layer.entrySet()) {
+        String value = entry.getValue();
+        if (value != null && !value.isEmpty()) {
+          merged.put(entry.getKey(), value);
+        }
+      }
+    }
+
+    Config result = new MapConfig(merged);
+    log.debug("Prefix '{}' has merged config {}", configPrefix, result);
+    return result;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java b/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java
new file mode 100644
index 0000000..6c7fc2d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import scala.runtime.AbstractFunction0;
+
+/**
+ * Helpers for calling Scala APIs from Java code.
+ */
+public class ScalaToJavaUtils {
+  /**
+   * Wraps a constant in a Scala {@code Function0}, e.g. for use as the default of {@code Option.getOrElse()}.
+   *
+   * @param value the value the returned function yields; may be {@code null}
+   * @param <T> value type
+   * @return a function that returns {@code value} on every call
+   */
+  public static <T> AbstractFunction0<T> defaultValue(final T value) {
+    return new ConstantFunction0<>(value);
+  }
+
+  /** A Scala {@code Function0} that always returns the same value. */
+  private static final class ConstantFunction0<T> extends AbstractFunction0<T> {
+    private final T constant;
+
+    ConstantFunction0(T constant) {
+      this.constant = constant;
+    }
+
+    @Override
+    public T apply() {
+      return constant;
+    }
+  }
+}
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 9d6cbc2..5f22409 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -44,6 +44,7 @@
   val SAMZA_FWK_VERSION = "samza.fwk.version"
   val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
   val JOB_DEFAULT_SYSTEM = "job.default.system"
+  val JOB_DEFAULT_PARTITIONS = "job.default.partitions"
   val JOB_CONTAINER_COUNT = "job.container.count"
   val jOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
   val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
@@ -79,6 +80,10 @@
   // Processor Config Constants
   val PROCESSOR_ID = "processor.id"
 
+  val EXECUTION_ENV = "job.execution.env" // NOTE(review): JobRunner.main reads ExecutionEnvironment.ENVIRONMENT_CONFIG, not this constant — confirm they hold the same key
+
+  val STREAM_GRAPH_BUILDER = "job.stream.graph.builder" // NOTE(review): differs from StreamGraphBuilder.BUILDER_CLASS_CONFIG ("job.graph.builder.class"), which JobRunner and TaskFactories read — confirm which key is canonical
+
   implicit def Config2Job(config: Config) = new JobConfig(config)
 
   /**
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 89522dc..5476fb5 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -25,6 +25,8 @@
 import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}
 import java.lang.Thread.UncaughtExceptionHandler
 import java.net.{URL, UnknownHostException}
+import javax.servlet.SingleThreadModel
+
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
 import org.apache.samza.config.JobConfig.Config2Job
@@ -64,13 +66,7 @@
 import org.apache.samza.system.chooser.DefaultChooser
 import org.apache.samza.system.chooser.MessageChooserFactory
 import org.apache.samza.system.chooser.RoundRobinChooserFactory
-import org.apache.samza.task.AsyncRunLoop
-import org.apache.samza.task.AsyncStreamTask
-import org.apache.samza.task.AsyncStreamTaskAdapter
-import org.apache.samza.task.AsyncStreamTaskFactory
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.StreamTaskFactory
-import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.task._
 import org.apache.samza.util.HighResolutionClock
 import org.apache.samza.util.ExponentialSleepStrategy
 import org.apache.samza.util.Logging
@@ -432,28 +428,6 @@
     val singleThreadMode = config.getSingleThreadMode
     info("Got single thread mode: " + singleThreadMode)
 
-    val taskClassName = config.getTaskClass.orNull
-    info("Got task class name: %s" format taskClassName)
-
-    if (taskClassName == null && taskFactory == null) {
-      throw new SamzaException("Either the task class name or the task factory instance is required.")
-    }
-
-    val isAsyncTask: Boolean =
-      if (taskFactory != null) {
-        taskFactory.isInstanceOf[AsyncStreamTaskFactory]
-      } else {
-        classOf[AsyncStreamTask].isAssignableFrom(Class.forName(taskClassName))
-      }
-
-    if (isAsyncTask) {
-      info("Got an AsyncStreamTask implementation.")
-    }
-
-    if(singleThreadMode && isAsyncTask) {
-      throw new SamzaException("AsyncStreamTask cannot run on single thread mode.")
-    }
-
     val threadPoolSize = config.getThreadPoolSize
     info("Got thread pool size: " + threadPoolSize)
 
@@ -462,6 +436,39 @@
     else
       null
 
+    def maybeAsyncStreamTaskFactory : Object = {
+      // Resolve the factory used to create every task instance: an explicit taskFactory wins, else build one from config.
+      val taskClassFactory = taskFactory match {
+        case null => TaskFactories.fromTaskClassConfig(config)
+        case _ => taskFactory
+      }
+
+      if (taskClassFactory == null) { // NOTE(review): looks unreachable; fromTaskClassConfig throws ConfigException instead of returning null
+        throw new SamzaException("Either the task class name or the task factory instance is required.")
+      }
+
+      val isAsyncTaskClass: Boolean = taskClassFactory.isInstanceOf[AsyncStreamTaskFactory]
+
+      if (isAsyncTaskClass) {
+        info("Got an AsyncStreamTask implementation.")
+      }
+
+      if (singleThreadMode && isAsyncTaskClass) {
+        throw new SamzaException("AsyncStreamTask cannot run on single thread mode.")
+      }
+
+      // In multi-threaded mode, wrap each synchronous StreamTask in an AsyncStreamTaskAdapter, handing it taskThreadPool.
+      if (!singleThreadMode && !isAsyncTaskClass) {
+        info("Converting StreamTask to AsyncStreamTaskAdapter when running StreamTask w/ multiple threads")
+        new AsyncStreamTaskFactory { // NOTE(review): a supplied factory that is neither kind fails the cast below with ClassCastException, not the later SamzaException
+          override def createInstance() = new AsyncStreamTaskAdapter(taskClassFactory.asInstanceOf[StreamTaskFactory].createInstance(), taskThreadPool)
+        }
+      } else {
+        taskClassFactory
+      }
+    }
+
+    val finalTaskFactory = maybeAsyncStreamTaskFactory
+
     // Wire up all task-instance-level (unshared) objects.
     val taskNames = containerModel
       .getTasks
@@ -483,24 +490,14 @@
 
       val taskName = taskModel.getTaskName
 
-      val taskObj = if (taskFactory != null) {
-        debug("Using task factory to create task instance")
-        taskFactory match {
+      debug("Using task factory to create task instance")
+      val task =
+        finalTaskFactory match {
           case tf: AsyncStreamTaskFactory => tf.createInstance()
           case tf: StreamTaskFactory => tf.createInstance()
           case _ =>
             throw new SamzaException("taskFactory must be an instance of StreamTaskFactory or AsyncStreamTaskFactory")
         }
-      } else {
-        debug("Using task class name: %s to create instance" format taskClassName)
-        Class.forName(taskClassName).newInstance
-      }
-
-      val task = if (!singleThreadMode && !isAsyncTask)
-        // Wrap the StreamTask into a AsyncStreamTask with the build-in thread pool
-        new AsyncStreamTaskAdapter(taskObj.asInstanceOf[StreamTask], taskThreadPool)
-      else
-        taskObj
 
       val taskInstanceMetrics = new TaskInstanceMetrics("TaskName-%s" format taskName)
 
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 022b480..1d81b33 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -20,17 +20,22 @@
 package org.apache.samza.job
 
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{ConfigRewriter, Config}
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
+import org.apache.samza.config.Config
+import org.apache.samza.config.ConfigRewriter
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
+import org.apache.samza.coordinator.stream.messages.Delete
+import org.apache.samza.coordinator.stream.messages.SetConfig
 import org.apache.samza.job.ApplicationStatus.Running
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.operators.StreamGraphBuilder
+import org.apache.samza.system.ExecutionEnvironment
 import org.apache.samza.util.ClassLoaderHelper
 import org.apache.samza.util.CommandLine
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
+
 import scala.collection.JavaConversions._
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 
 
 object JobRunner extends Logging {
@@ -63,7 +68,16 @@
     val cmdline = new CommandLine
     val options = cmdline.parser.parse(args: _*)
     val config = cmdline.loadConfig(options)
-    new JobRunner(rewriteConfig(config)).run()
+
+    // start execution env if it's defined
+    val envClass: String = config.get(ExecutionEnvironment.ENVIRONMENT_CONFIG, "")
+    if (!envClass.isEmpty) {
+      val env: ExecutionEnvironment = ExecutionEnvironment.fromConfig(config)
+      val graphBuilder: StreamGraphBuilder = Class.forName(config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG)).newInstance.asInstanceOf[StreamGraphBuilder]
+      env.run(graphBuilder, rewriteConfig(config))
+    } else {
+      new JobRunner(rewriteConfig(config)).run()
+    }
   }
 }
 
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 9019d02..97bd22a 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -23,6 +23,7 @@
 import java.io._
 import java.lang.management.ManagementFactory
 import java.util.zip.CRC32
+import org.apache.samza.config.ConfigRewriter
 import org.apache.samza.{SamzaException, Partition}
 import org.apache.samza.system.{SystemFactory, SystemStreamPartition, SystemStream}
 import java.util.Random
@@ -395,4 +396,28 @@
    * @return Scala clock function
    */
   implicit def asScalaClock(c: HighResolutionClock): () => Long = () => c.nanoTime()
+
+  /**
+   * Applies every ConfigRewriter listed in the job config, in order, feeding
+   * each rewriter the output of the previous one. Returns the config
+   * unchanged when no rewriters are configured.
+   *
+   * @param config The config to re-write; rewriter classes are looked up in it
+   * @return re-written config
+   */
+  def rewriteConfig(config: Config): Config = {
+    def applyRewriter(current: Config, rewriterName: String): Config = {
+      val rewriterClass = config.getConfigRewriterClass(rewriterName) match {
+        case Some(className) => className
+        case None => throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName)
+      }
+      val rewriter = Util.getObj[ConfigRewriter](rewriterClass)
+      info("Re-writing config with " + rewriter)
+      rewriter.rewrite(rewriterName, current)
+    }
+
+    config.getConfigRewriters
+      .map(names => names.split(",").foldLeft(config)(applyRewriter))
+      .getOrElse(config)
+  }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java b/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
index 8ecd44f..f817379 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
@@ -21,6 +21,7 @@
 import java.lang.reflect.Field;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.impl.OperatorGraph;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.StreamOperatorTask;
@@ -47,7 +48,7 @@
 
   @Test
   public void testUserTask() throws Exception {
-    Config mockConfig = mock(Config.class);
+    Config mockConfig = spy(new MapConfig());
     TaskContext mockContext = mock(TaskContext.class);
     when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
     TestWindowExample userTask = new TestWindowExample(this.inputPartitions);
@@ -64,7 +65,7 @@
 
   @Test
   public void testSplitTask() throws Exception {
-    Config mockConfig = mock(Config.class);
+    Config mockConfig = spy(new MapConfig());
     TaskContext mockContext = mock(TaskContext.class);
     when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
     TestBroadcastExample splitTask = new TestBroadcastExample(this.inputPartitions);
@@ -81,7 +82,7 @@
 
   @Test
   public void testJoinTask() throws Exception {
-    Config mockConfig = mock(Config.class);
+    Config mockConfig = spy(new MapConfig());
     TaskContext mockContext = mock(TaskContext.class);
     when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
     TestJoinExample joinTask = new TestJoinExample(this.inputPartitions);
diff --git a/samza-core/src/test/java/org/apache/samza/processorgraph/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/processorgraph/TestExecutionPlanner.java
new file mode 100644
index 0000000..68a2142
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/processorgraph/TestExecutionPlanner.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.system.AbstractExecutionEnvironment;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link ExecutionPlanner}: building a {@link ProcessorGraph} from a {@link StreamGraph} and
+ * computing partition counts for input, output and intermediate (partitionBy) streams.
+ */
+public class TestExecutionPlanner {
+
+  private Config config;
+
+  private ExecutionEnvironment env;
+
+  private static final String DEFAULT_SYSTEM = "test-system";
+  private static final int DEFAULT_PARTITIONS = 10;
+
+  private StreamSpec input1;
+  private StreamSpec input2;
+  private StreamSpec input3;
+  private StreamSpec output1;
+  private StreamSpec output2;
+
+  private Map<String, SystemAdmin> systemAdmins;
+
+  /** Stub join function; these tests only build and plan graphs and never process messages. */
+  private JoinFunction createJoin() {
+    return new JoinFunction() {
+      @Override
+      public Object apply(Object message, Object otherMessage) {
+        return null;
+      }
+
+      @Override
+      public Object getFirstKey(Object message) {
+        return null;
+      }
+
+      @Override
+      public Object getSecondKey(Object message) {
+        return null;
+      }
+    };
+  }
+
+  /**
+   * Creates a stub {@link SystemAdmin} whose {@code getSystemStreamMetadata} reports, for each requested stream,
+   * the partition count given in {@code streamToPartitions}. Every requested stream must be present in the map,
+   * otherwise unboxing the missing count throws a NullPointerException. All other methods do nothing or return null.
+   */
+  private SystemAdmin createSystemAdmin(Map<String, Integer> streamToPartitions) {
+
+    return new SystemAdmin() {
+      @Override
+      public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+        return null;
+      }
+
+      @Override
+      public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+        Map<String, SystemStreamMetadata> map = new HashMap<>();
+        for (String stream : streamNames) {
+          Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> m = new HashMap<>();
+          for (int i = 0; i < streamToPartitions.get(stream); i++) {
+            m.put(new Partition(i), new SystemStreamMetadata.SystemStreamPartitionMetadata("", "", ""));
+          }
+          map.put(stream, new SystemStreamMetadata(stream, m));
+        }
+        return map;
+      }
+
+      @Override
+      public void createChangelogStream(String streamName, int numOfPartitions) {
+
+      }
+
+      @Override
+      public void validateChangelogStream(String streamName, int numOfPartitions) {
+
+      }
+
+      @Override
+      public void createCoordinatorStream(String streamName) {
+
+      }
+
+      @Override
+      public Integer offsetComparator(String offset1, String offset2) {
+        return null;
+      }
+    };
+  }
+
+  private StreamGraph createSimpleGraph() {
+    /**
+     * a simple graph of partitionBy and map
+     *
+     * input1 -> partitionBy -> map -> output1
+     *
+     */
+    StreamGraph streamGraph = new StreamGraphImpl(env);
+    streamGraph.createInStream(input1, null, null).partitionBy(m -> "yes!!!").map(m -> m).sendTo(streamGraph.createOutStream(output1, null, null));
+    return streamGraph;
+  }
+
+  private StreamGraph createStreamGraphWithJoin() {
+
+    /** the graph looks like the following
+     *
+     *                        input1 -> map -> join -> output1
+     *                                           |
+     *          input2 -> partitionBy -> filter -|
+     *                                           |
+     * input3 -> filter -> partitionBy -> map -> join -> output2
+     *
+     */
+
+    StreamGraph streamGraph = new StreamGraphImpl(env);
+    MessageStream m1 = streamGraph.createInStream(input1, null, null).map(m -> m);
+    MessageStream m2 = streamGraph.createInStream(input2, null, null).partitionBy(m -> "haha").filter(m -> true);
+    MessageStream m3 = streamGraph.createInStream(input3, null, null).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
+
+    m1.join(m2, createJoin()).sendTo(streamGraph.createOutStream(output1, null, null));
+    m3.join(m2, createJoin()).sendTo(streamGraph.createOutStream(output2, null, null));
+
+    return streamGraph;
+  }
+
+  @Before
+  public void setup() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(JobConfig.JOB_NAME(), "test-app");
+    configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), DEFAULT_SYSTEM);
+
+    config = new MapConfig(configMap);
+
+    env = new AbstractExecutionEnvironment(config) {
+      @Override
+      public void run(StreamGraphBuilder graphBuilder, Config config) {
+      }
+    };
+
+    input1 = new StreamSpec("input1", "input1", "system1");
+    input2 = new StreamSpec("input2", "input2", "system2");
+    input3 = new StreamSpec("input3", "input3", "system2");
+
+    output1 = new StreamSpec("output1", "output1", "system1");
+    output2 = new StreamSpec("output2", "output2", "system2");
+
+    // set up external partition count
+    Map<String, Integer> system1Map = new HashMap<>();
+    system1Map.put("input1", 64);
+    system1Map.put("output1", 8);
+    Map<String, Integer> system2Map = new HashMap<>();
+    system2Map.put("input2", 16);
+    system2Map.put("input3", 32);
+    system2Map.put("output2", 16);
+
+    SystemAdmin systemAdmin1 = createSystemAdmin(system1Map);
+    SystemAdmin systemAdmin2 = createSystemAdmin(system2Map);
+    systemAdmins = new HashMap<>();
+    systemAdmins.put("system1", systemAdmin1);
+    systemAdmins.put("system2", systemAdmin2);
+  }
+
+  @Test
+  public void testCreateProcessorGraph() {
+    ExecutionPlanner planner = new ExecutionPlanner(config);
+    StreamGraph streamGraph = createStreamGraphWithJoin();
+
+    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+    assertEquals(3, processorGraph.getSources().size());
+    assertEquals(2, processorGraph.getSinks().size());
+    assertEquals(2, processorGraph.getInternalStreams().size()); // two streams generated by partitionBy
+  }
+
+  @Test
+  public void testFetchExistingStreamPartitions() {
+    ExecutionPlanner planner = new ExecutionPlanner(config);
+    StreamGraph streamGraph = createStreamGraphWithJoin();
+    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+
+    ExecutionPlanner.fetchExistingStreamPartitions(processorGraph, systemAdmins);
+    assertEquals(64, processorGraph.getEdge(input1).getPartitions());
+    assertEquals(16, processorGraph.getEdge(input2).getPartitions());
+    assertEquals(32, processorGraph.getEdge(input3).getPartitions());
+    assertEquals(8, processorGraph.getEdge(output1).getPartitions());
+    assertEquals(16, processorGraph.getEdge(output2).getPartitions());
+
+    processorGraph.getInternalStreams().forEach(edge -> {
+        assertEquals(-1, edge.getPartitions());
+      });
+  }
+
+  @Test
+  public void testCalculateJoinInputPartitions() {
+    ExecutionPlanner planner = new ExecutionPlanner(config);
+    StreamGraph streamGraph = createStreamGraphWithJoin();
+    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+
+    ExecutionPlanner.fetchExistingStreamPartitions(processorGraph, systemAdmins);
+    ExecutionPlanner.calculateJoinInputPartitions(streamGraph, processorGraph);
+
+    // both intermediate streams are (transitively) joined with input1, so they take its partition count
+    processorGraph.getInternalStreams().forEach(edge -> {
+        assertEquals(64, edge.getPartitions());
+      });
+  }
+
+  @Test
+  public void testDefaultPartitions() {
+    Map<String, String> map = new HashMap<>(config);
+    map.put(JobConfig.JOB_DEFAULT_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
+    Config cfg = new MapConfig(map);
+
+    ExecutionPlanner planner = new ExecutionPlanner(cfg);
+    StreamGraph streamGraph = createSimpleGraph();
+    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+    planner.calculatePartitions(streamGraph, processorGraph, systemAdmins);
+
+    // the intermediate stream should use the configured default partition count
+    processorGraph.getInternalStreams().forEach(edge -> {
+        assertEquals(DEFAULT_PARTITIONS, edge.getPartitions());
+      });
+  }
+
+  @Test
+  public void testCalculateIntStreamPartitions() {
+    ExecutionPlanner planner = new ExecutionPlanner(config);
+    StreamGraph streamGraph = createSimpleGraph();
+    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+    planner.calculatePartitions(streamGraph, processorGraph, systemAdmins);
+
+    // with no default configured, the intermediate stream takes the max of input1 (64) and output1 (8)
+    processorGraph.getInternalStreams().forEach(edge -> {
+        assertEquals(64, edge.getPartitions());
+      });
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/processorgraph/TestProcessorGraph.java b/samza-core/src/test/java/org/apache/samza/processorgraph/TestProcessorGraph.java
new file mode 100644
index 0000000..7aa9f41
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/processorgraph/TestProcessorGraph.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link ProcessorGraph}: adding sources, sinks and edges, reachability, and topological sort.
+ */
+public class TestProcessorGraph {
+
+  private ProcessorGraph graph1;
+  private ProcessorGraph graph2;
+  private int streamSeq = 0;
+
+  private StreamSpec genStream() {
+    ++streamSeq;
+    // unique id per call; the physical name and system are shared by all generated specs
+    return new StreamSpec(String.valueOf(streamSeq), "test-stream", "test-system");
+  }
+
+  @Before
+  public void setup() {
+    /**
+     * graph1 is the example DAG from the Wikipedia article on topological sorting
+     *
+     * 5   7   3
+     * | / | / |
+     * v   v   |
+     * 11  8   |
+     * | \X   /
+     * v v \v
+     * 2 9 10
+     */
+    // init graph1
+    graph1 = new ProcessorGraph(null);
+    graph1.addSource(genStream(), "5");
+    graph1.addSource(genStream(), "7");
+    graph1.addSource(genStream(), "3");
+    graph1.addEdge(genStream(), "5", "11");
+    graph1.addEdge(genStream(), "7", "11");
+    graph1.addEdge(genStream(), "7", "8");
+    graph1.addEdge(genStream(), "3", "8");
+    graph1.addEdge(genStream(), "3", "10");
+    graph1.addEdge(genStream(), "11", "2");
+    graph1.addEdge(genStream(), "11", "9");
+    graph1.addEdge(genStream(), "8", "9");
+    graph1.addEdge(genStream(), "11", "10");
+    graph1.addSink(genStream(), "2");
+    graph1.addSink(genStream(), "9");
+    graph1.addSink(genStream(), "10");
+
+    /**
+     * graph2 contains a cycle (2 -> 3 -> 4 -> 6 -> 2) and a self-loop on 5
+     * 1 -> 2 -> 3 -> 4 -> 5 -> 7
+     *      |<---6 <--|    <>
+     */
+    graph2 = new ProcessorGraph(null);
+    graph2.addSource(genStream(), "1");
+    graph2.addEdge(genStream(), "1", "2");
+    graph2.addEdge(genStream(), "2", "3");
+    graph2.addEdge(genStream(), "3", "4");
+    graph2.addEdge(genStream(), "4", "5");
+    graph2.addEdge(genStream(), "4", "6");
+    graph2.addEdge(genStream(), "6", "2");
+    graph2.addEdge(genStream(), "5", "5");
+    graph2.addEdge(genStream(), "5", "7");
+    graph2.addSink(genStream(), "7");
+  }
+
+  @Test
+  public void testAddSource() {
+    ProcessorGraph graph = new ProcessorGraph(null);
+
+    /**
+     * s1 -> 1
+     * s2 ->|
+     *
+     * s3 -> 2
+     *   |-> 3
+     */
+    StreamSpec s1 = genStream();
+    StreamSpec s2 = genStream();
+    StreamSpec s3 = genStream();
+    graph.addSource(s1, "1");
+    graph.addSource(s2, "1");
+    graph.addSource(s3, "2");
+    graph.addSource(s3, "3");
+
+    assertEquals(3, graph.getSources().size());
+
+    assertEquals(2, graph.getNode("1").getInEdges().size());
+    assertEquals(1, graph.getNode("2").getInEdges().size());
+    assertEquals(1, graph.getNode("3").getInEdges().size());
+
+    assertEquals(0, graph.getEdge(s1).getSourceNodes().size());
+    assertEquals(1, graph.getEdge(s1).getTargetNodes().size());
+    assertEquals(0, graph.getEdge(s2).getSourceNodes().size());
+    assertEquals(1, graph.getEdge(s2).getTargetNodes().size());
+    assertEquals(0, graph.getEdge(s3).getSourceNodes().size());
+    assertEquals(2, graph.getEdge(s3).getTargetNodes().size());
+  }
+
+  @Test
+  public void testAddSink() {
+    /**
+     * 1 -> s1
+     * 2 -> s2
+     * 2 -> s3
+     */
+    StreamSpec s1 = genStream();
+    StreamSpec s2 = genStream();
+    StreamSpec s3 = genStream();
+    ProcessorGraph graph = new ProcessorGraph(null);
+    graph.addSink(s1, "1");
+    graph.addSink(s2, "2");
+    graph.addSink(s3, "2");
+
+    assertEquals(3, graph.getSinks().size());
+    assertEquals(1, graph.getNode("1").getOutEdges().size());
+    assertEquals(2, graph.getNode("2").getOutEdges().size());
+
+    assertEquals(1, graph.getEdge(s1).getSourceNodes().size());
+    assertEquals(0, graph.getEdge(s1).getTargetNodes().size());
+    assertEquals(1, graph.getEdge(s2).getSourceNodes().size());
+    assertEquals(0, graph.getEdge(s2).getTargetNodes().size());
+    assertEquals(1, graph.getEdge(s3).getSourceNodes().size());
+    assertEquals(0, graph.getEdge(s3).getTargetNodes().size());
+  }
+
+  @Test
+  public void testReachable() {
+    Set<ProcessorNode> reachable1 = graph1.findReachable();
+    assertEquals(8, reachable1.size());
+
+    Set<ProcessorNode> reachable2 = graph2.findReachable();
+    assertEquals(7, reachable2.size());
+  }
+
+  @Test
+  public void testTopologicalSort() {
+
+    // test graph1
+    List<ProcessorNode> sortedNodes1 = graph1.topologicalSort();
+    Map<String, Integer> idxMap1 = new HashMap<>();
+    for (int i = 0; i < sortedNodes1.size(); i++) {
+      idxMap1.put(sortedNodes1.get(i).getId(), i);
+    }
+
+    assertEquals(8, idxMap1.size());
+    assertTrue(idxMap1.get("11") > idxMap1.get("5"));
+    assertTrue(idxMap1.get("11") > idxMap1.get("7"));
+    assertTrue(idxMap1.get("8") > idxMap1.get("7"));
+    assertTrue(idxMap1.get("8") > idxMap1.get("3"));
+    assertTrue(idxMap1.get("2") > idxMap1.get("11"));
+    assertTrue(idxMap1.get("9") > idxMap1.get("8"));
+    assertTrue(idxMap1.get("9") > idxMap1.get("11"));
+    assertTrue(idxMap1.get("10") > idxMap1.get("11"));
+    assertTrue(idxMap1.get("10") > idxMap1.get("3"));
+
+    // test graph2
+    List<ProcessorNode> sortedNodes2 = graph2.topologicalSort();
+    Map<String, Integer> idxMap2 = new HashMap<>();
+    for (int i = 0; i < sortedNodes2.size(); i++) {
+      idxMap2.put(sortedNodes2.get(i).getId(), i);
+    }
+
+    assertEquals(7, idxMap2.size());
+    assertTrue(idxMap2.get("2") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("3") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("4") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("6") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("5") > idxMap2.get("4"));
+    assertTrue(idxMap2.get("7") > idxMap2.get("5"));
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactories.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactories.java
new file mode 100644
index 0000000..29741a4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactories.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.testUtils.TestAsyncStreamTask;
+import org.apache.samza.testUtils.TestStreamTask;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link TaskFactories#fromTaskClassConfig}: choosing a {@link StreamTaskFactory} or {@link AsyncStreamTaskFactory} from task.class and graph-builder configuration.
+ */
+public class TestTaskFactories {
+
+  private static final String TASK_CLASS = "task.class";
+  private static final String BUILDER_CLASS = StreamGraphBuilder.BUILDER_CLASS_CONFIG;
+
+  private static final String STREAM_TASK = "org.apache.samza.testUtils.TestStreamTask";
+  private static final String ASYNC_STREAM_TASK = "org.apache.samza.testUtils.TestAsyncStreamTask";
+  private static final String GRAPH_BUILDER = "org.apache.samza.testUtils.TestStreamGraphBuilder";
+  private static final String INVALID_GRAPH_BUILDER = "org.apache.samza.testUtils.InvalidStreamGraphBuilder";
+
+  /**
+   * Builds a {@link MapConfig} from alternating key/value arguments. A plain {@link HashMap} is used so that
+   * {@code null} values can be expressed, and to avoid double-brace initialization, which creates an anonymous
+   * {@code HashMap} subclass per config that can capture the enclosing test instance.
+   */
+  private static Config configOf(String... keyValues) {
+    if (keyValues.length % 2 != 0) {
+      throw new IllegalArgumentException("Expected key/value pairs, got " + keyValues.length + " arguments");
+    }
+    Map<String, String> map = new HashMap<>();
+    for (int i = 0; i < keyValues.length; i += 2) {
+      map.put(keyValues[i], keyValues[i + 1]);
+    }
+    return new MapConfig(map);
+  }
+
+  /** Asserts that {@code factory} is a {@link StreamTaskFactory} whose createInstance() returns a {@code taskClass}. */
+  private static void assertStreamTaskFactory(Object factory, Class<?> taskClass) {
+    assertTrue("Expected a StreamTaskFactory but got " + factory, factory instanceof StreamTaskFactory);
+    assertTrue("Expected a task of type " + taskClass.getName(), taskClass.isInstance(((StreamTaskFactory) factory).createInstance()));
+  }
+
+  /** Asserts that {@code factory} is an {@link AsyncStreamTaskFactory} whose createInstance() returns a {@code taskClass}. */
+  private static void assertAsyncStreamTaskFactory(Object factory, Class<?> taskClass) {
+    assertTrue("Expected an AsyncStreamTaskFactory but got " + factory, factory instanceof AsyncStreamTaskFactory);
+    assertTrue("Expected a task of type " + taskClass.getName(), taskClass.isInstance(((AsyncStreamTaskFactory) factory).createInstance()));
+  }
+
+  @Test
+  public void testStreamTaskClass() throws ClassNotFoundException {
+    Object retFactory = TaskFactories.fromTaskClassConfig(configOf(TASK_CLASS, STREAM_TASK));
+    assertStreamTaskFactory(retFactory, TestStreamTask.class);
+
+    try {
+      TaskFactories.fromTaskClassConfig(configOf(TASK_CLASS, "no.such.class"));
+      fail("Should have failed w/ no.such.class");
+    } catch (ClassNotFoundException cfe) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testStreamOperatorTaskClass() throws ClassNotFoundException {
+    Object retFactory = TaskFactories.fromTaskClassConfig(configOf(BUILDER_CLASS, GRAPH_BUILDER));
+    assertStreamTaskFactory(retFactory, StreamOperatorTask.class);
+
+    // a configured builder class that does not implement StreamGraphBuilder is rejected
+    try {
+      TaskFactories.fromTaskClassConfig(configOf(BUILDER_CLASS, INVALID_GRAPH_BUILDER));
+      fail("Should have failed w/ a class that does not implement StreamGraphBuilder");
+    } catch (ConfigException ce) {
+      // expected
+    }
+
+    try {
+      TaskFactories.fromTaskClassConfig(configOf(BUILDER_CLASS, "no.such.class"));
+      fail("Should have failed w/ no.such.class");
+    } catch (ConfigException ce) {
+      // expected
+    }
+
+    try {
+      TaskFactories.fromTaskClassConfig(configOf(BUILDER_CLASS, ""));
+      fail("Should have failed w/ empty class name for StreamGraphBuilder");
+    } catch (ConfigException ce) {
+      // expected
+    }
+
+    try {
+      TaskFactories.fromTaskClassConfig(configOf());
+      fail(String.format("Should have failed w/ non-existing entry for %s", StreamGraphBuilder.BUILDER_CLASS_CONFIG));
+    } catch (ConfigException ce) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testStreamOperatorTaskClassWithTaskClass() throws ClassNotFoundException {
+    // a loadable graph builder takes precedence over task.class, whatever task.class names
+    Object retFactory = TaskFactories.fromTaskClassConfig(configOf(TASK_CLASS, STREAM_TASK, BUILDER_CLASS, GRAPH_BUILDER));
+    assertStreamTaskFactory(retFactory, StreamOperatorTask.class);
+
+    retFactory = TaskFactories.fromTaskClassConfig(configOf(TASK_CLASS, ASYNC_STREAM_TASK, BUILDER_CLASS, GRAPH_BUILDER));
+    assertStreamTaskFactory(retFactory, StreamOperatorTask.class);
+
+    retFactory = TaskFactories.fromTaskClassConfig(configOf(TASK_CLASS, "no.such.class", BUILDER_CLASS, GRAPH_BUILDER));
+    assertStreamTaskFactory(retFactory, StreamOperatorTask.class);
+  }
+
+  @Test
+  public void testStreamTaskClassWithInvalidStreamGraphBuilder() throws ClassNotFoundException {
+    // an invalid, empty or null graph builder falls back to task.class
+    Object retFactory = TaskFactories.fromTaskClassConfig(configOf(TASK_CLASS, STREAM_TASK, BUILDER_CLASS, INVALID_GRAPH_BUILDER));
+    assertStreamTaskFactory(retFactory, TestStreamTask.class);
+
+    retFactory = TaskFactories.fromTaskClassConfig(configOf(TASK_CLASS, STREAM_TASK, BUILDER_CLASS, ""));
+    assertStreamTaskFactory(retFactory, TestStreamTask.class);
+
+    retFactory = TaskFactories.fromTaskClassConfig(configOf(TASK_CLASS, STREAM_TASK, BUILDER_CLASS, null));
+    assertStreamTaskFactory(retFactory, TestStreamTask.class);
+
+    try {
+      TaskFactories.fromTaskClassConfig(configOf(TASK_CLASS, "", BUILDER_CLASS, INVALID_GRAPH_BUILDER));
+      fail("Should have failed w/ empty task.class and an invalid StreamGraphBuilder");
+    } catch (ClassNotFoundException cne) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testAsyncStreamTask() throws ClassNotFoundException {
+    Object retFactory = TaskFactories.fromTaskClassConfig(configOf(TASK_CLASS, ASYNC_STREAM_TASK));
+    assertAsyncStreamTaskFactory(retFactory, TestAsyncStreamTask.class);
+
+    try {
+      TaskFactories.fromTaskClassConfig(configOf(TASK_CLASS, "no.such.class"));
+      fail("Should have failed w/ no.such.class");
+    } catch (ClassNotFoundException cfe) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testAsyncStreamTaskWithInvalidStreamGraphBuilder() throws ClassNotFoundException {
+    Object retFactory = TaskFactories.fromTaskClassConfig(configOf(TASK_CLASS, ASYNC_STREAM_TASK, BUILDER_CLASS, INVALID_GRAPH_BUILDER));
+    assertAsyncStreamTaskFactory(retFactory, TestAsyncStreamTask.class);
+
+    retFactory = TaskFactories.fromTaskClassConfig(configOf(TASK_CLASS, ASYNC_STREAM_TASK, BUILDER_CLASS, ""));
+    assertAsyncStreamTaskFactory(retFactory, TestAsyncStreamTask.class);
+
+    retFactory = TaskFactories.fromTaskClassConfig(configOf(TASK_CLASS, ASYNC_STREAM_TASK, BUILDER_CLASS, null));
+    assertAsyncStreamTaskFactory(retFactory, TestAsyncStreamTask.class);
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/InvalidStreamGraphBuilder.java b/samza-core/src/test/java/org/apache/samza/testUtils/InvalidStreamGraphBuilder.java
new file mode 100644
index 0000000..2fe6946
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/InvalidStreamGraphBuilder.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+/**
+ * Test fixture that deliberately does NOT implement {@link org.apache.samza.operators.StreamGraphBuilder}; tests configure it as the graph builder class to exercise invalid-builder handling.
+ */
+public class InvalidStreamGraphBuilder {
+}
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java
new file mode 100644
index 0000000..81f3fd4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.task.AsyncStreamTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * No-op {@link AsyncStreamTask}; task-factory tests reference it by class name through {@code task.class}.
+ */
+public class TestAsyncStreamTask implements AsyncStreamTask {
+  @Override
+  public void processAsync(IncomingMessageEnvelope msg, MessageCollector out, TaskCoordinator coord, TaskCallback cb) {
+    // Intentionally empty: the callback is never invoked; the tests only instantiate this class.
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamGraphBuilder.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamGraphBuilder.java
new file mode 100644
index 0000000..10eb02f
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamGraphBuilder.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphBuilder;
+
+/**
+ * Minimal {@link StreamGraphBuilder} that adds nothing to the graph; task-factory tests reference it by class name.
+ */
+public class TestStreamGraphBuilder implements StreamGraphBuilder {
+  @Override
+  public void init(StreamGraph streamGraph, Config cfg) {
+    // Intentionally empty: the tests only check which kind of task factory gets created.
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java
new file mode 100644
index 0000000..ce0980a
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * No-op {@link StreamTask} for tests: {@link #process} ignores every envelope it is given.
+ */
+public class TestStreamTask implements StreamTask {
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    // Intentionally empty: nothing is sent to the collector and the coordinator is never used.
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index bfda464..9a30dc6 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -56,7 +56,7 @@
 
   @Before
   public void testSetup() {
-    testZkConnectionString = "localhost:" + zkServer.getPort();
+    testZkConnectionString = "127.0.0.1:" + zkServer.getPort();
     try {
       testZkUtils = getZkUtilsWithNewClient();
     } catch (Exception e) {
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 58c3ed6..09dc5c7 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -55,7 +55,7 @@
   public void testSetup() {
     try {
       zkClient = new ZkClient(
-          new ZkConnection("localhost:" + zkServer.getPort(), SESSION_TIMEOUT_MS),
+          new ZkConnection("127.0.0.1:" + zkServer.getPort(), SESSION_TIMEOUT_MS),
           CONNECTION_TIMEOUT_MS);
     } catch (Exception e) {
       Assert.fail("Client connection setup failed. Aborting tests..");
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 5d82b92..b803dfe 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -89,8 +89,6 @@
     "systems.kafka.samza.offset.default" -> "oldest", // applies to a nonempty topic
     "systems.kafka.consumer.auto.offset.reset" -> "smallest", // applies to an empty topic
     "systems.kafka.samza.msg.serde" -> "string",
-    "systems.kafka.consumer.zookeeper.connect" -> "localhost:2181",
-    "systems.kafka.producer.bootstrap.servers" -> "localhost:9092",
     // Since using state, need a checkpoint manager
     "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
     "task.checkpoint.system" -> "kafka",
@@ -122,6 +120,7 @@
     val brokerList = TestUtils.getBrokerListStrFromServers(servers, SecurityProtocol.PLAINTEXT)
     brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
 
+    // set up the ZooKeeper and bootstrap servers for the local Kafka cluster
     jobConfig ++= Map("systems.kafka.consumer.zookeeper.connect" -> zkConnect,
       "systems.kafka.producer.bootstrap.servers" -> brokers)