Merge branch 'master' into sanjeevk/encryption
diff --git a/.gitignore b/.gitignore
index b65721f..927928b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -93,6 +93,9 @@
 ## Maven generated files
 .classpath.txt
 
+## Generated by CI jobs run locally
+heron_build.txt
+
 ## File-based project format
 *.ipr
 *.iws
diff --git a/.travis.yml b/.travis.yml
index a4ef75a..ee0e203 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -11,7 +11,6 @@
   apt:
     sources:
       - ubuntu-toolchain-r-test
-      - george-edison55-precise-backports # cmake 3.2.3 / doxygen 1.8.3
 
     packages:
       - gcc-4.8
@@ -23,8 +22,6 @@
       - pkg-config
       - zip
       - zlib1g-dev
-      - cmake
-      - cmake-data
 
 env:
   - CC=gcc-4.8 CXX=g++-4.8 CPP=cpp-4.8 CXXCPP=cpp-4.8
diff --git a/WORKSPACE b/WORKSPACE
index b2b4485..cb3f559 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -285,6 +285,11 @@
 )
 
 maven_jar(
+  name = "org_objectweb_asm",
+  artifact = "org.ow2.asm:asm:5.0.4",
+)
+
+maven_jar(
   name = "org_apache_mesos_mesos",
   artifact = "org.apache.mesos:mesos:0.22.0",
 )
diff --git a/deploy/kubernetes/gcp/bookkeeper-apiserver.yaml b/deploy/kubernetes/gcp/bookkeeper-apiserver.yaml
index e8c952f..9d0ef5c 100644
--- a/deploy/kubernetes/gcp/bookkeeper-apiserver.yaml
+++ b/deploy/kubernetes/gcp/bookkeeper-apiserver.yaml
@@ -1,20 +1,54 @@
 ##
 ## Heron API server deployment
 ##
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  labels:
+    k8s-app: heron-apiserver
+  name: heron-apiserver
+  namespace: default
+
+---
+
+apiVersion: rbac.authorization.k8s.io/v1beta1
+kind: ClusterRoleBinding
+metadata:
+  name: heron-apiserver
+  labels:
+    app: heron-apiserver
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: cluster-admin
+subjects:
+- kind: ServiceAccount
+  name: heron-apiserver
+  namespace: default
+
+---
+
 apiVersion: apps/v1beta1
 kind: Deployment
 metadata:
   name: heron-apiserver
+  labels:
+      app: heron-apiserver
+    namespace: default
 spec:
+  selector:
+    matchLabels:
+      app: heron-apiserver
   replicas: 1
   template:
     metadata:
       labels:
         app: heron-apiserver
     spec:
+      serviceAccountName: heron-apiserver
       containers:
         - name: heron-apiserver
-          image: heron/heron:0.16.2
+          image: heron/heron:latest
           command: ["sh", "-c"]
           args:
             - >-
@@ -23,7 +57,7 @@
               --cluster kubernetes
               -D heron.statemgr.connection.string=zookeeper:2181
               -D heron.kubernetes.scheduler.uri=http://localhost:8001
-              -D heron.executor.docker.image=heron/heron:0.16.2
+              -D heron.executor.docker.image=heron/heron:latest
               -D heron.class.uploader=com.twitter.heron.uploader.dlog.DLUploader
               -D heron.uploader.dlog.topologies.namespace.uri=distributedlog://zookeeper:2181/distributedlog
         - name: kubectl-proxy
diff --git a/deploy/kubernetes/gcp/gcs-apiserver.yaml b/deploy/kubernetes/gcp/gcs-apiserver.yaml
index 80091d3..1ed21ad 100644
--- a/deploy/kubernetes/gcp/gcs-apiserver.yaml
+++ b/deploy/kubernetes/gcp/gcs-apiserver.yaml
@@ -1,17 +1,51 @@
 ##
 ## Heron API server deployment
 ##
-apiVersion: apps/v1beta1
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  labels:
+    k8s-app: heron-apiserver
+  name: heron-apiserver
+  namespace: default
+
+---
+
+apiVersion: rbac.authorization.k8s.io/v1beta1
+kind: ClusterRoleBinding
+metadata:
+  name: heron-apiserver
+  labels:
+    app: heron-apiserver
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: cluster-admin
+subjects:
+- kind: ServiceAccount
+  name: heron-apiserver
+  namespace: default
+
+---
+
+apiVersion: extensions/v1beta1
 kind: Deployment
 metadata:
   name: heron-apiserver
+  labels:
+    app: heron-apiserver
+  namespace: default
 spec:
+  selector:
+    matchLabels:
+      app: heron-apiserver
   replicas: 1
   template:
     metadata:
       labels:
         app: heron-apiserver
     spec:
+      serviceAccountName: heron-apiserver
       volumes:
         - name: google-cloud-key
           secret:
diff --git a/deploy/kubernetes/general/README.md b/deploy/kubernetes/general/README.md
index 2558641..2132ee5 100644
--- a/deploy/kubernetes/general/README.md
+++ b/deploy/kubernetes/general/README.md
@@ -69,15 +69,20 @@
 }
 ```
 
-3. Submit an example topology:
+3. Set service_url:
 ```shell
-$ heron submit kubernetes \
---service-url=http://localhost:8001/api/v1/proxy/namespaces/default/services/heron-apiserver:9000 \
-~/.heron/examples/heron-api-examples.jar \
+$ heron config kubernetes \
+set service_url http://localhost:8001/api/v1/proxy/namespaces/default/services/heron-apiserver:9000 \
 com.twitter.heron.examples.api.AckingTopology acking
 ```
 
-4. View heron ui:
+4. Submit an example topology:
+```shell
+$ heron submit kubernetes ~/.heron/examples/heron-api-examples.jar \
+com.twitter.heron.examples.api.AckingTopology acking
+```
+
+5. View heron ui:
 ```
 http://localhost:8001/api/v1/proxy/namespaces/default/services/heron-ui:8889
 ```
diff --git a/deploy/kubernetes/general/apiserver.yaml b/deploy/kubernetes/general/apiserver.yaml
index e8c952f..a368b92 100644
--- a/deploy/kubernetes/general/apiserver.yaml
+++ b/deploy/kubernetes/general/apiserver.yaml
@@ -1,20 +1,54 @@
 ##
 ## Heron API server deployment
 ##
-apiVersion: apps/v1beta1
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  labels:
+    k8s-app: heron-apiserver
+  name: heron-apiserver
+  namespace: default
+
+---
+
+apiVersion: rbac.authorization.k8s.io/v1beta1
+kind: ClusterRoleBinding
+metadata:
+  name: heron-apiserver
+  labels:
+    app: heron-apiserver
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: cluster-admin
+subjects:
+- kind: ServiceAccount
+  name: heron-apiserver
+  namespace: default
+
+---
+
+apiVersion: extensions/v1beta1
 kind: Deployment
 metadata:
   name: heron-apiserver
+  labels:
+    app: heron-apiserver
+  namespace: default
 spec:
+  selector:
+    matchLabels:
+      app: heron-apiserver
   replicas: 1
   template:
     metadata:
       labels:
         app: heron-apiserver
     spec:
+      serviceAccountName: heron-apiserver
       containers:
         - name: heron-apiserver
-          image: heron/heron:0.16.2
+          image: heron/heron:latest
           command: ["sh", "-c"]
           args:
             - >-
@@ -23,7 +57,7 @@
               --cluster kubernetes
               -D heron.statemgr.connection.string=zookeeper:2181
               -D heron.kubernetes.scheduler.uri=http://localhost:8001
-              -D heron.executor.docker.image=heron/heron:0.16.2
+              -D heron.executor.docker.image=heron/heron:latest
               -D heron.class.uploader=com.twitter.heron.uploader.dlog.DLUploader
               -D heron.uploader.dlog.topologies.namespace.uri=distributedlog://zookeeper:2181/distributedlog
         - name: kubectl-proxy
diff --git a/deploy/kubernetes/minikube/README.md b/deploy/kubernetes/minikube/README.md
index ba5ec99..31410f6 100644
--- a/deploy/kubernetes/minikube/README.md
+++ b/deploy/kubernetes/minikube/README.md
@@ -68,15 +68,19 @@
 }
 ```
 
-3. Submit an example topology:
+3. Set service_url:
 ```shell
-$ heron submit kubernetes \
---service-url=http://localhost:8001/api/v1/proxy/namespaces/default/services/heron-apiserver:9000 \
-~/.heron/examples/heron-api-examples.jar \
+$ heron config kubernetes \
+set service_url http://localhost:8001/api/v1/proxy/namespaces/default/services/heron-apiserver:9000 \
+```
+
+4. Submit an example topology:
+```shell
+$ heron submit kubernetes ~/.heron/examples/heron-api-examples.jar \
 com.twitter.heron.examples.api.AckingTopology acking
 ```
 
-4. View heron ui:
+5. View heron ui:
 ```
 http://localhost:8001/api/v1/proxy/namespaces/default/services/heron-ui:8889
 ```
diff --git a/deploy/kubernetes/minikube/apiserver.yaml b/deploy/kubernetes/minikube/apiserver.yaml
index d921af3..e3efd69 100644
--- a/deploy/kubernetes/minikube/apiserver.yaml
+++ b/deploy/kubernetes/minikube/apiserver.yaml
@@ -1,17 +1,51 @@
 ##
 ## Heron API server deployment
 ##
-apiVersion: apps/v1beta1
+
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  labels:
+    k8s-app: heron-apiserver
+  name: heron-apiserver
+  namespace: default
+
+---
+
+apiVersion: rbac.authorization.k8s.io/v1beta1
+kind: ClusterRoleBinding
+metadata:
+  name: heron-apiserver
+  labels:
+    app: heron-apiserver
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: cluster-admin
+subjects:
+- kind: ServiceAccount
+  name: heron-apiserver
+  namespace: default
+
+---
+
+apiVersion: extensions/v1beta1
 kind: Deployment
 metadata:
   name: heron-apiserver
+  labels:
+    app: heron-apiserver
 spec:
+  selector:
+    matchLabels:
+      app: heron-apiserver
   replicas: 1
   template:
     metadata:
       labels:
         app: heron-apiserver
     spec:
+      serviceAccountName: heron-apiserver
       containers:
         - name: heron-apiserver
           image: heron/heron:latest
diff --git a/examples/src/java/BUILD b/examples/src/java/BUILD
index d8eaeff..3994603 100644
--- a/examples/src/java/BUILD
+++ b/examples/src/java/BUILD
@@ -4,7 +4,7 @@
     name='api-examples-unshaded',
     srcs = glob(["com/twitter/heron/examples/api/**/*.java"]),
     deps = [
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/common/src/java:basics-java",
         "//heron/simulator/src/java:simulator-java"
     ],
@@ -24,7 +24,9 @@
     deps = [
         "//heron/api/src/java:api-java",
         "//heron/common/src/java:basics-java",
-        "//heron/simulator/src/java:simulator-java"
+        "//heron/simulator/src/java:simulator-java",
+        "//third_party/java:kryo",
+        "@apache_pulsar_client//jar",
     ],
     create_executable = 0,
 )
diff --git a/examples/src/java/com/twitter/heron/examples/api/ComponentJVMOptionsTopology.java b/examples/src/java/com/twitter/heron/examples/api/ComponentJVMOptionsTopology.java
index 75338db..33bd0a0 100644
--- a/examples/src/java/com/twitter/heron/examples/api/ComponentJVMOptionsTopology.java
+++ b/examples/src/java/com/twitter/heron/examples/api/ComponentJVMOptionsTopology.java
@@ -67,6 +67,11 @@
     conf.setContainerRamRequested(ByteAmount.fromGigabytes(2));
     conf.setContainerCpuRequested(2);
 
+    // Specify the size of ram padding to per container.
+    // Notice, this config will be considered as a hint,
+    // and it's up to the packing algorithm to determine whether to apply this hint
+    conf.setContainerRamPadding(ByteAmount.fromGigabytes(2));
+
     if (args != null && args.length > 0) {
       conf.setNumStmgrs(2);
       HeronSubmitter.submitTopology(args[0], conf, builder.createTopology());
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/FilesystemSinkTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/FilesystemSinkTopology.java
new file mode 100644
index 0000000..635e5b4
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/FilesystemSinkTopology.java
@@ -0,0 +1,141 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.logging.Logger;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.Context;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.Sink;
+
+/**
+ * This topology demonstrates how sinks work in the Heron Streamlet API for Java.
+ * In this case, the sink is a temporary file. Each value that enters the graph
+ * from the source streamlet (an indefinite series of randomly generated
+ * integers) is written to that temporary file.
+ */
+public final class FilesystemSinkTopology {
+  private FilesystemSinkTopology() {
+  }
+
+  private static final Logger LOG =
+      Logger.getLogger(FilesystemSinkTopology.class.getName());
+
+  /**
+   * Implements the Sink interface, which defines what happens when the toSink
+   * method is invoked in a processing graph.
+   */
+  private static class FilesystemSink<T> implements Sink<T> {
+    private static final long serialVersionUID = -96514621878356224L;
+    private Path tempFilePath;
+    private File tempFile;
+
+    FilesystemSink(File f) {
+      this.tempFile = f;
+    }
+
+    /**
+     * The setup function is called before the sink is used. Any complex
+     * instantiation logic for the sink should go here.
+     */
+    public void setup(Context context) {
+      this.tempFilePath = Paths.get(tempFile.toURI());
+    }
+
+    /**
+     * The put function defines how each incoming streamlet element is
+     * actually processed. In this case, each incoming element is converted
+     * to a byte array and written to the temporary file (successful writes
+     * are also logged). Any exceptions are converted to RuntimeExceptions,
+     * which will effectively kill the topology.
+     */
+    public void put(T element) {
+      byte[] bytes = String.format("%s\n", element.toString()).getBytes();
+
+      try {
+        Files.write(tempFilePath, bytes, StandardOpenOption.APPEND);
+        LOG.info(
+            String.format("Wrote %s to %s",
+                new String(bytes),
+                tempFilePath.toAbsolutePath()
+            )
+        );
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    /**
+     * Any cleanup logic for the sink can be applied here.
+     */
+    public void cleanup() {
+    }
+  }
+
+  /**
+   * All Heron topologies require a main function that defines the topology's behavior
+   * at runtime
+   */
+  public static void main(String[] args) throws Exception {
+    Builder processingGraphBuilder = Builder.createBuilder();
+
+    // Creates a temporary file to write output into.
+    File file = File.createTempFile("filesystem-sink-example", ".tmp");
+
+    LOG.info(
+        String.format("Ready to write to file %s",
+            file.getAbsolutePath()
+        )
+    );
+
+    processingGraphBuilder
+        .newSource(() -> {
+          // This applies a "brake" that makes the processing graph write
+          // to the temporary file at a reasonable, readable pace.
+          StreamletUtils.sleep(500);
+          return ThreadLocalRandom.current().nextInt(100);
+        })
+        .setName("incoming-integers")
+        // Here, the FilesystemSink implementation of the Sink
+        // interface is passed to the toSink function.
+        .toSink(new FilesystemSink<>(file));
+
+    // The topology's parallelism (the number of containers across which the topology's
+    // processing instance will be split) can be defined via the second command-line
+    // argument (or else the default of 2 will be used).
+    int topologyParallelism = StreamletUtils.getParallelism(args, 2);
+
+    Config config = new Config.Builder()
+        .setNumContainers(topologyParallelism)
+        .build();
+
+    // Fetches the topology name from the first command-line argument
+    String topologyName = StreamletUtils.getTopologyName(args);
+
+    // Finally, the processing graph and configuration are passed to the Runner, which converts
+    // the graph into a Heron topology that can be run in a Heron cluster.
+    new Runner().run(topologyName, config, processingGraphBuilder);
+  }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/FormattedOutputTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/FormattedOutputTopology.java
new file mode 100644
index 0000000..b36e18b
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/FormattedOutputTopology.java
@@ -0,0 +1,113 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Random;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.Runner;
+
+/**
+ * This topology demonstrates the use of consume operations in the Heron
+ * Streamlet API for Java. A consume operation terminates a processing
+ * graph, like a simpler version of a sink (which requires implementing
+ * the Sink interface). Here, the consume operation is used to produce
+ * custom-formatted logging output for a processing graph in which random
+ * sensor readings are fed into the graph every two seconds (a simple
+ * filter is also applied to this source streamlet prior to logging).
+ */
+public final class FormattedOutputTopology {
+  private FormattedOutputTopology() {
+  }
+
+  private static final Logger LOG =
+      Logger.getLogger(FormattedOutputTopology.class.getName());
+
+  /**
+   * A list of devices emitting sensor readings ("device1" through "device100").
+   */
+  private static final List<String> DEVICES = IntStream.range(1, 100)
+      .mapToObj(i -> String.format("device%d", i))
+      .collect(Collectors.toList());
+
+  /**
+   * Sensor readings consist of a device ID, a temperature reading, and
+   * a humidity reading. The temperature and humidity readings are
+   * randomized within a range.
+   */
+  private static class SensorReading implements Serializable {
+    private static final long serialVersionUID = 3418308641606699744L;
+    private String deviceId;
+    private double temperature;
+    private double humidity;
+
+    SensorReading() {
+      // Readings are produced only every two seconds
+      StreamletUtils.sleep(2000);
+      this.deviceId = StreamletUtils.randomFromList(DEVICES);
+      // Each temperature reading is a double between 70 and 100
+      this.temperature = 70 + 30 * new Random().nextDouble();
+      // Each humidity reading is a percentage between 80 and 100
+      this.humidity = (80 + 20 * new Random().nextDouble()) / 100;
+    }
+
+    String getDeviceId() {
+      return deviceId;
+    }
+
+    double getTemperature() {
+      return temperature;
+    }
+
+    double getHumidity() {
+      return humidity;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Builder processingGraphBuilder = Builder.createBuilder();
+
+    processingGraphBuilder
+        // The source streamlet is an indefinite series of sensor readings
+        // emitted every two seconds
+        .newSource(SensorReading::new)
+        // A simple filter that excludes a percentage of the sensor readings
+        .filter(reading -> reading.getHumidity() < .9 && reading.getTemperature() < 90)
+        // In the consumer operation, each reading is converted to a formatted
+        // string and logged
+        .consume(reading -> LOG.info(
+            String.format("Reading from device %s: (temp: %f, humidity: %f)",
+                reading.getDeviceId(),
+                reading.getTemperature(),
+                reading.getHumidity())
+        ));
+
+    // Fetches the topology name from the first command-line argument
+    String topologyName = StreamletUtils.getTopologyName(args);
+
+    Config config = Config.defaultConfig();
+
+    // Finally, the processing graph and configuration are passed to the Runner, which converts
+    // the graph into a Heron topology that can be run in a Heron cluster.
+    new Runner().run(topologyName, config, processingGraphBuilder);
+  }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/ImpressionsAndClicksTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/ImpressionsAndClicksTopology.java
new file mode 100644
index 0000000..558840b
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/ImpressionsAndClicksTopology.java
@@ -0,0 +1,197 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.JoinType;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.Streamlet;
+import com.twitter.heron.streamlet.WindowConfig;
+
+/**
+ * This topology demonstrates the use of join operations in the Heron
+ * Streamlet API for Java. Two independent streamlets, one consisting
+ * of ad impressions, the other of ad clicks, is joined together. A join
+ * function then checks if the userId matches on the impression and click.
+ * Finally, a reduce function counts the number of impression/click matches
+ * over the specified time window.
+ */
+public final class ImpressionsAndClicksTopology {
+  private ImpressionsAndClicksTopology() {
+  }
+
+  private static final Logger LOG =
+      Logger.getLogger(ImpressionsAndClicksTopology.class.getName());
+
+  /**
+   * A list of company IDs to be used to generate random clicks and impressions.
+   */
+  private static final List<String> ADS = Arrays.asList(
+      "acme",
+      "blockchain-inc",
+      "omnicorp"
+  );
+
+  /**
+   * A list of 25 active users ("user1" through "user25").
+   */
+  private static final List<String> USERS = IntStream.range(1, 25)
+      .mapToObj(i -> String.format("user%d", i))
+      .collect(Collectors.toList());
+
+  /**
+   * A POJO for incoming ad impressions (generated every 50 milliseconds).
+   */
+  private static class AdImpression implements Serializable {
+    private static final long serialVersionUID = 3283110635310800177L;
+
+    private String adId;
+    private String userId;
+    private String impressionId;
+
+    AdImpression() {
+      this.adId = StreamletUtils.randomFromList(ADS);
+      this.userId = StreamletUtils.randomFromList(USERS);
+      this.impressionId = UUID.randomUUID().toString();
+      LOG.info(String.format("Emitting impression: %s", this));
+    }
+
+    String getAdId() {
+      return adId;
+    }
+
+    String getUserId() {
+      return userId;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("(adId; %s, impressionId: %s)",
+          adId,
+          impressionId
+      );
+    }
+  }
+
+  /**
+   * A POJO for incoming ad clicks (generated every 50 milliseconds).
+   */
+  private static class AdClick implements Serializable {
+    private static final long serialVersionUID = 7202766159176178988L;
+    private String adId;
+    private String userId;
+    private String clickId;
+
+    AdClick() {
+      this.adId = StreamletUtils.randomFromList(ADS);
+      this.userId = StreamletUtils.randomFromList(USERS);
+      this.clickId = UUID.randomUUID().toString();
+      LOG.info(String.format("Emitting click: %s", this));
+    }
+
+    String getAdId() {
+      return adId;
+    }
+
+    String getUserId() {
+      return userId;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("(adId; %s, clickId: %s)",
+          adId,
+          clickId
+      );
+    }
+  }
+
+  /**
+   * All Heron topologies require a main function that defines the topology's behavior
+   * at runtime
+   */
+  public static void main(String[] args) throws Exception {
+    Builder processingGraphBuilder = Builder.createBuilder();
+
+    // A KVStreamlet is produced. Each element is a KeyValue object where the key
+    // is the impression ID and the user ID is the value.
+    Streamlet<AdImpression> impressions = processingGraphBuilder
+        .newSource(AdImpression::new);
+
+    // A KVStreamlet is produced. Each element is a KeyValue object where the key
+    // is the ad ID and the user ID is the value.
+    Streamlet<AdClick> clicks = processingGraphBuilder
+        .newSource(AdClick::new);
+
+    /**
+     * Here, the impressions KVStreamlet is joined to the clicks KVStreamlet.
+     */
+    impressions
+        // The join function here essentially provides the reduce function with a streamlet
+        // of KeyValue objects where the userId matches across an impression and a click
+        // (meaning that the user has clicked on the ad).
+        .join(
+            // The other streamlet that's being joined to
+            clicks,
+            // Key extractor for the impressions streamlet
+            impression -> impression.getUserId(),
+            // Key extractor for the clicks streamlet
+            click -> click.getUserId(),
+            // Window configuration for the join operation
+            WindowConfig.TumblingCountWindow(25),
+            // Join type (inner join means that all elements from both streams will be included)
+            JoinType.INNER,
+            // For each element resulting from the join operation, a value of 1 will be provided
+            // if the ad IDs match between the elements (or a value of 0 if they don't).
+            (user1, user2) -> (user1.getAdId().equals(user2.getAdId())) ? 1 : 0
+        )
+        // The reduce function counts the number of ad clicks per user.
+        .reduceByKeyAndWindow(
+            // Key extractor for the reduce operation
+            kv -> String.format("user-%s", kv.getKey().getKey()),
+            // Value extractor for the reduce operation
+            kv -> kv.getValue(),
+            // Window configuration for the reduce operation
+            WindowConfig.TumblingCountWindow(50),
+            // A running cumulative total is calculated for each key
+            (cumulative, incoming) -> cumulative + incoming
+        )
+        // Finally, the consumer operation provides formatted log output
+        .consume(kw -> {
+          LOG.info(String.format("(user: %s, clicks: %d)",
+              kw.getKey().getKey(),
+              kw.getValue()));
+        });
+
+    Config config = Config.defaultConfig();
+
+    // Fetches the topology name from the first command-line argument
+    String topologyName = StreamletUtils.getTopologyName(args);
+
+    // Finally, the processing graph and configuration are passed to the Runner, which converts
+    // the graph into a Heron topology that can be run in a Heron cluster.
+    new Runner().run(topologyName, config, processingGraphBuilder);
+  }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/IntegerProcessingTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/IntegerProcessingTopology.java
new file mode 100644
index 0000000..a24f013
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/IntegerProcessingTopology.java
@@ -0,0 +1,79 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.Resources;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.Streamlet;
+
+/**
+ * This is a very simple topology that shows a series of streamlet operations
+ * on a source streamlet of random integers (between 1 and 10). First, 1 is added
+ * to each integer. That streamlet is then united with a streamlet that consists
+ * of an indefinite stream of zeroes. At that point, all 2s are excluded from the
+ * streamlet. The final output of the processing graph is then logged.
+ */
+public final class IntegerProcessingTopology {
+  private IntegerProcessingTopology() {
+  }
+
+  // Heron resources to be applied to the topology
+  private static final float CPU = 2.0f;
+  private static final long GIGABYTES_OF_RAM = 6;
+  private static final int NUM_CONTAINERS = 2;
+
+  /**
+   * All Heron topologies require a main function that defines the topology's behavior
+   * at runtime
+   */
+  public static void main(String[] args) throws Exception {
+    Builder builder = Builder.createBuilder();
+
+    Streamlet<Integer> zeroes = builder.newSource(() -> 0);
+
+    builder.newSource(() -> ThreadLocalRandom.current().nextInt(1, 11))
+        .setNumPartitions(2)
+        .setName("random-ints")
+        .map(i -> i + 1)
+        .setName("add-one")
+        .union(zeroes)
+        .setName("unify-streams")
+        .filter(i -> i != 2)
+        .setName("remove-twos")
+        .log();
+
+    Resources resources = new Resources.Builder()
+        .setCpu(CPU)
+        .setRamInGB(GIGABYTES_OF_RAM)
+        .build();
+
+    Config config = new Config.Builder()
+        .setNumContainers(NUM_CONTAINERS)
+        .setContainerResources(resources)
+        .build();
+
+    // Fetches the topology name from the first command-line argument
+    String topologyName = StreamletUtils.getTopologyName(args);
+
+    // Finally, the processing graph and configuration are passed to the Runner, which converts
+    // the graph into a Heron topology that can be run in a Heron cluster.
+    new Runner().run(topologyName, config, builder);
+  }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/RepartitionTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/RepartitionTopology.java
new file mode 100644
index 0000000..6a02325
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/RepartitionTopology.java
@@ -0,0 +1,111 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.logging.Logger;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.Streamlet;
+
+/**
+ * This topology demonstrates the usage of a simple repartitioning algorithm
+ * using the Heron Streamlet API for Java. Normally, streamlet elements are
+ * distributed randomly across downstream instances when processed.
+ * Repartitioning enables you to select which instances (partitions) to send
+ * elements to on the basis of a user-defined logic. Here, a source streamlet
+ * emits an indefinite series of random integers between 1 and 100. The value
+ * of that number then determines to which topology instance (partition) the
+ * element is routed.
+ */
+public final class RepartitionTopology {
+  private RepartitionTopology() {
+  }
+
+  private static final Logger LOG =
+      Logger.getLogger(RepartitionTopology.class.getName());
+
+  /**
+   * The repartition function that determines to which partition each incoming
+   * streamlet element is routed (across 8 possible partitions). Integers between 1
+   * and 24 are routed to partitions 0 and 1, integers between 25 and 40 to partitions
+   * 2 and 3, and so on.
+   */
+  private static List<Integer> repartitionStreamlet(int incomingInteger, int numPartitions) {
+    List<Integer> partitions;
+
+    if (incomingInteger >= 0 && incomingInteger < 25) {
+      partitions = Arrays.asList(0, 1);
+    } else if (incomingInteger > 26 && incomingInteger < 50) {
+      partitions = Arrays.asList(2, 3);
+    } else if (incomingInteger > 50 && incomingInteger < 75) {
+      partitions = Arrays.asList(4, 5);
+    } else if (incomingInteger > 76 && incomingInteger <= 100) {
+      partitions = Arrays.asList(6, 7);
+    } else {
+      partitions = Arrays.asList(ThreadLocalRandom.current().nextInt(0, 8));
+    }
+
+    String logMessage = String.format("Sending value %d to partitions: %s",
+        incomingInteger,
+        StreamletUtils.intListAsString(partitions));
+
+    LOG.info(logMessage);
+
+    return partitions;
+  }
+
+  /**
+   * All Heron topologies require a main function that defines the topology's behavior
+   * at runtime
+   */
+  public static void main(String[] args) throws Exception {
+    Builder processingGraphBuilder = Builder.createBuilder();
+
+    Streamlet<Integer> randomIntegers = processingGraphBuilder
+        .newSource(() -> {
+          // Random integers are emitted every 50 milliseconds
+          StreamletUtils.sleep(50);
+          return ThreadLocalRandom.current().nextInt(100);
+        })
+        .setNumPartitions(2)
+        .setName("random-integer-source");
+
+    randomIntegers
+        // The specific repartition logic is applied here
+        .repartition(8, RepartitionTopology::repartitionStreamlet)
+        .setName("repartition-incoming-values")
+        // Here, a generic repartition logic is applied (simply
+        // changing the number of partitions without specifying
+        // how repartitioning will take place)
+        .repartition(2)
+        .setName("reduce-partitions-for-logging-operation")
+        .log();
+
+    // Fetches the topology name from the first command-line argument
+    String topologyName = StreamletUtils.getTopologyName(args);
+
+    Config config = Config.defaultConfig();
+
+    // Finally, the processing graph and configuration are passed to the Runner, which converts
+    // the graph into a Heron topology that can be run in a Heron cluster.
+    new Runner().run(topologyName, config, processingGraphBuilder);
+  }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/SimplePulsarSourceTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/SimplePulsarSourceTopology.java
new file mode 100644
index 0000000..d66b6ff
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/SimplePulsarSourceTopology.java
@@ -0,0 +1,126 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.logging.Logger;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.Context;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.Source;
+
+/**
+ * This topology demonstrates how sources work in the Heron Streamlet API
+ * for Java. The example source here reads from an Apache Pulsar topic and
+ * injects incoming messages into the processing graph.
+ */
+public final class SimplePulsarSourceTopology {
+  private SimplePulsarSourceTopology() {
+  }
+
+  private static final Logger LOG =
+          Logger.getLogger(SimplePulsarSourceTopology.class.getName());
+
+  private static class PulsarSource implements Source<String> {
+    private static final long serialVersionUID = -3433804102901363106L;
+    private PulsarClient client;
+    private Consumer consumer;
+    private String pulsarConnectionUrl;
+    private String consumeTopic;
+    private String subscription;
+
+    PulsarSource(String url, String topic, String subscription) {
+      this.pulsarConnectionUrl = url;
+      this.consumeTopic = topic;
+      this.subscription = subscription;
+    }
+
+    /**
+     * The setup functions defines the instantiation logic for the source.
+     * Here, a Pulsar client and consumer are created that will listen on
+     * the Pulsar topic.
+     */
+    public void setup(Context context) {
+      try {
+        client = PulsarClient.create(pulsarConnectionUrl);
+        consumer = client.subscribe(consumeTopic, subscription);
+      } catch (PulsarClientException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    /**
+     * The get function defines how elements for the source streamlet are
+     * "gotten." In this case, the Pulsar consumer for the specified topic
+     * listens for incoming messages.
+     */
+    public Collection<String> get() {
+      try {
+        String retval = new String(consumer.receive().getData(), "utf-8");
+        return Collections.singletonList(retval);
+      } catch (PulsarClientException | UnsupportedEncodingException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public void cleanup() {
+    }
+  }
+
+  /**
+   * All Heron topologies require a main function that defines the topology's behavior
+   * at runtime
+   */
+  public static void main(String[] args) throws Exception {
+    Builder processingGraphBuilder = Builder.createBuilder();
+
+    /**
+     * A Pulsar source is constructed for a specific Pulsar installation, topic, and
+     * subsecription.
+     */
+    Source<String> pulsarSource = new PulsarSource(
+        "pulsar://localhost:6650", // Pulsar connection URL
+        "persistent://sample/standalone/ns1/heron-pulsar-test-topic", // Pulsar topic
+        "subscription-1" // Subscription name for the Pulsar topic
+    );
+
+    /**
+     * In this processing graph, the source streamlet consists of messages on a
+     * Pulsar topic. Those messages are simply logged without any processing logic
+     * applied to them.
+     */
+    processingGraphBuilder.newSource(pulsarSource)
+        .setName("incoming-pulsar-messages")
+        .consume(s -> LOG.info(String.format("Message received from Pulsar: \"%s\"", s)));
+
+    Config config = Config.defaultConfig();
+
+    // Fetches the topology name from the first command-line argument
+    String topologyName = StreamletUtils.getTopologyName(args);
+
+    // Finally, the processing graph and configuration are passed to the Runner, which converts
+    // the graph into a Heron topology that can be run in a Heron cluster.
+    new Runner().run(topologyName, config, processingGraphBuilder);
+  }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/SmartWatchTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/SmartWatchTopology.java
new file mode 100644
index 0000000..afa05bc
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/SmartWatchTopology.java
@@ -0,0 +1,123 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.io.Serializable;
+import java.text.DecimalFormat;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.logging.Logger;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.KeyValue;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.WindowConfig;
+
+/**
+ * This topology shows an example usage of a reduce function. A source streamlet emits smart watch readings every 10
+ * seconds from one of several joggers. Those readings provide  * The processing graph then converts those smart watch
+ * readings to a KeyValue object, which is passed to a reduce function that calculates a per-jogger total number of feet
+ * run in the last minute. The reduced value is then used to provide a per-runner average pace (feet per minute) and the
+ * result is logged using a consume operation (which allows for a formatted log).
+ */
+public final class SmartWatchTopology {
+  private SmartWatchTopology() {
+  }
+
+  private static final Logger LOG =
+      Logger.getLogger(SmartWatchTopology.class.getName());
+
+  private static final List<String> JOGGERS = Arrays.asList(
+      "bill",
+      "ted"
+  );
+
+  private static class SmartWatchReading implements Serializable {
+    private static final long serialVersionUID = -6555650939020508026L;
+    private final String joggerId;
+    private final int feetRun;
+
+    SmartWatchReading() {
+      StreamletUtils.sleep(1000);
+      this.joggerId = StreamletUtils.randomFromList(JOGGERS);
+      this.feetRun = ThreadLocalRandom.current().nextInt(200, 400);
+    }
+
+    String getJoggerId() {
+      return joggerId;
+    }
+
+    int getFeetRun() {
+      return feetRun;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Builder processingGraphBuilder = Builder.createBuilder();
+
+    processingGraphBuilder.newSource(SmartWatchReading::new)
+        .setName("incoming-watch-readings")
+        .reduceByKeyAndWindow(
+            // Key extractor
+            reading -> reading.getJoggerId(),
+            // Value extractor
+            reading -> reading.getFeetRun(),
+            // The time window (1 minute of clock time)
+            WindowConfig.TumblingTimeWindow(Duration.ofSeconds(10)),
+            // The reduce function (produces a cumulative sum)
+            (cumulative, incoming) -> cumulative + incoming
+        )
+        .setName("reduce-to-total-distance-per-jogger")
+        .map(keyWindow -> {
+          // The per-key result of the previous reduce step
+          long totalFeetRun = keyWindow.getValue();
+
+          // The amount of time elapsed
+          long startTime = keyWindow.getKey().getWindow().getStartTime();
+          long endTime = keyWindow.getKey().getWindow().getEndTime();
+          long timeLengthMillis = endTime - startTime; // Cast to float to use as denominator
+
+          // The feet-per-minute calculation
+          float feetPerMinute = totalFeetRun / (float) (timeLengthMillis / 1000);
+
+          // Reduce to two decimal places
+          String paceString = new DecimalFormat("#.##").format(feetPerMinute);
+
+          // Return a per-jogger average pace
+          return new KeyValue<>(keyWindow.getKey().getKey(), paceString);
+        })
+        .setName("calculate-average-speed")
+        .consume(kv -> {
+          String logMessage = String.format("(runner: %s, avgFeetPerMinute: %s)",
+              kv.getKey(),
+              kv.getValue());
+
+          LOG.info(logMessage);
+        });
+
+    Config config = Config.defaultConfig();
+
+    // Fetches the topology name from the first command-line argument
+    String topologyName = StreamletUtils.getTopologyName(args);
+
+    // Finally, the processing graph and configuration are passed to the Runner, which converts
+    // the graph into a Heron topology that can be run in a Heron cluster.
+    new Runner().run(topologyName, config, processingGraphBuilder);
+  }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/StreamletCloneTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/StreamletCloneTopology.java
new file mode 100644
index 0000000..334e674
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/StreamletCloneTopology.java
@@ -0,0 +1,158 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.Context;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.Sink;
+import com.twitter.heron.streamlet.Streamlet;
+
+/**
+ * This topology demonstrates clone operations on streamlets in the Heron
+ * Streamlet API for Java. A clone operation creates multiple identical copies
+ * of a streamlet. Clone operations are especially useful if you want to, for
+ * example send streamlet elements to separate sinks, as is done here. A
+ * supplier streamlet emits random scores in a game (per player). That initial
+ * streamlet is cloned into two. One of the cloned streams goes to a custom
+ * logging sink while the other goes to a dummy database sink.
+ */
+public final class StreamletCloneTopology {
+  private StreamletCloneTopology() {
+  }
+
+  private static final Logger LOG =
+      Logger.getLogger(StreamletCloneTopology.class.getName());
+
+  /**
+   * A list of players of the game ("player1" through "player100").
+   */
+  private static final List<String> PLAYERS = IntStream.range(1, 100)
+      .mapToObj(i -> String.format("player%d", i))
+      .collect(Collectors.toList());
+
+  /**
+   * A POJO for game scores.
+   */
+  private static class GameScore implements Serializable {
+    private static final long serialVersionUID = 1089454399729015529L;
+    private String playerId;
+    private int score;
+
+    GameScore() {
+      this.playerId = StreamletUtils.randomFromList(PLAYERS);
+      this.score = ThreadLocalRandom.current().nextInt(1000);
+    }
+
+    String getPlayerId() {
+      return playerId;
+    }
+
+    int getScore() {
+      return score;
+    }
+  }
+
+  /**
+   * A phony database sink. This sink doesn't actually interact with a database.
+   * Instead, it logs each incoming score to stdout.
+   */
+  private static class DatabaseSink implements Sink<GameScore> {
+    private static final long serialVersionUID = 5544736723673011054L;
+
+    private void saveToDatabase(GameScore score) {
+      // This is a dummy operation, so no database logic will be implemented here
+    }
+
+    public void setup(Context context) {
+    }
+
+    public void put(GameScore score) {
+      String logMessage = String.format("Saving a score of %d for player %s to the database",
+          score.getScore(),
+          score.getPlayerId());
+      LOG.info(logMessage);
+      saveToDatabase(score);
+    }
+
+    public void cleanup() {
+    }
+  }
+
+  /**
+   * A logging sink that simply prints a formatted log message for each incoming score.
+   */
+  private static class FormattedLogSink implements Sink<GameScore> {
+    private static final long serialVersionUID = 1251089445039059977L;
+    public void setup(Context context) {
+    }
+
+    public void put(GameScore score) {
+      String logMessage = String.format("The current score for player %s is %d",
+          score.getPlayerId(),
+          score.getScore());
+      LOG.info(logMessage);
+    }
+
+    public void cleanup() {
+    }
+  }
+
+  /**
+   * All Heron topologies require a main function that defines the topology's behavior
+   * at runtime
+   */
+  public static void main(String[] args) throws Exception {
+    Builder processingGraphBuilder = Builder.createBuilder();
+
+    /**
+     * A supplier streamlet of random GameScore objects is cloned into two
+     * separate streamlets.
+     */
+    List<Streamlet<GameScore>> splitGameScoreStreamlet = processingGraphBuilder
+        .newSource(GameScore::new)
+        .clone(2);
+
+    /**
+     * Elements in the first cloned streamlet go to the database sink.
+     */
+    splitGameScoreStreamlet.get(0)
+        .toSink(new DatabaseSink());
+
+    /**
+     * Elements in the second cloned streamlet go to the logging sink.
+     */
+    splitGameScoreStreamlet.get(1)
+        .toSink(new FormattedLogSink());
+
+    Config config = Config.defaultConfig();
+
+    // Fetches the topology name from the first command-line argument
+    String topologyName = StreamletUtils.getTopologyName(args);
+
+    // Finally, the processing graph and configuration are passed to the Runner, which converts
+    // the graph into a Heron topology that can be run in a Heron cluster.
+    new Runner().run(topologyName, config, processingGraphBuilder);
+  }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/TransformsTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/TransformsTopology.java
new file mode 100644
index 0000000..83fddbd
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/TransformsTopology.java
@@ -0,0 +1,122 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.logging.Logger;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.Context;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.SerializableTransformer;
+
+/**
+ * In this topology, a supplier generates an indefinite series of random integers between 1
+ * and 100. From there, a series of transform operations are applied that ultimately leave
+ * the original value unchanged.
+ */
+public final class TransformsTopology {
+  private TransformsTopology() {
+  }
+
+  private static final Logger LOG = Logger.getLogger(TransformsTopology.class.getName());
+
+  /**
+   * This transformer leaves incoming values unmodified. The Consumer simply accepts incoming
+   * values as-is during the transform phase.
+   */
+  private static class DoNothingTransformer<T> implements SerializableTransformer<T, T> {
+    private static final long serialVersionUID = 3717991700067221067L;
+
+    public void setup(Context context) {
+    }
+
+    /**
+     * Here, the incoming value is accepted as-is and not changed (hence the "do nothing"
+     * in the class name).
+     */
+    public void transform(T in, Consumer<T> consumer) {
+      consumer.accept(in);
+    }
+
+    public void cleanup() {
+    }
+  }
+
+  /**
+   * This transformer increments incoming values by a user-supplied increment (which can also,
+   * of course, be negative).
+   */
+  private static class IncrementTransformer implements SerializableTransformer<Integer, Integer> {
+    private static final long serialVersionUID = -3198491688219997702L;
+    private int increment;
+    private int total;
+
+    IncrementTransformer(int increment) {
+      this.increment = increment;
+    }
+
+    public void setup(Context context) {
+      context.registerMetric("InCrementMetric", 30, () -> total);
+    }
+
+    /**
+     * Here, the incoming value is incremented by the value specified in the
+     * transformer's constructor.
+     */
+    public void transform(Integer in, Consumer<Integer> consumer) {
+      int incrementedValue = in + increment;
+      total += increment;
+      consumer.accept(incrementedValue);
+    }
+
+    public void cleanup() {
+    }
+  }
+
+  /**
+   * All Heron topologies require a main function that defines the topology's behavior
+   * at runtime
+   */
+  public static void main(String[] args) throws Exception {
+    Builder builder = Builder.createBuilder();
+
+    /**
+     * The processing graph consists of a supplier streamlet that emits
+     * random integers between 1 and 100. From there, a series of transformers
+     * is applied. At the end of the graph, the original value is ultimately
+     * unchanged.
+     */
+    builder.newSource(() -> ThreadLocalRandom.current().nextInt(100))
+        .transform(new DoNothingTransformer<>())
+        .transform(new IncrementTransformer(10))
+        .transform(new IncrementTransformer(-7))
+        .transform(new DoNothingTransformer<>())
+        .transform(new IncrementTransformer(-3))
+        .log();
+
+    Config config = Config.defaultConfig();
+
+    // Fetches the topology name from the first command-line argument
+    String topologyName = StreamletUtils.getTopologyName(args);
+
+    // Finally, the processing graph and configuration are passed to the Runner, which converts
+    // the graph into a Heron topology that can be run in a Heron cluster.
+    new Runner().run(topologyName, config, builder);
+  }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/WindowedWordCountTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/WindowedWordCountTopology.java
new file mode 100644
index 0000000..8bfa62e
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/WindowedWordCountTopology.java
@@ -0,0 +1,97 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Logger;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.WindowConfig;
+
+/**
+ * This topology is an implementation of the classic word count example
+ * for the Heron Streamlet API for Java. A source streamlet emits an
+ * indefinite series of sentences chosen at random from a pre-defined list.
+ * Each sentence is then "flattened" into a list of individual words. A
+ * reduce function keeps a running tally of the number of times each word
+ * is encountered within each time window (in this case a tumbling count
+ * window of 50 operations). The result is then logged.
+ */
+public final class WindowedWordCountTopology {
+  private WindowedWordCountTopology() {
+  }
+
+  private static final Logger LOG =
+      Logger.getLogger(WindowedWordCountTopology.class.getName());
+
+  private static final List<String> SENTENCES = Arrays.asList(
+      "I have nothing to declare but my genius",
+      "You can even",
+      "Compassion is an action word with no boundaries",
+      "To thine own self be true"
+  );
+
+  public static void main(String[] args) throws Exception {
+    Builder processingGraphBuilder = Builder.createBuilder();
+
+    processingGraphBuilder
+        // The origin of the processing graph: an indefinite series of sentences chosen
+        // from the list
+        .newSource(() -> StreamletUtils.randomFromList(SENTENCES))
+        .setName("random-sentences-source")
+        // Each sentence is "flattened" into a Streamlet<String> of individual words
+        .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+        .setName("flatten-into-individual-words")
+        // The reduce operation performs the per-key (i.e. per-word) sum within each time window
+        .reduceByKeyAndWindow(
+            // The key extractor (the word is left unchanged)
+            word -> word,
+            // Value extractor (the value is always 1)
+            word -> 1,
+            WindowConfig.TumblingCountWindow(50),
+            (x, y) -> x + y
+        )
+        .setName("reduce-operation")
+        // The final output is logged using a user-supplied format
+        .consume(kv -> {
+          String logMessage = String.format("(word: %s, count: %d)",
+              kv.getKey().getKey(),
+              kv.getValue()
+          );
+          LOG.info(logMessage);
+        });
+
+    // The topology's parallelism (the number of containers across which the topology's
+    // processing instance will be split) can be defined via the second command-line
+    // argument (or else the default of 2 will be used).
+    int topologyParallelism = StreamletUtils.getParallelism(args, 2);
+
+    Config config = new Config.Builder()
+        .setNumContainers(topologyParallelism)
+        .useKryoSerializer()
+        .build();
+
+    // Fetches the topology name from the first command-line argument
+    String topologyName = StreamletUtils.getTopologyName(args);
+
+    // Finally, the processing graph and configuration are passed to the Runner, which converts
+    // the graph into a Heron topology that can be run in a Heron cluster.
+    new Runner().run(topologyName, config, processingGraphBuilder);
+  }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/WireRequestsTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/WireRequestsTopology.java
new file mode 100644
index 0000000..bebfa66
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/WireRequestsTopology.java
@@ -0,0 +1,193 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.logging.Logger;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.Streamlet;
+
+/**
+ * This topology demonstrates how different streamlets can be united into one
+ * using union operations. Wire requests (customers asking to wire money) are
+ * created for three different bank branches. Each bank branch is a separate
+ * streamlet. In each branch's streamlet, the request amount is checked to make
+ * sure that no one requests a wire transfer of more than $500. Then, the
+ * wire request streamlets for all three branches are combined into one using a
+ * union operation. Each element in the unified streamlet then passes through a
+ * fraud detection filter that ensures that no "bad" customers are allowed to
+ * make requests.
+ */
+public final class WireRequestsTopology {
+  private WireRequestsTopology() {
+  }
+
+  private static final Logger LOG =
+      Logger.getLogger(WireRequestsTopology.class.getName());
+
+  /**
+   * A list of current customers (some good, some bad).
+   */
+  private static final List<String> CUSTOMERS = Arrays.asList(
+      "honest-tina",
+      "honest-jeff",
+      "scheming-dave",
+      "scheming-linda"
+  );
+
+  /**
+   * A list of bad customers whose requests should be rejected.
+   */
+  private static final List<String> FRAUDULENT_CUSTOMERS = Arrays.asList(
+      "scheming-dave",
+      "scheming-linda"
+  );
+
+  /**
+   * The maximum allowable amount for transfers. Requests for more than this
+   * amount need to be rejected.
+   */
+  private static final int MAX_ALLOWABLE_AMOUNT = 500;
+
+  /**
+   * A POJO for wire requests.
+   */
+  private static class WireRequest implements Serializable {
+    private static final long serialVersionUID = 1311441220738558016L;
+    private String customerId;
+    private int amount;
+
+    WireRequest(long delay) {
+      // The pace at which requests are generated is throttled. Different
+      // throttles are applied to different bank branches.
+      StreamletUtils.sleep(delay);
+      this.customerId = StreamletUtils.randomFromList(CUSTOMERS);
+      this.amount = ThreadLocalRandom.current().nextInt(1000);
+      LOG.info(String.format("New wire request: %s", this));
+    }
+
+    String getCustomerId() {
+      return customerId;
+    }
+
+    int getAmount() {
+      return amount;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("(customer: %s, amount: %d)", customerId, amount);
+    }
+  }
+
+  /**
+   * Each request is checked to make sure that requests from untrustworthy customers
+   * are rejected.
+   */
+  private static boolean fraudDetect(WireRequest request) {
+    String logMessage;
+
+    boolean fraudulent = FRAUDULENT_CUSTOMERS.contains(request.getCustomerId());
+
+    if (fraudulent) {
+      logMessage = String.format("Rejected fraudulent customer %s",
+          request.getCustomerId());
+      LOG.warning(logMessage);
+    } else {
+      logMessage = String.format("Accepted request for $%d from customer %s",
+          request.getAmount(),
+          request.getCustomerId());
+      LOG.info(logMessage);
+    }
+
+    return !fraudulent;
+  }
+
+  /**
+   * Each request is checked to make sure that no one requests an amount over $500.
+   */
+  private static boolean checkRequestAmount(WireRequest request) {
+    boolean sufficientBalance = request.getAmount() < MAX_ALLOWABLE_AMOUNT;
+
+    if (!sufficientBalance) {
+      LOG.warning(
+          String.format("Rejected excessive request of $%d",
+              request.getAmount()));
+    }
+
+    return sufficientBalance;
+  }
+
+  /**
+   * All Heron topologies require a main function that defines the topology's behavior
+   * at runtime
+   */
+  public static void main(String[] args) throws Exception {
+    Builder builder = Builder.createBuilder();
+
+    // Requests from the "quiet" bank branch (high throttling).
+    Streamlet<WireRequest> quietBranch = builder.newSource(() -> new WireRequest(20))
+        .setNumPartitions(1)
+        .setName("quiet-branch-requests")
+        .filter(WireRequestsTopology::checkRequestAmount)
+        .setName("quiet-branch-check-balance");
+
+    // Requests from the "medium" bank branch (medium throttling).
+    Streamlet<WireRequest> mediumBranch = builder.newSource(() -> new WireRequest(10))
+        .setNumPartitions(2)
+        .setName("medium-branch-requests")
+        .filter(WireRequestsTopology::checkRequestAmount)
+        .setName("medium-branch-check-balance");
+
+    // Requests from the "busy" bank branch (low throttling).
+    Streamlet<WireRequest> busyBranch = builder.newSource(() -> new WireRequest(5))
+        .setNumPartitions(4)
+        .setName("busy-branch-requests")
+        .filter(WireRequestsTopology::checkRequestAmount)
+        .setName("busy-branch-check-balance");
+
+    // Here, the streamlets for the three bank branches are united into one. The fraud
+    // detection filter then operates on that unified streamlet.
+    quietBranch
+        .union(mediumBranch)
+        .setNumPartitions(2)
+        .setName("union-1")
+        .union(busyBranch)
+        .setName("union-2")
+        .setNumPartitions(4)
+        .filter(WireRequestsTopology::fraudDetect)
+        .setName("all-branches-fraud-detect")
+        .log();
+
+    Config config = new Config.Builder()
+        .setDeliverySemantics(Config.DeliverySemantics.EFFECTIVELY_ONCE)
+        .setNumContainers(2)
+        .build();
+
+    // Fetches the topology name from the first command-line argument
+    String topologyName = StreamletUtils.getTopologyName(args);
+
+    // Finally, the processing graph and configuration are passed to the Runner, which converts
+    // the graph into a Heron topology that can be run in a Heron cluster.
+    new Runner().run(topologyName, config, builder);
+  }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/WordCountStreamletTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/WordCountStreamletTopology.java
deleted file mode 100644
index 82525a0..0000000
--- a/examples/src/java/com/twitter/heron/examples/streamlet/WordCountStreamletTopology.java
+++ /dev/null
@@ -1,60 +0,0 @@
-//  Copyright 2017 Twitter. All rights reserved.
-//
-//  Licensed under the Apache License, Version 2.0 (the "License");
-//  you may not use this file except in compliance with the License.
-//  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//  Unless required by applicable law or agreed to in writing, software
-//  distributed under the License is distributed on an "AS IS" BASIS,
-//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//  See the License for the specific language governing permissions and
-//  limitations under the License.
-
-package com.twitter.heron.examples.streamlet;
-
-import java.util.Arrays;
-
-import com.twitter.heron.streamlet.Builder;
-import com.twitter.heron.streamlet.Config;
-import com.twitter.heron.streamlet.Runner;
-import com.twitter.heron.streamlet.WindowConfig;
-
-/**
- * This is a topology that does simple word counts.
- * <p>
- * In this topology,
- * 1. The sentence "Mary had a little lamb" is generated over and over again.
- * 2. The flatMap stage splits the sentence into words
- * 3. The map stage 'counts' each word one at a time.
- * 4. The reduceByKeyAndWindow stage assembles a tumbling count window and
- *    computes the counts of all words grouped by word.
- */
-public final class WordCountStreamletTopology {
-  private WordCountStreamletTopology() {
-  }
-
-  /**
-   * Main method
-   */
-  public static void main(String[] args) {
-    if (args.length < 1) {
-      throw new RuntimeException("Specify topology name");
-    }
-
-    int parallelism = 1;
-    if (args.length > 1) {
-      parallelism = Integer.parseInt(args[1]);
-    }
-    Builder builder = Builder.createBuilder();
-    builder.newSource(() -> "Mary had a little lamb")
-        .flatMap((sentence) -> Arrays.asList(sentence.split("\\s+")))
-        .reduceByKeyAndWindow(x -> x, WindowConfig.TumblingCountWindow(10), 0, (x, y) -> x + 1)
-        .log();
-    Config conf = new Config();
-    conf.setNumContainers(parallelism);
-    Runner runner = new Runner();
-    runner.run(args[0], conf, builder);
-  }
-}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/utils/StreamletUtils.java b/examples/src/java/com/twitter/heron/examples/streamlet/utils/StreamletUtils.java
new file mode 100644
index 0000000..4f01aa4
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/utils/StreamletUtils.java
@@ -0,0 +1,69 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+
+package com.twitter.heron.examples.streamlet.utils;
+
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+/**
+ * A collection of helper functions for the Streamlet API example topologies
+ */
+public final class StreamletUtils {
+  private StreamletUtils() {
+  }
+
+  public static void sleep(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Fetches the topology's name from the first command-line argument or
+   * throws an exception if not present.
+   */
+  public static String getTopologyName(String[] args) throws Exception {
+    if (args.length == 0) {
+      throw new Exception("You must supply a name for the topology");
+    } else {
+      return args[0];
+    }
+  }
+
+  /**
+   * Selects a random item from a list. Used in many example source streamlets.
+   */
+  public static <T> T randomFromList(List<T> ls) {
+    return ls.get(new Random().nextInt(ls.size()));
+  }
+
+  /**
+   * Fetches the topology's parallelism from the second-command-line
+   * argument or defers to a supplied default.
+   */
+  public static int getParallelism(String[] args, int defaultParallelism) {
+    return (args.length > 1) ? Integer.parseInt(args[1]) : defaultParallelism;
+  }
+
+  /**
+   * Converts a list of integers into a comma-separated string.
+   */
+  public static String intListAsString(List<Integer> ls) {
+    return String.join(", ", ls.stream().map(i -> i.toString()).collect(Collectors.toList()));
+  }
+}
diff --git a/heron/api/src/java/BUILD b/heron/api/src/java/BUILD
index 2e13df0..e98fe2e 100644
--- a/heron/api/src/java/BUILD
+++ b/heron/api/src/java/BUILD
@@ -20,17 +20,29 @@
         "//heron/common/src/java:basics-java",
     ]
 
+# Low Level Api
 java_library(
-    name = "api-java",
-    srcs = glob(["com/twitter/heron/api/**/*.java", "com/twitter/heron/streamlet/**/*.java"]),
+    name = "api-java-low-level",
+    srcs = glob(["com/twitter/heron/api/**/*.java"]),
     javacopts = DOCLINT_HTML_AND_SYNTAX,
     deps = api_deps_files,
 )
 
+# Functional Api
+java_library(
+    name = "api-java",
+    srcs = glob(["com/twitter/heron/streamlet/**/*.java"]),
+    javacopts = DOCLINT_HTML_AND_SYNTAX,
+    deps = api_deps_files + [
+        ":api-java-low-level",
+        "//third_party/java:kryo-neverlink",
+    ]
+)
+
 java_binary(
     name = "api-unshaded",
     srcs = glob(["com/twitter/heron/api/**/*.java", "com/twitter/heron/streamlet/**/*.java"]),
-    deps = api_deps_files,
+    deps = api_deps_files + ["//third_party/java:kryo-neverlink"],
 )
 
 jarjar_binary(
diff --git a/heron/api/src/java/com/twitter/heron/api/Config.java b/heron/api/src/java/com/twitter/heron/api/Config.java
index 5c12615..b281e54 100644
--- a/heron/api/src/java/com/twitter/heron/api/Config.java
+++ b/heron/api/src/java/com/twitter/heron/api/Config.java
@@ -34,6 +34,7 @@
 package com.twitter.heron.api;
 
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -266,6 +267,19 @@
    */
   public static final String TOPOLOGY_ENVIRONMENT = "topology.environment";
 
+  /**
+   * Timer events registered for a topology.
+   * This is a Map<String, Pair<Duration, Runnable>>.
+   * Where the key is the name and the value contains the frequency of the event
+   * and the task to run.
+   */
+  public static final String TOPOLOGY_TIMER_EVENTS = "topology.timer.events";
+
+  /**
+   * Enable Remote debugging for java heron instances
+   */
+  public static final String TOPOLOGY_REMOTE_DEBUGGING_ENABLE = "topology.remote.debugging.enable";
+
   private static final long serialVersionUID = 2550967708478837032L;
   // We maintain a list of all user exposed vars
   private static Set<String> apiVars = new HashSet<>();
@@ -301,6 +315,7 @@
     apiVars.add(TOPOLOGY_ADDITIONAL_CLASSPATH);
     apiVars.add(TOPOLOGY_UPDATE_DEACTIVATE_WAIT_SECS);
     apiVars.add(TOPOLOGY_UPDATE_REACTIVATE_WAIT_SECS);
+    apiVars.add(TOPOLOGY_REMOTE_DEBUGGING_ENABLE);
   }
 
   public Config() {
@@ -652,4 +667,35 @@
   public void setTopologyStatefulStartClean(boolean clean) {
     setTopologyStatefulStartClean(this, clean);
   }
+
+  /**
+   * Registers a timer event that executes periodically
+   * @param conf the map with the existing topology configs
+   * @param name the name of the timer
+   * @param interval the frequency in which to run the task
+   * @param task the task to run
+   */
+  @SuppressWarnings("unchecked")
+  public static void registerTopologyTimerEvents(Map<String, Object> conf,
+                                                 String name, Duration interval,
+                                                 Runnable task) {
+    if (interval.isZero() || interval.isNegative()) {
+      throw new IllegalArgumentException("Timer duration needs to be positive");
+    }
+    if (!conf.containsKey(Config.TOPOLOGY_TIMER_EVENTS)) {
+      conf.put(Config.TOPOLOGY_TIMER_EVENTS, new HashMap<String, Pair<Duration, Runnable>>());
+    }
+
+    Map<String, Pair<Duration, Runnable>> timers
+        = (Map<String, Pair<Duration, Runnable>>) conf.get(Config.TOPOLOGY_TIMER_EVENTS);
+
+    if (timers.containsKey(name)) {
+      throw new IllegalArgumentException("Timer with name " + name + " already exists");
+    }
+    timers.put(name, Pair.of(interval, task));
+  }
+
+  public void setTopologyRemoteDebugging(boolean isOn) {
+    this.put(Config.TOPOLOGY_REMOTE_DEBUGGING_ENABLE, String.valueOf(isOn));
+  }
 }
diff --git a/heron/api/src/java/com/twitter/heron/api/utils/TopologyUtils.java b/heron/api/src/java/com/twitter/heron/api/utils/TopologyUtils.java
index 9a487f4..cf52f7c 100644
--- a/heron/api/src/java/com/twitter/heron/api/utils/TopologyUtils.java
+++ b/heron/api/src/java/com/twitter/heron/api/utils/TopologyUtils.java
@@ -290,4 +290,10 @@
 
     throw new IllegalStateException("Failed to find topology defn file");
   }
+
+  public static boolean getTopologyRemoteDebuggingEnabled(TopologyAPI.Topology topology) {
+    List<TopologyAPI.Config.KeyValue> topologyConfig = topology.getTopologyConfig().getKvsList();
+    return Boolean.parseBoolean(TopologyUtils.getConfigWithDefault(
+        topologyConfig, Config.TOPOLOGY_REMOTE_DEBUGGING_ENABLE, "false"));
+  }
 }
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Config.java b/heron/api/src/java/com/twitter/heron/streamlet/Config.java
index e4aca45..a6ee37a 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Config.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Config.java
@@ -17,69 +17,39 @@
 import java.io.Serializable;
 
 import com.twitter.heron.common.basics.ByteAmount;
+import com.twitter.heron.streamlet.impl.KryoSerializer;
 
 /**
  * Config is the way users configure the execution of the topology.
- * Things like tuple delivery semantics, resources used, as well as
- * user defined key/value pairs are passed on to the runner via
+ * Things like streamlet delivery semantics, resources used, as well as
+ * user-defined key/value pairs are passed on to the topology runner via
  * this class.
  */
 public final class Config implements Serializable {
   private static final long serialVersionUID = 6204498077403076352L;
+
   private com.twitter.heron.api.Config heronConfig;
+
   public enum DeliverySemantics {
     ATMOST_ONCE,
     ATLEAST_ONCE,
     EFFECTIVELY_ONCE
   }
 
-  public Config() {
-    heronConfig = new com.twitter.heron.api.Config();
+  private Config(Builder builder) {
+    heronConfig = builder.config;
   }
 
-  Config(com.twitter.heron.api.Config config) {
-    heronConfig = config;
+  public static Config defaultConfig() {
+    return new Builder()
+        .build();
   }
 
   com.twitter.heron.api.Config getHeronConfig() {
     return heronConfig;
   }
 
-  /**
-   * Sets the delivery semantics of the topology
-   * @param semantic The delivery semantic to be enforced
-   */
-  public void setDeliverySemantics(DeliverySemantics semantic) {
-    heronConfig.setTopologyReliabilityMode(translateSemantics(semantic));
-  }
-
-  /**
-   * Sets the number of containers to run this topology
-   * @param numContainers The number of containers to distribute this topology
-   */
-  public void setNumContainers(int numContainers) {
-    heronConfig.setNumStmgrs(numContainers);
-  }
-
-  /**
-   * Sets resources used per container by this topology
-   * @param resource The resource per container to dedicate per container
-   */
-  public void setContainerResources(Resources resource) {
-    heronConfig.setContainerCpuRequested(resource.getCpu());
-    heronConfig.setContainerRamRequested(ByteAmount.fromBytes(resource.getRam()));
-  }
-
-  /**
-   * Sets some user defined key value mapping
-   * @param key The user defined key
-   * @param value The user defined object
-   */
-  public void setUserConfig(String key, Object value) {
-    heronConfig.put(key, value);
-  }
-
-  private com.twitter.heron.api.Config.TopologyReliabilityMode translateSemantics(
+  private static com.twitter.heron.api.Config.TopologyReliabilityMode translateSemantics(
       DeliverySemantics semantics) {
     switch (semantics) {
       case ATMOST_ONCE:
@@ -92,4 +62,67 @@
         return com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE;
     }
   }
+
+  public static class Builder {
+    private com.twitter.heron.api.Config config;
+
+    public Builder() {
+      config = new com.twitter.heron.api.Config();
+    }
+
+    /**
+     * Sets the number of containers to run this topology
+     * @param numContainers The number of containers to distribute this topology
+     */
+    public Builder setNumContainers(int numContainers) {
+      config.setNumStmgrs(numContainers);
+      return this;
+    }
+
+    /**
+     * Sets resources used per container by this topology
+     * @param resources The resource to dedicate per container
+     */
+    public Builder setContainerResources(Resources resources) {
+      config.setContainerCpuRequested(resources.getCpu());
+      config.setContainerRamRequested(ByteAmount.fromBytes(resources.getRam()));
+      return this;
+    }
+
+    /**
+     * Sets the delivery semantics of the topology
+     * @param semantic The delivery semantic to be enforced
+     */
+    public Builder setDeliverySemantics(DeliverySemantics semantic) {
+      config.setTopologyReliabilityMode(Config.translateSemantics(semantic));
+      return this;
+    }
+
+    /**
+     * Sets some user-defined key/value mapping
+     * @param key The user-defined key
+     * @param value The user-defined value
+     */
+    public Builder setUserConfig(String key, Object value) {
+      config.put(key, value);
+      return this;
+    }
+
+    /**
+     * Sets the topology to use the Kryo serializer for serializing
+     * streamlet elements
+     */
+    public Builder useKryoSerializer() {
+      try {
+        config.setSerializationClassName(new KryoSerializer().getClass().getName());
+      } catch (NoClassDefFoundError e) {
+        throw new RuntimeException("Linking with kryo is needed because useKryoSerializer is used");
+      }
+      return this;
+    }
+
+    public Config build() {
+      return new Config(this);
+    }
+  }
 }
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Context.java b/heron/api/src/java/com/twitter/heron/streamlet/Context.java
index 8371011..34fcf0e 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Context.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Context.java
@@ -16,6 +16,7 @@
 
 import java.io.Serializable;
 import java.util.Map;
+import java.util.function.Supplier;
 
 import com.twitter.heron.api.state.State;
 
@@ -50,6 +51,14 @@
   int getStreamPartition();
 
   /**
+   * Register a metric function. This function will be called
+   * by the system every collectionInterval seconds and the resulting value
+   * will be collected
+   */
+  <T> void registerMetric(String metricName, int collectionInterval,
+                      Supplier<T> metricFn);
+
+  /**
    * The state where components can store any of their local state
    * @return The state interface where users can store their local state
    */
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/KeyValue.java b/heron/api/src/java/com/twitter/heron/streamlet/KeyValue.java
index 6e9770a..2c3b4e4 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/KeyValue.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/KeyValue.java
@@ -31,6 +31,10 @@
     return new KeyValue<R, T>(k, v);
   }
 
+  KeyValue() {
+    // nothing really
+  }
+
   public KeyValue(K k, V v) {
     this.key = k;
     this.value = v;
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/KeyedWindow.java b/heron/api/src/java/com/twitter/heron/streamlet/KeyedWindow.java
index fb1af85..25c06c4 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/KeyedWindow.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/KeyedWindow.java
@@ -25,10 +25,16 @@
   private static final long serialVersionUID = 4193319775040181971L;
   private T key;
   private Window window;
+
+  KeyedWindow() {
+    // nothing really
+  }
+
   public KeyedWindow(T key, Window window) {
     this.key = key;
     this.window = window;
   }
+
   public T getKey() {
     return key;
   }
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Resources.java b/heron/api/src/java/com/twitter/heron/streamlet/Resources.java
index 967e700..1deffcf 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Resources.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Resources.java
@@ -18,13 +18,23 @@
 
 /**
  * Resources needed by the topology are encapsulated in this class.
- * Currently we deal with cpu and ram. Others can be added later.
+ * Currently we deal with CPU and RAM. Others can be added later.
  */
 public final class Resources implements Serializable {
   private static final long serialVersionUID = 630451253428388496L;
   private float cpu;
   private long ram;
 
+  private Resources(Builder builder) {
+    this.cpu = builder.cpu;
+    this.ram = builder.ram;
+  }
+
+  public static Resources defaultResources() {
+    return new Builder()
+        .build();
+  }
+
   public float getCpu() {
     return cpu;
   }
@@ -33,31 +43,58 @@
     return ram;
   }
 
-  public Resources() {
-    this.cpu = 1.0f;
-    this.ram = 104857600;
-  }
-
-  public Resources withCpu(float ncpu) {
-    this.cpu = ncpu;
-    return this;
-  }
-
-  public Resources withRam(long nram) {
-    this.ram = nram;
-    return this;
-  }
-
-  public Resources withRamInMB(long nram) {
-    return withRam(nram * 1024 * 1024);
-  }
-
-  public Resources withRamInGB(long nram) {
-    return withRamInMB(nram * 1024);
-  }
-
   @Override
   public String toString() {
-    return "{ CPU: " + String.valueOf(cpu) + " RAM: " + String.valueOf(ram) + " }";
+    return String.format("{ CPU: %s RAM: %s }", String.valueOf(cpu), String.valueOf(ram));
+  }
+
+  public static class Builder {
+    private float cpu;
+    private long ram;
+
+    public Builder() {
+      this.cpu = 1.0f;
+      this.ram = 104857600;
+    }
+
+    /**
+     * Sets the RAM to be used by the topology (in megabytes)
+     * @param nram The number of megabytes of RAM
+     */
+    public Builder setRamInMB(long nram) {
+      this.ram = nram * 1024;
+      return this;
+    }
+
+    /**
+     * Sets the RAM to be used by the topology (in gigabytes)
+     * @param nram The number of gigabytes of RAM
+     */
+    public Builder setRamInGB(long nram) {
+      this.ram = nram * 1024 * 1024;
+      return this;
+    }
+
+    /**
+     * Sets the total number of CPUs to be used by the topology
+     * @param containerCpu The number of CPUs (as a float)
+     */
+    public Builder setCpu(float containerCpu) {
+      this.cpu = containerCpu;
+      return this;
+    }
+
+    /**
+     * Sets the RAM to be used by the topology (in bytes)
+     * @param containerRam The number of bytes of RAM
+     */
+    public Builder setRam(long containerRam) {
+      this.ram = containerRam;
+      return this;
+    }
+
+    public Resources build() {
+      return new Resources(this);
+    }
   }
 }
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Source.java b/heron/api/src/java/com/twitter/heron/streamlet/Source.java
index 60a4903..fe2330a 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Source.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Source.java
@@ -15,6 +15,7 @@
 package com.twitter.heron.streamlet;
 
 import java.io.Serializable;
+import java.util.Collection;
 
 /**
  * Source is how Streamlet's originate. The get method
@@ -24,6 +25,6 @@
  */
 public interface Source<T> extends Serializable {
   void setup(Context context);
-  T get();
+  Collection<T> get();
   void cleanup();
 }
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Window.java b/heron/api/src/java/com/twitter/heron/streamlet/Window.java
index eb14928..fb25e92 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Window.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Window.java
@@ -28,6 +28,10 @@
   private long endTimeMs;
   private long count;
 
+  Window() {
+    // nothing really
+  }
+
   public Window(long startTimeMs, long endTimeMs, long count) {
     this.startTimeMs = startTimeMs;
     this.endTimeMs = endTimeMs;
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/impl/ContextImpl.java b/heron/api/src/java/com/twitter/heron/streamlet/impl/ContextImpl.java
index 31bbdfd..62fd7ba 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/impl/ContextImpl.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/impl/ContextImpl.java
@@ -16,7 +16,9 @@
 
 import java.io.Serializable;
 import java.util.Map;
+import java.util.function.Supplier;
 
+import com.twitter.heron.api.metric.IMetric;
 import com.twitter.heron.api.state.State;
 import com.twitter.heron.api.topology.TopologyContext;
 import com.twitter.heron.streamlet.Context;
@@ -59,7 +61,26 @@
   }
 
   @Override
+  public <T> void registerMetric(String metricName, int collectionInterval,
+                             Supplier<T> metricFn) {
+    topologyContext.registerMetric(metricName, new StreamletMetric<T>(metricFn),
+        collectionInterval);
+  }
+
+  @Override
   public State<Serializable, Serializable> getState() {
     return state;
   }
+
+  private class StreamletMetric<T> implements IMetric<T> {
+    private Supplier<T> metricFn;
+    StreamletMetric(Supplier<T> metricFn) {
+      this.metricFn = metricFn;
+    }
+
+    @Override
+    public T getValueAndReset() {
+      return metricFn.get();
+    }
+  }
 }
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/impl/KryoSerializer.java b/heron/api/src/java/com/twitter/heron/streamlet/impl/KryoSerializer.java
new file mode 100644
index 0000000..1a80547
--- /dev/null
+++ b/heron/api/src/java/com/twitter/heron/streamlet/impl/KryoSerializer.java
@@ -0,0 +1,97 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+
+package com.twitter.heron.streamlet.impl;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
+import com.esotericsoftware.kryo.serializers.DefaultSerializers;
+import com.esotericsoftware.kryo.serializers.MapSerializer;
+
+import com.twitter.heron.api.serializer.IPluggableSerializer;
+
+/**
+ * KryoSerializer is a wrapper around Heron's IPluggableSerializer.
+ * Streamlet based topologies turning on kryo serialization are based off of it.
+ */
+public class KryoSerializer implements IPluggableSerializer {
+  private Kryo kryo;
+  private Output kryoOut;
+  private Input kryoIn;
+
+  @Override
+  public void initialize(Map<String, Object> config) {
+    kryo = getKryo();
+    kryoOut = new Output(2000, 2000000000);
+    kryoIn = new Input(1);
+  }
+
+  @Override
+  public byte[] serialize(Object object) {
+    kryoOut.clear();
+    kryo.writeClassAndObject(kryoOut, object);
+    return kryoOut.toBytes();
+  }
+
+  @Override
+  public Object deserialize(byte[] input) {
+    kryoIn.setBuffer(input);
+    return kryo.readClassAndObject(kryoIn);
+  }
+
+  private Kryo getKryo() {
+    Kryo k = new Kryo();
+    k.setRegistrationRequired(false);
+    k.setReferences(false);
+    k.register(byte[].class);
+    k.register(ArrayList.class, new ArrayListSerializer());
+    k.register(HashMap.class, new HashMapSerializer());
+    k.register(HashSet.class, new HashSetSerializer());
+    k.register(BigInteger.class, new DefaultSerializers.BigIntegerSerializer());
+    return k;
+  }
+
+  private class ArrayListSerializer extends CollectionSerializer {
+    @Override
+    @SuppressWarnings("rawtypes") // extending Kryo class that uses raw types
+    public Collection create(Kryo k, Input input, Class<Collection> type) {
+      return new ArrayList();
+    }
+  }
+
+  private class HashMapSerializer extends MapSerializer {
+    @Override
+    @SuppressWarnings("rawtypes") // extending kryo class signature that takes Map
+    public Map<String, Object> create(Kryo k, Input input, Class<Map> type) {
+      return new HashMap<>();
+    }
+  }
+
+  private class HashSetSerializer extends CollectionSerializer {
+    @Override
+    @SuppressWarnings("rawtypes") // extending Kryo class that uses raw types
+    public Collection create(Kryo k, Input input, Class<Collection> type) {
+      return new HashSet();
+    }
+  }
+}
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/impl/sources/ComplexSource.java b/heron/api/src/java/com/twitter/heron/streamlet/impl/sources/ComplexSource.java
index f459839..2f8919d 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/impl/sources/ComplexSource.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/impl/sources/ComplexSource.java
@@ -15,6 +15,7 @@
 package com.twitter.heron.streamlet.impl.sources;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.Map;
 
 import com.twitter.heron.api.spout.SpoutOutputCollector;
@@ -58,9 +59,11 @@
 
   @Override
   public void nextTuple() {
-    R val = generator.get();
+    Collection<R> val = generator.get();
     if (val != null) {
-      collector.emit(new Values(val));
+      for (R tuple : val) {
+        collector.emit(new Values(tuple));
+      }
     }
   }
 }
diff --git a/heron/api/tests/java/BUILD b/heron/api/tests/java/BUILD
index ddf8e0e..a69bf79 100644
--- a/heron/api/tests/java/BUILD
+++ b/heron/api/tests/java/BUILD
@@ -1,6 +1,7 @@
 load("/tools/rules/heron_deps", "heron_java_api_proto_files")
 
 api_deps_files = [
+    "//heron/api/src/java:api-java-low-level",
     "//heron/api/src/java:api-java",
     "//heron/common/src/java:utils-java",
     "//heron/common/src/java:basics-java",
@@ -31,6 +32,7 @@
     "com.twitter.heron.streamlet.impl.operators.JoinOperatorTest",
     "com.twitter.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperatorTest",
     "com.twitter.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOperatorTest",
+    "com.twitter.heron.api.ConfigTest",
     "com.twitter.heron.api.HeronSubmitterTest"
   ],
   runtime_deps = [ ":api-tests" ],
diff --git a/heron/api/tests/java/com/twitter/heron/api/ConfigTest.java b/heron/api/tests/java/com/twitter/heron/api/ConfigTest.java
new file mode 100644
index 0000000..9de282a
--- /dev/null
+++ b/heron/api/tests/java/com/twitter/heron/api/ConfigTest.java
@@ -0,0 +1,49 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.api;
+
+import java.time.Duration;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class ConfigTest {
+
+  @Test
+  public void testRegisterTimerEvent() {
+    Config conf = new Config();
+    try {
+      Config.registerTopologyTimerEvents(conf, "timer", Duration.ofSeconds(0), () -> {
+      });
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+    }
+
+    try {
+      Config.registerTopologyTimerEvents(conf, "timer", Duration.ofSeconds(-1), () -> {
+      });
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+    }
+
+    try {
+      Config.registerTopologyTimerEvents(conf, "timer", Duration.ofSeconds(1), () -> {
+      });
+    } catch (IllegalArgumentException e) {
+      Assert.fail();
+    }
+  }
+}
diff --git a/heron/api/tests/java/com/twitter/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/com/twitter/heron/streamlet/impl/StreamletImplTest.java
index be8450b..069b6e2 100644
--- a/heron/api/tests/java/com/twitter/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/com/twitter/heron/streamlet/impl/StreamletImplTest.java
@@ -24,6 +24,7 @@
 
 import com.twitter.heron.api.topology.TopologyBuilder;
 import com.twitter.heron.streamlet.Context;
+import com.twitter.heron.streamlet.Resources;
 import com.twitter.heron.streamlet.SerializableTransformer;
 import com.twitter.heron.streamlet.Streamlet;
 import com.twitter.heron.streamlet.WindowConfig;
@@ -256,4 +257,18 @@
         (JoinStreamlet<String, String, String, String>) fStreamlet.getChildren().get(0);
     assertEquals(jStreamlet.getChildren().size(), 0);
   }
+
+  @Test
+  public void testResourcesBuilder() {
+    Resources defaultResoures = Resources.defaultResources();
+    assertEquals(0, Float.compare(defaultResoures.getCpu(), 1.0f));
+    assertEquals(defaultResoures.getRam(), 104857600);
+
+    Resources res2 = new Resources.Builder()
+        .setCpu(5.1f)
+        .setRamInGB(20)
+        .build();
+    assertEquals(0, Float.compare(res2.getCpu(), 5.1f));
+    assertEquals(res2.getRam(), 20 * 1024 * 1024);
+  }
 }
diff --git a/heron/ckptmgr/src/java/BUILD b/heron/ckptmgr/src/java/BUILD
index 0aa6fda..95e9ba0 100644
--- a/heron/ckptmgr/src/java/BUILD
+++ b/heron/ckptmgr/src/java/BUILD
@@ -28,7 +28,7 @@
     deps = [
         ":ckptmgr-java",
         "//heron/spi/src/java:statefulstorage-spi-java",
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/common/src/java:basics-java",
         "//heron/common/src/java:config-java",
         "//heron/common/src/java:network-java",
diff --git a/heron/common/src/java/BUILD b/heron/common/src/java/BUILD
index 118a609..c5e721a 100644
--- a/heron/common/src/java/BUILD
+++ b/heron/common/src/java/BUILD
@@ -46,7 +46,7 @@
     deps = heron_java_proto_files() + [
         ":basics-java",
         ":config-java",
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/api/src/java:classification",
     ]
 )
diff --git a/heron/common/src/java/com/twitter/heron/common/basics/WakeableLooper.java b/heron/common/src/java/com/twitter/heron/common/basics/WakeableLooper.java
index 0870084..8744990 100644
--- a/heron/common/src/java/com/twitter/heron/common/basics/WakeableLooper.java
+++ b/heron/common/src/java/com/twitter/heron/common/basics/WakeableLooper.java
@@ -119,6 +119,16 @@
     timers.add(new TimerTask(expiration, task));
   }
 
+  public void registerPeriodicEvent(Duration frequency, Runnable task) {
+    registerTimerEvent(frequency, new Runnable() {
+      @Override
+      public void run() {
+        task.run();
+        registerPeriodicEvent(frequency, task);
+      }
+    });
+  }
+
   public void exitLoop() {
     exitLoop = true;
     wakeUp();
diff --git a/heron/common/tests/java/BUILD b/heron/common/tests/java/BUILD
index f4bc5d5..6ab0757 100644
--- a/heron/common/tests/java/BUILD
+++ b/heron/common/tests/java/BUILD
@@ -3,7 +3,7 @@
     srcs = glob(["**/*.java"]),
     deps = [
         "//heron/proto:proto_topology_java",
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/common/src/java:basics-java",
         "//heron/common/src/java:config-java",
         "//heron/common/src/java:network-java",
diff --git a/heron/config/src/yaml/conf/aurora/heron.aurora b/heron/config/src/yaml/conf/aurora/heron.aurora
index 2136c2f..8ab3df4 100644
--- a/heron/config/src/yaml/conf/aurora/heron.aurora
+++ b/heron/config/src/yaml/conf/aurora/heron.aurora
@@ -22,48 +22,48 @@
 
 command_to_start_executor = \
   '{{EXECUTOR_BINARY}}' \
-  '--shard={{mesos.instance}}' \
-  '--topology-name={{TOPOLOGY_NAME}}' \
-  '--topology-id={{TOPOLOGY_ID}}' \
-  '--topology-defn-file={{TOPOLOGY_DEFINITION_FILE}}' \
-  '--state-manager-connection={{STATEMGR_CONNECTION_STRING}}' \
-  '--state-manager-root={{STATEMGR_ROOT_PATH}}' \
-  '--tmaster-binary={{TMASTER_BINARY}}' \
-  '--stmgr-binary={{STMGR_BINARY}}' \
-  '--metrics-manager-classpath="{{METRICSMGR_CLASSPATH}}"' \
-  '--instance-jvm-opts={{INSTANCE_JVM_OPTS_IN_BASE64}}' \
-  '--classpath="{{TOPOLOGY_CLASSPATH}}"' \
-  '--master-port={{thermos.ports[port1]}}' \
-  '--tmaster-controller-port={{thermos.ports[port2]}}' \
-  '--tmaster-stats-port={{thermos.ports[port3]}}' \
-  '--heron-internals-config-file={{SYSTEM_YAML}}' \
-  '--override-config-file={{OVERRIDE_YAML}} ' \
-  '--component-ram-map={{COMPONENT_RAMMAP}}' \
-  '--component-jvm-opts={{COMPONENT_JVM_OPTS_IN_BASE64}}' \
-  '--pkg-type={{TOPOLOGY_PACKAGE_TYPE}}' \
-  '--topology-binary-file={{TOPOLOGY_BINARY_FILE}}' \
-  '--heron-java-home={{JAVA_HOME}}' \
-  '--shell-port={{thermos.ports[http]}}' \
-  '--heron-shell-binary={{SHELL_BINARY}}' \
-  '--metrics-manager-port={{thermos.ports[port4]}}' \
-  '--cluster={{CLUSTER}}' \
-  '--role={{ROLE}}' \
-  '--environment={{ENVIRON}}' \
-  '--instance-classpath="{{INSTANCE_CLASSPATH}}"' \
-  '--metrics-sinks-config-file={{METRICS_YAML}}' \
-  '--scheduler-classpath="{{SCHEDULER_CLASSPATH}}"' \
-  '--scheduler-port="{{thermos.ports[scheduler]}}"' \
-  '--python-instance-binary={{PYTHON_INSTANCE_BINARY}}' \
-  '--cpp-instance-binary={{CPP_INSTANCE_BINARY}}' \
-  '--metricscache-manager-classpath={{METRICSCACHEMGR_CLASSPATH}}' \
-  '--metricscache-manager-master-port={{thermos.ports[metricscachemgr_masterport]}}' \
-  '--metricscache-manager-stats-port={{thermos.ports[metricscachemgr_statsport]}}' \
-  '--is-stateful={{IS_STATEFUL_ENABLED}}' \
-  '--checkpoint-manager-classpath="{{CKPTMGR_CLASSPATH}}"' \
-  '--checkpoint-manager-port={{thermos.ports[ckptmgr_port]}}' \
-  '--stateful-config-file={{STATEFUL_CONFIG_YAML}}' \
-  '--health-manager-mode={{HEALTHMGR_MODE}}' \
-  '--health-manager-classpath={{HEALTHMGR_CLASSPATH}}'
+  ' --shard={{mesos.instance}}' \
+  ' --topology-name={{TOPOLOGY_NAME}}' \
+  ' --topology-id={{TOPOLOGY_ID}}' \
+  ' --topology-defn-file={{TOPOLOGY_DEFINITION_FILE}}' \
+  ' --state-manager-connection={{STATEMGR_CONNECTION_STRING}}' \
+  ' --state-manager-root={{STATEMGR_ROOT_PATH}}' \
+  ' --tmaster-binary={{TMASTER_BINARY}}' \
+  ' --stmgr-binary={{STMGR_BINARY}}' \
+  ' --metrics-manager-classpath="{{METRICSMGR_CLASSPATH}}"' \
+  ' --instance-jvm-opts={{INSTANCE_JVM_OPTS_IN_BASE64}}' \
+  ' --classpath="{{TOPOLOGY_CLASSPATH}}"' \
+  ' --master-port={{thermos.ports[port1]}}' \
+  ' --tmaster-controller-port={{thermos.ports[port2]}}' \
+  ' --tmaster-stats-port={{thermos.ports[port3]}}' \
+  ' --heron-internals-config-file={{SYSTEM_YAML}}' \
+  ' --override-config-file={{OVERRIDE_YAML}} ' \
+  ' --component-ram-map={{COMPONENT_RAMMAP}}' \
+  ' --component-jvm-opts={{COMPONENT_JVM_OPTS_IN_BASE64}}' \
+  ' --pkg-type={{TOPOLOGY_PACKAGE_TYPE}}' \
+  ' --topology-binary-file={{TOPOLOGY_BINARY_FILE}}' \
+  ' --heron-java-home={{JAVA_HOME}}' \
+  ' --shell-port={{thermos.ports[http]}}' \
+  ' --heron-shell-binary={{SHELL_BINARY}}' \
+  ' --metrics-manager-port={{thermos.ports[port4]}}' \
+  ' --cluster={{CLUSTER}}' \
+  ' --role={{ROLE}}' \
+  ' --environment={{ENVIRON}}' \
+  ' --instance-classpath="{{INSTANCE_CLASSPATH}}"' \
+  ' --metrics-sinks-config-file={{METRICS_YAML}}' \
+  ' --scheduler-classpath="{{SCHEDULER_CLASSPATH}}"' \
+  ' --scheduler-port="{{thermos.ports[scheduler]}}"' \
+  ' --python-instance-binary={{PYTHON_INSTANCE_BINARY}}' \
+  ' --cpp-instance-binary={{CPP_INSTANCE_BINARY}}' \
+  ' --metricscache-manager-classpath={{METRICSCACHEMGR_CLASSPATH}}' \
+  ' --metricscache-manager-master-port={{thermos.ports[metricscachemgr_masterport]}}' \
+  ' --metricscache-manager-stats-port={{thermos.ports[metricscachemgr_statsport]}}' \
+  ' --is-stateful={{IS_STATEFUL_ENABLED}}' \
+  ' --checkpoint-manager-classpath="{{CKPTMGR_CLASSPATH}}"' \
+  ' --checkpoint-manager-port={{thermos.ports[ckptmgr_port]}}' \
+  ' --stateful-config-file={{STATEFUL_CONFIG_YAML}}' \
+  ' --health-manager-mode={{HEALTHMGR_MODE}}' \
+  ' --health-manager-classpath={{HEALTHMGR_CLASSPATH}}'
 
 launch_heron_executor = Process(
   name = 'launch_heron_executor',
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index d901d20..5a2d0b0 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -43,6 +43,8 @@
 
 Log = log.Log
 
+# pylint: disable=too-many-lines
+
 def print_usage():
   print(
       "Usage: ./heron-executor <shardid> <topname> <topid> <topdefnfile>"
@@ -55,7 +57,7 @@
       " <scheduler_classpath> <scheduler_port> <python_instance_binary>"
       " <metricscachemgr_classpath> <metricscachemgr_masterport> <metricscachemgr_statsport>"
       " <is_stateful> <ckptmgr_classpath> <ckptmgr_port> <stateful_config_file> "
-      " <healthmgr_mode> <healthmgr_classpath> <cpp_instance_binary>")
+      " <healthmgr_mode> <healthmgr_classpath> <cpp_instance_binary> <jvm_remote_debugger_ports>")
 
 def id_map(prefix, container_plans, add_zero_id=False):
   ids = {}
@@ -219,7 +221,9 @@
     self.health_manager_mode = parsed_args.health_manager_mode
     self.health_manager_classpath = '%s:%s'\
         % (self.scheduler_classpath, parsed_args.health_manager_classpath)
-
+    self.jvm_remote_debugger_ports = \
+      parsed_args.jvm_remote_debugger_ports.split(",") \
+        if parsed_args.jvm_remote_debugger_ports else None
 
   def __init__(self, args, shell_env):
     self.init_parsed_args(args)
@@ -293,6 +297,8 @@
     parser.add_argument("--stateful-config-file", required=True)
     parser.add_argument("--health-manager-mode", required=True)
     parser.add_argument("--health-manager-classpath", required=True)
+    parser.add_argument("--jvm-remote-debugger-ports", required=False,
+                        help="ports to be used by a remote debugger for JVM instances")
 
     parsed_args, unknown_args = parser.parse_known_args(args[1:])
 
@@ -508,6 +514,10 @@
             java_version.startswith("1.5"):
       java_metasize_param = 'PermSize'
 
+    if self.jvm_remote_debugger_ports and \
+            (len(instance_info) > len(self.jvm_remote_debugger_ports)):
+      Log.warn("Not enough remote debugger ports for all instances!")
+
     for (instance_id, component_name, global_task_id, component_index) in instance_info:
       total_jvm_size = int(self.component_ram_map[component_name] / (1024 * 1024))
       heap_size_mb = total_jvm_size - code_cache_size_mb - java_metasize_mb
@@ -542,6 +552,12 @@
                       '-XX:ParallelGCThreads=4',
                       '-Xloggc:log-files/gc.%s.log' % instance_id]
 
+      remote_debugger_port = None
+      if self.jvm_remote_debugger_ports:
+        remote_debugger_port = self.jvm_remote_debugger_ports.pop()
+        instance_cmd.append('-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=%s'
+                            % remote_debugger_port)
+
       instance_args = ['-topology_name', self.topology_name,
                        '-topology_id', self.topology_id,
                        '-instance_id', instance_id,
@@ -553,10 +569,13 @@
                        '-metricsmgr_port', self.metrics_manager_port,
                        '-system_config_file', self.heron_internals_config_file,
                        '-override_config_file', self.override_config_file]
+      if remote_debugger_port:
+        instance_args += ['-remote_debugger_port', remote_debugger_port]
 
       instance_cmd = instance_cmd + self.instance_jvm_opts.split()
       if component_name in self.component_jvm_opts:
         instance_cmd = instance_cmd + self.component_jvm_opts[component_name].split()
+
       instance_cmd.extend(['-Djava.net.preferIPv4Stack=true',
                            '-cp',
                            '%s:%s' % (self.instance_classpath, self.classpath),
diff --git a/heron/healthmgr/src/java/BUILD b/heron/healthmgr/src/java/BUILD
index c232d7b..010a4bb 100644
--- a/heron/healthmgr/src/java/BUILD
+++ b/heron/healthmgr/src/java/BUILD
@@ -5,7 +5,7 @@
 load("/tools/rules/heron_deps", "heron_java_proto_files")
 
 healthmgr_deps_files = [
-    "//heron/api/src/java:api-java",
+    "//heron/api/src/java:api-java-low-level",
     "//heron/api/src/java:classification",
     "//heron/common/src/java:basics-java",
     "//heron/common/src/java:config-java",
diff --git a/heron/instance/src/java/BUILD b/heron/instance/src/java/BUILD
index b465f20..56e2f8d 100644
--- a/heron/instance/src/java/BUILD
+++ b/heron/instance/src/java/BUILD
@@ -7,7 +7,7 @@
 
 instance_deps_files =  \
     heron_java_proto_files() + [
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/api/src/java:classification",
         "//heron/common/src/java:basics-java",
         "//heron/common/src/java:config-java",
diff --git a/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java b/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java
index b8b5be6..7dc3aa0 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java
@@ -201,15 +201,13 @@
     stmgrPortOption.setRequired(true);
     options.addOption(stmgrPortOption);
 
-    Option metricsmgrPortOption
-        = new Option(
+    Option metricsmgrPortOption = new Option(
         CommandLineOptions.METRICS_MGR_PORT_OPTION, true, "Metrics Manager Port");
     metricsmgrPortOption.setType(Integer.class);
     metricsmgrPortOption.setRequired(true);
     options.addOption(metricsmgrPortOption);
 
-    Option systemConfigFileOption
-        = new Option(
+    Option systemConfigFileOption = new Option(
         CommandLineOptions.SYSTEM_CONFIG_FILE, true, "Heron Internals Config Filename");
     systemConfigFileOption.setType(String.class);
     systemConfigFileOption.setRequired(true);
@@ -222,6 +220,11 @@
     overrideConfigFileOption.setRequired(true);
     options.addOption(overrideConfigFileOption);
 
+    Option remoteDebuggerPortOption = new Option(
+        CommandLineOptions.REMOTE_DEBUGGER_PORT, true, "Remote Debugger Port");
+    remoteDebuggerPortOption.setType(Integer.class);
+    options.addOption(remoteDebuggerPortOption);
+
     CommandLineParser parser = new DefaultParser();
     HelpFormatter formatter = new HelpFormatter();
     CommandLine cmd = null;
@@ -257,6 +260,12 @@
     String overrideConfigFile
         = commandLine.getOptionValue(CommandLineOptions.OVERRIDE_CONFIG_FILE);
 
+    Integer remoteDebuggerPort = null;
+    if (commandLine.hasOption(CommandLineOptions.REMOTE_DEBUGGER_PORT)) {
+      remoteDebuggerPort = Integer.parseInt(
+          commandLine.getOptionValue(CommandLineOptions.REMOTE_DEBUGGER_PORT));
+    }
+
     SystemConfig systemConfig = SystemConfig.newBuilder(true)
         .putAll(systemConfigFile, true)
         .putAll(overrideConfigFile, true)
@@ -266,8 +275,13 @@
     SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG, systemConfig);
 
     // Create the protobuf Instance
-    PhysicalPlans.InstanceInfo instanceInfo = PhysicalPlans.InstanceInfo.newBuilder().
-        setTaskId(taskId).setComponentIndex(componentIndex).setComponentName(componentName).build();
+    PhysicalPlans.InstanceInfo.Builder instanceInfoBuilder
+        = PhysicalPlans.InstanceInfo.newBuilder().setTaskId(taskId)
+        .setComponentIndex(componentIndex).setComponentName(componentName);
+    if (remoteDebuggerPort != null) {
+      instanceInfoBuilder.setRemoteDebuggerPort(remoteDebuggerPort);
+    }
+    PhysicalPlans.InstanceInfo instanceInfo = instanceInfoBuilder.build();
 
     PhysicalPlans.Instance instance = PhysicalPlans.Instance.newBuilder().
         setInstanceId(instanceId).setStmgrId(streamId).setInfo(instanceInfo).build();
@@ -285,11 +299,17 @@
             systemConfig.getHeronLoggingMaximumFiles()));
     LoggingHelper.addLoggingHandler(new ErrorReportLoggingHandler());
 
-    LOG.info("\nStarting instance " + instanceId + " for topology " + topologyName
+    String logMsg = "\nStarting instance " + instanceId + " for topology " + topologyName
         + " and topologyId " + topologyId + " for component " + componentName
         + " with taskId " + taskId + " and componentIndex " + componentIndex
         + " and stmgrId " + streamId + " and stmgrPort " + streamPort
-        + " and metricsManagerPort " + metricsPort);
+        + " and metricsManagerPort " + metricsPort;
+
+    if (remoteDebuggerPort != null) {
+      logMsg += " and remoteDebuggerPort " + remoteDebuggerPort;
+    }
+
+    LOG.info(logMsg);
 
     LOG.info("System Config: " + systemConfig);
 
diff --git a/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltInstance.java b/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltInstance.java
index 3c73e4d..415e460 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltInstance.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltInstance.java
@@ -47,6 +47,7 @@
 import com.twitter.heron.common.utils.tuple.TickTuple;
 import com.twitter.heron.common.utils.tuple.TupleImpl;
 import com.twitter.heron.instance.IInstance;
+import com.twitter.heron.instance.util.InstanceUtils;
 import com.twitter.heron.proto.ckptmgr.CheckpointManager;
 import com.twitter.heron.proto.system.HeronTuples;
 
@@ -223,6 +224,7 @@
     looper.addTasksOnWakeup(boltTasks);
 
     PrepareTickTupleTimer();
+    InstanceUtils.prepareTimerEvents(looper, helper);
   }
 
   @Override
@@ -305,13 +307,9 @@
     if (tickTupleFreqMs != null) {
       Duration freq = TypeUtils.getDuration(tickTupleFreqMs, ChronoUnit.MILLIS);
 
-      Runnable r = new Runnable() {
-        public void run() {
-          SendTickTuple();
-        }
-      };
+      Runnable r = () -> SendTickTuple();
 
-      looper.registerTimerEvent(freq, r);
+      looper.registerPeriodicEvent(freq, r);
     }
   }
 
@@ -323,7 +321,5 @@
     boltMetrics.executeTuple(t.getSourceStreamId(), t.getSourceComponent(), latency);
 
     collector.sendOutTuples();
-    // reschedule ourselves again
-    PrepareTickTupleTimer();
   }
 }
diff --git a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
index 7090dbf..7e99117 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
@@ -44,6 +44,7 @@
 import com.twitter.heron.common.utils.misc.SerializeDeSerializeHelper;
 import com.twitter.heron.common.utils.topology.TopologyContextImpl;
 import com.twitter.heron.instance.IInstance;
+import com.twitter.heron.instance.util.InstanceUtils;
 import com.twitter.heron.proto.ckptmgr.CheckpointManager;
 import com.twitter.heron.proto.system.HeronTuples;
 
@@ -264,6 +265,8 @@
     if (enableMessageTimeouts) {
       lookForTimeouts();
     }
+
+    InstanceUtils.prepareTimerEvents(looper, helper);
   }
 
   /**
diff --git a/heron/instance/src/java/com/twitter/heron/instance/util/InstanceUtils.java b/heron/instance/src/java/com/twitter/heron/instance/util/InstanceUtils.java
new file mode 100644
index 0000000..1bd7a8d
--- /dev/null
+++ b/heron/instance/src/java/com/twitter/heron/instance/util/InstanceUtils.java
@@ -0,0 +1,43 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.instance.util;
+
+import java.time.Duration;
+import java.util.Map;
+
+import com.twitter.heron.api.Config;
+import com.twitter.heron.api.Pair;
+import com.twitter.heron.common.basics.SlaveLooper;
+import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
+
+public final class InstanceUtils {
+  private InstanceUtils() {
+  }
+
+  @SuppressWarnings("unchecked")
+  public static void prepareTimerEvents(SlaveLooper looper, PhysicalPlanHelper helper) {
+    Map<String, Pair<Duration, Runnable>> timerEvents =
+        (Map<String, Pair<Duration, Runnable>>) helper.getTopologyContext()
+            .getTopologyConfig().get(Config.TOPOLOGY_TIMER_EVENTS);
+
+    if (timerEvents != null) {
+      for (Map.Entry<String, Pair<Duration, Runnable>> entry : timerEvents.entrySet()) {
+        Duration duration = entry.getValue().getFirst();
+        Runnable task = entry.getValue().getSecond();
+
+        looper.registerPeriodicEvent(duration, task);
+      }
+    }
+  }
+}
diff --git a/heron/instance/src/java/shade.conf b/heron/instance/src/java/shade.conf
index fb7700c..3a45c40 100644
--- a/heron/instance/src/java/shade.conf
+++ b/heron/instance/src/java/shade.conf
@@ -1,4 +1,3 @@
 rule com.google.protobuf** com.twitter.heron.shaded.@0
 rule org.yaml.snakeyaml** com.twitter.heron.shaded.@0
-rule com.esotericsoftware.kryo** com.twitter.heron.shaded.@0
 rule org.apache.commons** com.twitter.heron.shaded.@0
diff --git a/heron/instance/tests/java/BUILD b/heron/instance/tests/java/BUILD
index e0e663c..e504bcd 100644
--- a/heron/instance/tests/java/BUILD
+++ b/heron/instance/tests/java/BUILD
@@ -2,7 +2,7 @@
 
 test_deps_files = \
     heron_java_proto_files() + [
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/common/src/java:basics-java",
         "//heron/common/src/java:config-java",
         "//heron/common/src/java:network-java",
diff --git a/heron/metricscachemgr/src/java/BUILD b/heron/metricscachemgr/src/java/BUILD
index 91a0d29..553fd5e 100644
--- a/heron/metricscachemgr/src/java/BUILD
+++ b/heron/metricscachemgr/src/java/BUILD
@@ -10,7 +10,7 @@
            "//heron/spi/src/java:utils-spi-java",
            "//heron/spi/src/java:statemgr-spi-java",
            "//heron/spi/src/java:packing-spi-java",
-           "//heron/api/src/java:api-java",
+           "//heron/api/src/java:api-java-low-level",
            "//heron/common/src/java:basics-java",
            "//heron/common/src/java:config-java",
            "//heron/common/src/java:network-java",
diff --git a/heron/metricsmgr/src/java/BUILD b/heron/metricsmgr/src/java/BUILD
index bdff429..3e67d62 100644
--- a/heron/metricsmgr/src/java/BUILD
+++ b/heron/metricsmgr/src/java/BUILD
@@ -9,7 +9,7 @@
         exclude = ["**/MetricManager.java"],
     ),
     deps = [
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/common/src/java:basics-java",
         "//heron/common/src/java:config-java",
         "//heron/common/src/java:network-java",
@@ -33,7 +33,7 @@
     srcs = glob(["**/MetricsManager.java"]),
     deps = [
         ":metricsmgr-java",
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/common/src/java:basics-java",
         "//heron/common/src/java:config-java",
         "//heron/common/src/java:network-java",
diff --git a/heron/metricsmgr/tests/java/BUILD b/heron/metricsmgr/tests/java/BUILD
index af6e540..80ecc2d 100644
--- a/heron/metricsmgr/tests/java/BUILD
+++ b/heron/metricsmgr/tests/java/BUILD
@@ -2,7 +2,7 @@
     name = "metricsmgr-tests",
     srcs = glob(["**/*.java"]),
     deps = [
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/common/src/java:basics-java",
         "//heron/common/src/java:network-java",
         "//heron/common/src/java:config-java",
diff --git a/heron/packing/src/java/BUILD b/heron/packing/src/java/BUILD
index d8df225..2b97922 100644
--- a/heron/packing/src/java/BUILD
+++ b/heron/packing/src/java/BUILD
@@ -15,7 +15,7 @@
 roundrobin_deps_files = \
     heron_java_proto_files() + \
     packing_deps_files + [
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/spi/src/java:statemgr-spi-java",
         "//heron/spi/src/java:utils-spi-java",
     ]
@@ -23,7 +23,7 @@
 binpacking_deps_files = \
     heron_java_proto_files() + \
     packing_deps_files + [
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/spi/src/java:statemgr-spi-java",
         "//heron/spi/src/java:utils-spi-java",
     ]
@@ -37,7 +37,7 @@
         "//heron/spi/src/java:common-spi-java",
         "//heron/spi/src/java:packing-spi-java",
         "//heron/spi/src/java:utils-spi-java",
-        "//heron/api/src/java:api-java"
+        "//heron/api/src/java:api-java-low-level"
     ],
 )
 
diff --git a/heron/packing/tests/java/BUILD b/heron/packing/tests/java/BUILD
index ad2851a..ec74b63 100644
--- a/heron/packing/tests/java/BUILD
+++ b/heron/packing/tests/java/BUILD
@@ -20,7 +20,7 @@
     heron_java_proto_files() + \
     packing_deps_files + \
     test_deps_files + [
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/spi/src/java:utils-spi-java",
     ]
 
@@ -28,7 +28,7 @@
     heron_java_proto_files() + \
     packing_deps_files + \
     test_deps_files + [
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/spi/src/java:utils-spi-java",
     ]
 
@@ -36,7 +36,7 @@
      heron_java_proto_files() + \
         packing_deps_files + \
         test_deps_files + [
-            "//heron/api/src/java:api-java",
+            "//heron/api/src/java:api-java-low-level",
             "//heron/spi/src/java:utils-spi-java",
         ]
 
@@ -54,7 +54,7 @@
         "//heron/packing/src/java:utils",
         "//heron/spi/src/java:packing-spi-java",
         "//third_party/java:junit4",
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/proto:proto_topology_java",
         "//heron/spi/src/java:common-spi-java",
         "//heron/spi/src/java:utils-spi-java",
diff --git a/heron/proto/physical_plan.proto b/heron/proto/physical_plan.proto
index cc72fe4..e11af00 100644
--- a/heron/proto/physical_plan.proto
+++ b/heron/proto/physical_plan.proto
@@ -33,6 +33,8 @@
   required int32 component_index = 2; // specific to this component
   required string component_name = 3;
   repeated string params = 4;
+  // the port a remote debugger can be attached to for this instance.  Currently JVM instances only
+  optional int32 remote_debugger_port = 5;
 }
 
 message Instance {
diff --git a/heron/scheduler-core/src/java/BUILD b/heron/scheduler-core/src/java/BUILD
index 4176368..c8b7545 100644
--- a/heron/scheduler-core/src/java/BUILD
+++ b/heron/scheduler-core/src/java/BUILD
@@ -9,10 +9,11 @@
    "//heron/api/src/java:classification",
    "@commons_cli_commons_cli//jar",
    "@com_google_guava_guava//jar",
+   "@org_apache_commons_commons_lang3//jar",
 ]
 
 spi_deps_files = [
-    "//heron/api/src/java:api-java",
+    "//heron/api/src/java:api-java-low-level",
     "//heron/spi/src/java:common-spi-java",
     "//heron/spi/src/java:statemgr-spi-java",
     "//heron/spi/src/java:uploader-spi-java",
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/ExecutorFlag.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/ExecutorFlag.java
index 0d5f416..0f7b890 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/ExecutorFlag.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/ExecutorFlag.java
@@ -56,7 +56,8 @@
   CheckpointManagerPort("checkpoint-manager-port"),
   StatefulConfigFile("stateful-config-file"),
   HealthManagerMode("health-manager-mode"),
-  HealthManagerClasspath("health-manager-classpath");
+  HealthManagerClasspath("health-manager-classpath"),
+  JvmRemoteDebuggerPorts("jvm-remote-debugger-ports");
 
   private final String name;
 
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java
index 8838f7d..358604a 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java
@@ -17,8 +17,10 @@
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.LinkedList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -39,12 +41,63 @@
 import com.twitter.heron.spi.utils.ShellUtils;
 
 public final class SchedulerUtils {
-  public static final int PORTS_REQUIRED_FOR_EXECUTOR = 9;
   public static final int PORTS_REQUIRED_FOR_SCHEDULER = 1;
   public static final String SCHEDULER_COMMAND_LINE_PROPERTIES_OVERRIDE_OPTION = "P";
 
   private static final Logger LOG = Logger.getLogger(SchedulerUtils.class.getName());
 
+  /**
+   * Enum that defines the type of ports that an heron executor needs
+   */
+
+  public enum ExecutorPort {
+    MASTER_PORT("master", true),
+    TMASTER_CONTROLLER_PORT("tmaster-ctl", true),
+    TMASTER_STATS_PORT("tmaster-stats", true),
+    SHELL_PORT("shell-port", true),
+    METRICS_MANAGER_PORT("metrics-mgr", true),
+    SCHEDULER_PORT("scheduler", true),
+    METRICS_CACHE_MASTER_PORT("metrics-cache-m", true),
+    METRICS_CACHE_STATS_PORT("metrics-cache-s", true),
+    CHECKPOINT_MANAGER_PORT("ckptmgr", true),
+    JVM_REMOTE_DEBUGGER_PORTS("jvm-remote-debugger", false);
+
+    private final String name;
+    private final boolean required;
+
+    ExecutorPort(String name, boolean required) {
+      this.name = name;
+      this.required = required;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public boolean isRequired() {
+      return required;
+    }
+
+    public static String getPort(ExecutorPort executorPort,
+                                  Map<ExecutorPort, String> portMap) {
+      if (!portMap.containsKey(executorPort) && executorPort.isRequired()) {
+        throw new RuntimeException("Required port " + executorPort.getName() + " not provided");
+      }
+
+      return portMap.get(executorPort);
+    }
+
+    public static Set<ExecutorPort> getRequiredPorts() {
+      Set<ExecutorPort> executorPorts = new HashSet<>();
+      for (ExecutorPort executorPort : ExecutorPort.values()) {
+        if (executorPort.isRequired()) {
+          executorPorts.add(executorPort);
+        }
+      }
+      return executorPorts;
+    }
+  }
+
   private SchedulerUtils() {
   }
 
@@ -124,50 +177,18 @@
   /**
    * Utils method to construct the command to start heron-executor
    *
-   * @param config The static Config
-   * @param runtime The runtime Config
-   * @param containerIndex the executor/container index
-   * @param freePorts list of free ports
-   * @return String[] representing the command to start heron-executor
-   */
-  public static String[] executorCommand(
-      Config config,
-      Config runtime,
-      int containerIndex,
-      List<Integer> freePorts) {
-    // First let us have some safe checks
-    if (freePorts.size() < PORTS_REQUIRED_FOR_EXECUTOR) {
-      throw new RuntimeException("Failed to find enough ports for executor");
-    }
-    for (int port : freePorts) {
-      if (port == -1) {
-        throw new RuntimeException("Failed to find available ports for executor");
-      }
-    }
-
-    // Convert port to string
-    List<String> ports = new LinkedList<>();
-    for (int port : freePorts) {
-      ports.add(Integer.toString(port));
-    }
-
-    return getExecutorCommand(config, runtime, containerIndex, ports);
-  }
-
-  /**
-   * Utils method to construct the command to start heron-executor
-   *
    * @param config The static config
    * @param runtime The runtime config
    * @param containerIndex the executor/container index
-   * @param ports list of free ports in String
+   * @param ports a map of ports to use where the key indicate the port type and the
+   * value is the port
    * @return String[] representing the command to start heron-executor
    */
   public static String[] getExecutorCommand(
       Config config,
       Config runtime,
       int containerIndex,
-      List<String> ports) {
+      Map<ExecutorPort, String> ports) {
     List<String> commands = new ArrayList<>();
     commands.add(Context.executorBinary(config));
     commands.add(createCommandArg(ExecutorFlag.Shard, Integer.toString(containerIndex)));
@@ -184,22 +205,35 @@
    *
    * @param config The static Config
    * @param runtime The runtime Config
-   * @param freePorts list of free ports
+   * @param ports a map of ports to use where the key indicate the port type and the
+   * value is the port
    * @return String[] representing the arguments to start heron-executor
    */
   public static String[] executorCommandArgs(
-      Config config, Config runtime, List<String> freePorts) {
+      Config config, Config runtime, Map<ExecutorPort, String> ports) {
     TopologyAPI.Topology topology = Runtime.topology(runtime);
 
-    String masterPort = freePorts.get(0);
-    String tmasterControllerPort = freePorts.get(1);
-    String tmasterStatsPort = freePorts.get(2);
-    String shellPort = freePorts.get(3);
-    String metricsmgrPort = freePorts.get(4);
-    String schedulerPort = freePorts.get(5);
-    String metricsCacheMasterPort = freePorts.get(6);
-    String metricsCacheStatsPort = freePorts.get(7);
-    String ckptmgrPort = freePorts.get(8);
+    String masterPort = ExecutorPort.getPort(
+        ExecutorPort.MASTER_PORT, ports);
+    String tmasterControllerPort = ExecutorPort.getPort(
+        ExecutorPort.TMASTER_CONTROLLER_PORT, ports);
+    String tmasterStatsPort = ExecutorPort.getPort(
+        ExecutorPort.TMASTER_STATS_PORT, ports);
+    String shellPort = ExecutorPort.getPort(
+        ExecutorPort.SHELL_PORT, ports);
+    String metricsmgrPort = ExecutorPort.getPort(
+        ExecutorPort.METRICS_MANAGER_PORT, ports);
+    String schedulerPort = ExecutorPort.getPort(
+        ExecutorPort.SCHEDULER_PORT, ports);
+    String metricsCacheMasterPort =  ExecutorPort.getPort(
+        ExecutorPort.METRICS_CACHE_MASTER_PORT, ports);
+    String metricsCacheStatsPort = ExecutorPort.getPort(
+        ExecutorPort.METRICS_CACHE_STATS_PORT, ports);
+    String ckptmgrPort = ExecutorPort.getPort(
+        ExecutorPort.CHECKPOINT_MANAGER_PORT, ports);
+    String remoteDebuggerPorts = ExecutorPort.getPort(
+        ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS, ports
+    );
 
     List<String> commands = new ArrayList<>();
     commands.add(createCommandArg(ExecutorFlag.TopologyName, topology.getName()));
@@ -280,6 +314,9 @@
     commands.add(createCommandArg(ExecutorFlag.HealthManagerMode, healthMgrMode));
     commands.add(createCommandArg(ExecutorFlag.HealthManagerClasspath,
         Context.healthMgrClassPath(config)));
+    if (remoteDebuggerPorts != null) {
+      commands.add(createCommandArg(ExecutorFlag.JvmRemoteDebuggerPorts, remoteDebuggerPorts));
+    }
 
     return commands.toArray(new String[commands.size()]);
   }
diff --git a/heron/scheduler-core/tests/java/BUILD b/heron/scheduler-core/tests/java/BUILD
index d32a823..b2ebb74 100644
--- a/heron/scheduler-core/tests/java/BUILD
+++ b/heron/scheduler-core/tests/java/BUILD
@@ -4,7 +4,7 @@
     "@com_google_guava_guava//jar",
     "@commons_io_commons_io//jar",
     "//third_party/java:powermock",
-    "//heron/api/src/java:api-java",
+    "//heron/api/src/java:api-java-low-level",
     "//heron/common/src/java:basics-java",
     "//heron/common/src/java:utils-java",
     "//heron/scheduler-core/src/java:scheduler-java",
diff --git a/heron/schedulers/src/java/BUILD b/heron/schedulers/src/java/BUILD
index 0236024..d81b523 100644
--- a/heron/schedulers/src/java/BUILD
+++ b/heron/schedulers/src/java/BUILD
@@ -18,7 +18,7 @@
 ]
 
 api_deps_files = [
-    "//heron/api/src/java:api-java",
+    "//heron/api/src/java:api-java-low-level",
 ]
 
 scheduler_deps_files = \
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java
index 9e4ccaf..daf28c1 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java
@@ -69,12 +69,22 @@
     return cliController.killJob();
   }
 
+  private StMgr searchContainer(Integer id) {
+    String prefix = "stmgr-" + id;
+    for (StMgr sm : stateMgrAdaptor.getPhysicalPlan(topologyName).getStmgrsList()) {
+      if (sm.getId().equals(prefix)) {
+        return sm;
+      }
+    }
+    return null;
+  }
+
   // Restart an aurora container
   @Override
   public boolean restart(Integer containerId) {
     // there is no backpressure for container 0, delegate to aurora client
     if (containerId == null || containerId == 0) {
-      cliController.restart(containerId);
+      return cliController.restart(containerId);
     }
 
     if (stateMgrAdaptor == null) {
@@ -82,18 +92,24 @@
       return false;
     }
 
-    int index = containerId - 1; // stmgr container starts from 1
-    StMgr contaienrInfo = stateMgrAdaptor.getPhysicalPlan(topologyName).getStmgrs(index);
-    String host = contaienrInfo.getHostName();
-    int port = contaienrInfo.getShellPort();
-    String url = "http://" + host + ":" + port + "/killexecutor";
+    StMgr sm = searchContainer(containerId);
+    if (sm == null) {
+      LOG.warning("container not found in pplan " + containerId);
+      return false;
+    }
+
+    String url = "http://" + sm.getHostName() + ":" + sm.getShellPort() + "/killexecutor";
     String payload = "secret=" + stateMgrAdaptor.getExecutionState(topologyName).getTopologyId();
     LOG.info("sending `kill container` to " + url + "; payload: " + payload);
 
     HttpURLConnection con = NetworkUtils.getHttpConnection(url);
     try {
-      NetworkUtils.sendHttpPostRequest(con, "X", payload.getBytes());
-      return NetworkUtils.checkHttpResponseCode(con, 200);
+      if (NetworkUtils.sendHttpPostRequest(con, "X", payload.getBytes())) {
+        return NetworkUtils.checkHttpResponseCode(con, 200);
+      } else { // if heron-shell command fails, delegate to aurora client
+        LOG.info("heron-shell killexecutor failed; try aurora client ..");
+        return cliController.restart(containerId);
+      }
     } finally {
       con.disconnect();
     }
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesConstants.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesConstants.java
index 9c349c6..7799a77 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesConstants.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesConstants.java
@@ -16,9 +16,13 @@
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Pattern;
 
+import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
+
 public final class KubernetesConstants {
   private KubernetesConstants() {
 
@@ -60,10 +64,6 @@
   public static final String ANNOTATION_PROMETHEUS_PORT = "prometheus.io/port";
   public static final String PROMETHEUS_PORT = "8080";
 
-  public static final String[] PORT_NAMES = new String[]{
-      "master", "tmaster-ctlr", "tmaster-stats", "shell", "metricsmgr", "scheduler",
-      "metrics-cache-m", "metrics-cache-s", "ckptmgr"};
-
   public static final String MASTER_PORT = "6001";
   public static final String TMASTER_CONTROLLER_PORT = "6002";
   public static final String TMASTER_STATS_PORT = "6003";
@@ -73,11 +73,22 @@
   public static final String METRICS_CACHE_MASTER_PORT = "6007";
   public static final String METRICS_CACHE_STATS_PORT = "6008";
   public static final String CHECKPOINT_MGR_PORT = "6009";
+  // port number the start with when more than one port needed for remote debugging
+  public static final String JVM_REMOTE_DEBUGGER_PORT = "6010";
+  public static final String JVM_REMOTE_DEBUGGER_PORT_NAME = "remote-debugger";
 
-  public static final String[] PORT_LIST = new String[]{
-      MASTER_PORT, TMASTER_CONTROLLER_PORT, TMASTER_STATS_PORT,
-      SHELL_PORT, METRICSMGR_PORT, SCHEDULER_PORT, METRICS_CACHE_MASTER_PORT,
-      METRICS_CACHE_STATS_PORT, CHECKPOINT_MGR_PORT };
+  public static final Map<ExecutorPort, String> EXECUTOR_PORTS = new HashMap<>();
+  static {
+    EXECUTOR_PORTS.put(ExecutorPort.MASTER_PORT, MASTER_PORT);
+    EXECUTOR_PORTS.put(ExecutorPort.TMASTER_CONTROLLER_PORT, TMASTER_CONTROLLER_PORT);
+    EXECUTOR_PORTS.put(ExecutorPort.TMASTER_STATS_PORT, TMASTER_STATS_PORT);
+    EXECUTOR_PORTS.put(ExecutorPort.SHELL_PORT, SHELL_PORT);
+    EXECUTOR_PORTS.put(ExecutorPort.METRICS_MANAGER_PORT, METRICSMGR_PORT);
+    EXECUTOR_PORTS.put(ExecutorPort.SCHEDULER_PORT, SCHEDULER_PORT);
+    EXECUTOR_PORTS.put(ExecutorPort.METRICS_CACHE_MASTER_PORT, METRICS_CACHE_MASTER_PORT);
+    EXECUTOR_PORTS.put(ExecutorPort.METRICS_CACHE_STATS_PORT, METRICS_CACHE_STATS_PORT);
+    EXECUTOR_PORTS.put(ExecutorPort.CHECKPOINT_MANAGER_PORT, CHECKPOINT_MGR_PORT);
+  }
 
   public static final String JOB_LINK =
       "/api/v1/proxy/namespaces/kube-system/services/kubernetes-dashboard/#/pod";
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java
index ca23d1f..21da118 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java
@@ -15,14 +15,16 @@
 package com.twitter.heron.scheduler.kubernetes;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.regex.Matcher;
+import java.util.stream.Collectors;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -32,12 +34,14 @@
 import com.google.common.base.Optional;
 import com.google.common.primitives.Ints;
 
+import com.twitter.heron.api.utils.TopologyUtils;
 import com.twitter.heron.common.basics.FileUtils;
 import com.twitter.heron.proto.scheduler.Scheduler;
 import com.twitter.heron.scheduler.TopologyRuntimeManagementException;
 import com.twitter.heron.scheduler.UpdateTopologyManager;
 import com.twitter.heron.scheduler.utils.Runtime;
 import com.twitter.heron.scheduler.utils.SchedulerUtils;
+import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
 import com.twitter.heron.spi.common.Config;
 import com.twitter.heron.spi.common.Context;
 import com.twitter.heron.spi.common.Key;
@@ -164,11 +168,17 @@
 
     String[] deploymentConfs =
         new String[Ints.checkedCast(Runtime.numContainers(runtimeConfiguration))];
+
     for (int i = 0; i < Runtime.numContainers(runtimeConfiguration); i++) {
-
-      deploymentConfs[i] = buildKubernetesPodSpec(mapper, i, containerResource);
+      Optional<PackingPlan.ContainerPlan> container = packing.getContainer(i);
+      Set<PackingPlan.InstancePlan> instancePlans;
+      if (container.isPresent()) {
+        instancePlans = container.get().getInstances();
+      } else {
+        instancePlans = new HashSet<>();
+      }
+      deploymentConfs[i] = buildKubernetesPodSpec(mapper, i, containerResource, instancePlans);
     }
-
     return deploymentConfs;
   }
 
@@ -182,7 +192,8 @@
    */
   protected String buildKubernetesPodSpec(ObjectMapper mapper,
                                           Integer containerIndex,
-                                          Resource containerResource) {
+                                          Resource containerResource,
+                                          Set<PackingPlan.InstancePlan> instancePlans) {
     ObjectNode instance = mapper.createObjectNode();
 
     instance.put(KubernetesConstants.API_VERSION, KubernetesConstants.API_VERSION_1);
@@ -191,7 +202,8 @@
 
     instance.set(KubernetesConstants.API_SPEC, getContainerSpec(mapper,
         containerIndex,
-        containerResource));
+        containerResource,
+        instancePlans));
 
     return instance.toString();
   }
@@ -303,7 +315,7 @@
       setImagePullPolicyIfPresent(containerInfo);
 
       // Port info -- all the same
-      containerInfo.set(KubernetesConstants.PORTS, getPorts(mapper));
+      containerInfo.set(KubernetesConstants.PORTS, getPorts(mapper, containerPlan.getInstances()));
 
       // In order for the container to run with the correct index, we're copying the base
       // configuration for container with index 0, and replacing the container index with
@@ -349,7 +361,8 @@
    */
   protected ObjectNode getContainerSpec(ObjectMapper mapper,
                                         int containerIndex,
-                                        Resource containerResource) {
+                                        Resource containerResource,
+                                        Set<PackingPlan.InstancePlan> instancePlans) {
 
     ObjectNode containerSpec = mapper.createObjectNode();
     ArrayNode containerList = mapper.createArrayNode();
@@ -381,10 +394,10 @@
     setImagePullPolicyIfPresent(containerInfo);
 
     // Port information for this container
-    containerInfo.set(KubernetesConstants.PORTS, getPorts(mapper));
+    containerInfo.set(KubernetesConstants.PORTS, getPorts(mapper, instancePlans));
 
     // Heron command for the container
-    String[] command = getExecutorCommand(containerIndex);
+    String[] command = getExecutorCommand(containerIndex, instancePlans.size());
     ArrayNode commandsArray = mapper.createArrayNode();
     for (int i = 0; i < command.length; i++) {
       commandsArray.add(command[i]);
@@ -409,22 +422,46 @@
     return containerSpec;
   }
 
+  private List<Integer> getRemoteDebuggerPorts(int numberOfInstances) {
+    List<Integer> ports = new LinkedList<>();
+    for (int i = 0; i < numberOfInstances; i++) {
+      ports.add(Integer.parseInt(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT, 10) + i);
+    }
+
+    return ports;
+  }
+
   /**
    * Get the ports the container will need to expose so other containers can access its services
    *
    * @param mapper
    */
-  protected ArrayNode getPorts(ObjectMapper mapper) {
+  protected ArrayNode getPorts(ObjectMapper mapper,
+                               Set<PackingPlan.InstancePlan> instancePlans) {
     ArrayNode ports = mapper.createArrayNode();
 
-    for (int i = 0; i < KubernetesConstants.PORT_NAMES.length; i++) {
+    for (Map.Entry<ExecutorPort, String> entry
+        : KubernetesConstants.EXECUTOR_PORTS.entrySet()) {
       ObjectNode port = mapper.createObjectNode();
+      ExecutorPort portName = entry.getKey();
       port.put(KubernetesConstants.DOCKER_CONTAINER_PORT,
-          Integer.parseInt(KubernetesConstants.PORT_LIST[i], 10));
-      port.put(KubernetesConstants.PORT_NAME, KubernetesConstants.PORT_NAMES[i]);
+          Integer.parseInt(entry.getValue(), 10));
+      port.put(KubernetesConstants.PORT_NAME, portName.getName());
       ports.add(port);
     }
 
+    // if remote debugger enabled
+    if (TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtimeConfiguration))) {
+      List<Integer> portsForRemoteDebugging = getRemoteDebuggerPorts(instancePlans.size());
+
+      for (int i = 0; i < portsForRemoteDebugging.size(); i++) {
+        ObjectNode port = mapper.createObjectNode();
+        port.put(KubernetesConstants.DOCKER_CONTAINER_PORT, portsForRemoteDebugging.get(i));
+        port.put(KubernetesConstants.PORT_NAME, KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME
+            + "-" + String.valueOf(i));
+      }
+    }
+
     return ports;
   }
 
@@ -449,10 +486,17 @@
    *
    * @param containerIndex
    */
-  protected String[] getExecutorCommand(int containerIndex) {
+  protected String[] getExecutorCommand(int containerIndex, int numInstances) {
+    if (TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtimeConfiguration))) {
+      KubernetesConstants.EXECUTOR_PORTS.put(ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS,
+          String.join(",", getRemoteDebuggerPorts(numInstances)
+              .stream().map(Object::toString)
+              .collect(Collectors.toList())));
+    }
     String[] executorCommand =
         SchedulerUtils.getExecutorCommand(configuration, runtimeConfiguration,
-        containerIndex, Arrays.asList(KubernetesConstants.PORT_LIST));
+        containerIndex, KubernetesConstants.EXECUTOR_PORTS);
+
     String[] command = {
         "sh",
         "-c",
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
index beb0c2d..7b598ce 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
@@ -32,10 +32,13 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 
+import com.twitter.heron.api.utils.TopologyUtils;
 import com.twitter.heron.common.basics.SysUtils;
 import com.twitter.heron.proto.scheduler.Scheduler;
 import com.twitter.heron.scheduler.UpdateTopologyManager;
+import com.twitter.heron.scheduler.utils.Runtime;
 import com.twitter.heron.scheduler.utils.SchedulerUtils;
+import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
 import com.twitter.heron.spi.common.Config;
 import com.twitter.heron.spi.packing.PackingPlan;
 import com.twitter.heron.spi.scheduler.IScalable;
@@ -79,9 +82,9 @@
    * Start executor process via running an async shell process
    */
   @VisibleForTesting
-  protected Process startExecutorProcess(int container) {
+  protected Process startExecutorProcess(int container, Set<PackingPlan.InstancePlan> instances) {
     return ShellUtils.runASyncProcess(
-        getExecutorCommand(container),
+        getExecutorCommand(container, instances),
         new File(LocalContext.workingDirectory(config)),
         Integer.toString(container));
   }
@@ -90,25 +93,27 @@
    * Start the executor for the given container
    */
   @VisibleForTesting
-  protected void startExecutor(final int container) {
+  protected void startExecutor(final int container, Set<PackingPlan.InstancePlan> instances) {
     LOG.info("Starting a new executor for container: " + container);
 
     // create a process with the executor command and topology working directory
-    final Process containerExecutor = startExecutorProcess(container);
+    final Process containerExecutor = startExecutorProcess(container, instances);
 
     // associate the process and its container id
     processToContainer.put(containerExecutor, container);
     LOG.info("Started the executor for container: " + container);
 
     // add the container for monitoring
-    startExecutorMonitor(container, containerExecutor);
+    startExecutorMonitor(container, containerExecutor, instances);
   }
 
   /**
    * Start the monitor of a given executor
    */
   @VisibleForTesting
-  protected void startExecutorMonitor(final int container, final Process containerExecutor) {
+  protected void startExecutorMonitor(final int container,
+                                      final Process containerExecutor,
+                                      Set<PackingPlan.InstancePlan> instances) {
     // add the container for monitoring
     Runnable r = new Runnable() {
       @Override
@@ -129,7 +134,7 @@
           }
           LOG.log(Level.INFO, "Trying to restart container {0}", container);
           // restart the container
-          startExecutor(processToContainer.remove(containerExecutor));
+          startExecutor(processToContainer.remove(containerExecutor), instances);
         } catch (InterruptedException e) {
           if (!isTopologyKilled) {
             LOG.log(Level.SEVERE, "Process is interrupted: ", e);
@@ -141,14 +146,33 @@
     monitorService.submit(r);
   }
 
-  private String[] getExecutorCommand(int container) {
-    List<Integer> freePorts = new ArrayList<>(SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR);
-    for (int i = 0; i < SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR; i++) {
-      freePorts.add(SysUtils.getFreePort());
+
+  private String[] getExecutorCommand(int container,  Set<PackingPlan.InstancePlan> instances) {
+    Map<ExecutorPort, String> ports = new HashMap<>();
+    for (ExecutorPort executorPort : ExecutorPort.getRequiredPorts()) {
+      int port = SysUtils.getFreePort();
+      if (port == -1) {
+        throw new RuntimeException("Failed to find available ports for executor");
+      }
+      ports.put(executorPort, String.valueOf(port));
     }
 
-    String[] executorCmd = SchedulerUtils.executorCommand(config, runtime, container, freePorts);
+    if (TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtime))
+        && instances != null) {
+      List<String> remoteDebuggingPorts = new LinkedList<>();
+      int portsForRemoteDebugging = instances.size();
+      for (int i = 0; i < portsForRemoteDebugging; i++) {
+        int port = SysUtils.getFreePort();
+        if (port == -1) {
+          throw new RuntimeException("Failed to find available ports for executor");
+        }
+        remoteDebuggingPorts.add(String.valueOf(port));
+      }
+      ports.put(ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS,
+          String.join(",", remoteDebuggingPorts));
+    }
 
+    String[] executorCmd = SchedulerUtils.getExecutorCommand(config, runtime, container, ports);
     LOG.info("Executor command line: " + Arrays.toString(executorCmd));
     return executorCmd;
   }
@@ -162,11 +186,11 @@
 
     synchronized (processToContainer) {
       LOG.info("Starting executor for TMaster");
-      startExecutor(0);
+      startExecutor(0, null);
 
       // for each container, run its own executor
       for (PackingPlan.ContainerPlan container : packing.getContainers()) {
-        startExecutor(container.getId());
+        startExecutor(container.getId(), container.getInstances());
       }
     }
 
@@ -271,7 +295,7 @@
           throw new RuntimeException(String.format("Found active container for %s, "
               + "cannot launch a duplicate container.", container.getId()));
         }
-        startExecutor(container.getId());
+        startExecutor(container.getId(), container.getInstances());
       }
     }
   }
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/marathon/MarathonConstants.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/marathon/MarathonConstants.java
index 07dcc85..acdfe25 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/marathon/MarathonConstants.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/marathon/MarathonConstants.java
@@ -14,6 +14,11 @@
 
 package com.twitter.heron.scheduler.marathon;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
+
 public final class MarathonConstants {
   private MarathonConstants() {
 
@@ -50,10 +55,6 @@
   public static final String DOCKER_FORCE_PULL = "forcePullImage";
   public static final String DOCKER_NETWORK_BRIDGE = "BRIDGE";
 
-  public static final String[] PORT_NAMES = new String[]{
-      "master", "tmaster-controller", "tmaster-stats", "shell", "metricsmgr", "scheduler",
-      "metrics-cache-master", "metrics-cache-stats", "ckptmgr"};
-
   public static final String MASTER_PORT = "$PORT0";
   public static final String TMASTER_CONTROLLER_PORT = "$PORT1";
   public static final String TMASTER_STATS_PORT = "$PORT2";
@@ -64,10 +65,18 @@
   public static final String METRICS_CACHE_STATS_PORT = "$PORT7";
   public static final String CKPTMGR_PORT = "$PORT8";
 
-  public static final String[] PORT_LIST = new String[]{
-      MASTER_PORT, TMASTER_CONTROLLER_PORT, TMASTER_STATS_PORT,
-      SHELL_PORT, METRICSMGR_PORT, SCHEDULER_PORT, METRICS_CACHE_MASTER_PORT,
-      METRICS_CACHE_STATS_PORT, CKPTMGR_PORT};
+  public static final Map<ExecutorPort, String> EXECUTOR_PORTS = new HashMap<>();
+  static {
+    EXECUTOR_PORTS.put(ExecutorPort.MASTER_PORT, MASTER_PORT);
+    EXECUTOR_PORTS.put(ExecutorPort.TMASTER_CONTROLLER_PORT, TMASTER_CONTROLLER_PORT);
+    EXECUTOR_PORTS.put(ExecutorPort.TMASTER_STATS_PORT, TMASTER_STATS_PORT);
+    EXECUTOR_PORTS.put(ExecutorPort.SHELL_PORT, SHELL_PORT);
+    EXECUTOR_PORTS.put(ExecutorPort.METRICS_MANAGER_PORT, METRICSMGR_PORT);
+    EXECUTOR_PORTS.put(ExecutorPort.SCHEDULER_PORT, SCHEDULER_PORT);
+    EXECUTOR_PORTS.put(ExecutorPort.METRICS_CACHE_MASTER_PORT, METRICS_CACHE_MASTER_PORT);
+    EXECUTOR_PORTS.put(ExecutorPort.METRICS_CACHE_STATS_PORT, METRICS_CACHE_STATS_PORT);
+    EXECUTOR_PORTS.put(ExecutorPort.CHECKPOINT_MANAGER_PORT, CKPTMGR_PORT);
+  }
 
   public static final String JOB_LINK = "/ui/#/group/%2F";
 }
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/marathon/MarathonScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/marathon/MarathonScheduler.java
index 067f289..38190c8 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/marathon/MarathonScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/marathon/MarathonScheduler.java
@@ -14,9 +14,9 @@
 
 package com.twitter.heron.scheduler.marathon;
 
-import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.logging.Logger;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -27,6 +27,7 @@
 import com.twitter.heron.proto.scheduler.Scheduler;
 import com.twitter.heron.scheduler.utils.Runtime;
 import com.twitter.heron.scheduler.utils.SchedulerUtils;
+import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
 import com.twitter.heron.spi.common.Config;
 import com.twitter.heron.spi.common.Context;
 import com.twitter.heron.spi.common.Key;
@@ -197,12 +198,13 @@
   protected ArrayNode getPorts(ObjectMapper mapper) {
     ArrayNode ports = mapper.createArrayNode();
 
-    for (String portName : MarathonConstants.PORT_NAMES) {
+    for (Map.Entry<ExecutorPort, String> entry
+        : MarathonConstants.EXECUTOR_PORTS.entrySet()) {
       ObjectNode port = mapper.createObjectNode();
       port.put(MarathonConstants.DOCKER_CONTAINER_PORT, 0);
       port.put(MarathonConstants.PROTOCOL, MarathonConstants.TCP);
       port.put(MarathonConstants.HOST_PORT, 0);
-      port.put(MarathonConstants.PORT_NAME, portName);
+      port.put(MarathonConstants.PORT_NAME, entry.getKey().getName());
 
       ports.add(port);
     }
@@ -212,7 +214,7 @@
 
   protected String getExecutorCommand(int containerIndex) {
     String[] commands = SchedulerUtils.getExecutorCommand(config, runtime,
-        containerIndex, Arrays.asList(MarathonConstants.PORT_LIST));
+        containerIndex, MarathonConstants.EXECUTOR_PORTS);
     return "cd $MESOS_SANDBOX && " + Joiner.on(" ").join(commands);
   }
 }
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/mesos/MesosScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/mesos/MesosScheduler.java
index b5b3b70..6d6298a 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/mesos/MesosScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/mesos/MesosScheduler.java
@@ -269,6 +269,6 @@
     // Convert them from bytes to MB
     container.diskInMB = maxResourceContainer.getDisk().asMegabytes();
     container.memInMB = maxResourceContainer.getRam().asMegabytes();
-    container.ports = SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR;
+    container.ports = SchedulerUtils.ExecutorPort.getRequiredPorts().size();
   }
 }
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/mesos/framework/LaunchableTask.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/mesos/framework/LaunchableTask.java
index 8649178..324dfb6 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/mesos/framework/LaunchableTask.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/mesos/framework/LaunchableTask.java
@@ -15,6 +15,7 @@
 package com.twitter.heron.scheduler.mesos.framework;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -25,6 +26,7 @@
 import org.apache.mesos.Protos;
 
 import com.twitter.heron.scheduler.utils.SchedulerUtils;
+import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
 import com.twitter.heron.spi.common.Config;
 
 /**
@@ -205,8 +207,19 @@
 
   protected String executorCommand(
       Config config, Config runtime, int containerIndex) {
+    Map<ExecutorPort, String> ports = new HashMap<>();
+    ports.put(ExecutorPort.MASTER_PORT, String.valueOf(freePorts.get(0)));
+    ports.put(ExecutorPort.TMASTER_CONTROLLER_PORT, String.valueOf(freePorts.get(1)));
+    ports.put(ExecutorPort.TMASTER_STATS_PORT, String.valueOf(freePorts.get(2)));
+    ports.put(ExecutorPort.SHELL_PORT, String.valueOf(freePorts.get(3)));
+    ports.put(ExecutorPort.METRICS_MANAGER_PORT, String.valueOf(freePorts.get(4)));
+    ports.put(ExecutorPort.SCHEDULER_PORT, String.valueOf(freePorts.get(5)));
+    ports.put(ExecutorPort.METRICS_CACHE_MASTER_PORT, String.valueOf(freePorts.get(6)));
+    ports.put(ExecutorPort.METRICS_CACHE_STATS_PORT, String.valueOf(freePorts.get(7)));
+    ports.put(ExecutorPort.CHECKPOINT_MANAGER_PORT, String.valueOf(freePorts.get(8)));
+
     String[] executorCmd =
-        SchedulerUtils.executorCommand(config, runtime, containerIndex, freePorts);
+        SchedulerUtils.getExecutorCommand(config, runtime, containerIndex, ports);
     return join(executorCmd, " ");
   }
 }
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/slurm/SlurmScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/slurm/SlurmScheduler.java
index 3bc35ad..6f89900 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/slurm/SlurmScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/slurm/SlurmScheduler.java
@@ -17,7 +17,9 @@
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -25,6 +27,7 @@
 import com.twitter.heron.proto.scheduler.Scheduler;
 import com.twitter.heron.scheduler.utils.Runtime;
 import com.twitter.heron.scheduler.utils.SchedulerUtils;
+import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
 import com.twitter.heron.spi.common.Config;
 import com.twitter.heron.spi.common.Context;
 import com.twitter.heron.spi.packing.PackingPlan;
@@ -125,12 +128,17 @@
   }
 
   protected String[] getExecutorCommand(PackingPlan packing) {
-    List<String> freePorts = new ArrayList<>(SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR);
-    for (int i = 0; i < SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR; i++) {
-      freePorts.add(Integer.toString(SysUtils.getFreePort()));
+    Map<ExecutorPort, String> ports = new HashMap<>();
+    for (ExecutorPort executorPort : ExecutorPort.getRequiredPorts()) {
+      int port = SysUtils.getFreePort();
+      if (port == -1) {
+        throw new RuntimeException("Failed to find available ports for executor");
+      }
+      ports.put(executorPort, String.valueOf(port));
     }
 
-    String[] executorCmd = SchedulerUtils.executorCommandArgs(this.config, this.runtime, freePorts);
+    String[] executorCmd = SchedulerUtils.executorCommandArgs(this.config, this.runtime,
+        ports);
 
     LOG.log(Level.FINE, "Executor command line: ", Arrays.toString(executorCmd));
     return executorCmd;
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/yarn/HeronExecutorTask.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/yarn/HeronExecutorTask.java
index f7d09b4..9321c6f 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/yarn/HeronExecutorTask.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/yarn/HeronExecutorTask.java
@@ -15,10 +15,9 @@
 package com.twitter.heron.scheduler.yarn;
 
 import java.io.File;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -37,6 +36,7 @@
 import com.twitter.heron.common.basics.SysUtils;
 import com.twitter.heron.scheduler.utils.SchedulerConfigUtils;
 import com.twitter.heron.scheduler.utils.SchedulerUtils;
+import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
 import com.twitter.heron.scheduler.yarn.HeronConfigurationOptions.Cluster;
 import com.twitter.heron.scheduler.yarn.HeronConfigurationOptions.ComponentRamMap;
 import com.twitter.heron.scheduler.yarn.HeronConfigurationOptions.Environ;
@@ -156,20 +156,24 @@
         verboseMode,
         topology);
 
-    List<Integer> freePorts = new ArrayList<>(SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR);
-    for (int i = 0; i < SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR; i++) {
-      freePorts.add(SysUtils.getFreePort());
-    }
-
     Config runtime = Config.newBuilder()
         .put(Key.COMPONENT_RAMMAP, componentRamMap)
         .put(Key.TOPOLOGY_DEFINITION, topology)
         .build();
 
-    String[] executorCmd = SchedulerUtils.executorCommand(config,
+    Map<ExecutorPort, String> ports = new HashMap<>();
+    for (ExecutorPort executorPort : ExecutorPort.getRequiredPorts()) {
+      int port = SysUtils.getFreePort();
+      if (port == -1) {
+        throw new RuntimeException("Failed to find available ports for executor");
+      }
+      ports.put(executorPort, String.valueOf(port));
+    }
+
+    String[] executorCmd = SchedulerUtils.getExecutorCommand(config,
         runtime,
         heronExecutorId,
-        freePorts);
+        ports);
 
     LOG.info("Executor command line: " + Arrays.toString(executorCmd));
     return executorCmd;
diff --git a/heron/schedulers/tests/java/BUILD b/heron/schedulers/tests/java/BUILD
index 1bf3a14..b54f288 100644
--- a/heron/schedulers/tests/java/BUILD
+++ b/heron/schedulers/tests/java/BUILD
@@ -3,7 +3,7 @@
 common_deps_files = [
     "@com_google_guava_guava//jar",
     "//third_party/java:powermock",
-    "//heron/api/src/java:api-java",
+    "//heron/api/src/java:api-java-low-level",
     "//heron/common/src/java:basics-java",
     "//heron/common/src/java:utils-java",
     "//heron/scheduler-core/src/java:scheduler-java",
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesSchedulerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesSchedulerTest.java
index 1e06d6d..5940d8b 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesSchedulerTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesSchedulerTest.java
@@ -69,7 +69,7 @@
   public static void beforeClass() throws Exception {
     scheduler = Mockito.spy(KubernetesScheduler.class);
     Mockito.doReturn(EXECUTOR_CMD).when(scheduler)
-        .getExecutorCommand(Mockito.anyInt());
+        .getExecutorCommand(Mockito.anyInt(), Mockito.anyInt());
   }
 
   @AfterClass
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java
index 484ad8f..badaaa9 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java
@@ -25,13 +25,14 @@
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import com.twitter.heron.api.generated.TopologyAPI;
 import com.twitter.heron.proto.scheduler.Scheduler;
 import com.twitter.heron.spi.common.Config;
 import com.twitter.heron.spi.common.Key;
 import com.twitter.heron.spi.packing.PackingPlan;
 import com.twitter.heron.spi.utils.PackingTestUtils;
 
-
+@SuppressWarnings("unchecked")
 public class LocalSchedulerTest {
   private static final String TOPOLOGY_NAME = "testTopology";
   private static final int MAX_WAITING_SECOND = 10;
@@ -46,6 +47,19 @@
     Mockito.when(config.getStringValue(Key.TOPOLOGY_NAME)).thenReturn(TOPOLOGY_NAME);
 
     runtime = Mockito.mock(Config.class);
+
+    scheduler.initialize(config, runtime);
+    Mockito.when(runtime.get(Key.TOPOLOGY_DEFINITION)).thenReturn(TopologyAPI.Topology
+        .newBuilder()
+        .setId("a")
+        .setName("a")
+        .setState(TopologyAPI.TopologyState.RUNNING)
+        .setTopologyConfig(
+            TopologyAPI.Config.newBuilder()
+                .addKvs(TopologyAPI.Config.KeyValue.newBuilder()
+                    .setKey(com.twitter.heron.api.Config.TOPOLOGY_REMOTE_DEBUGGING_ENABLE)
+                    .setValue("false"))).build());
+
     scheduler.initialize(config, runtime);
   }
 
@@ -71,11 +85,14 @@
   @Test
   public void testOnSchedule() throws Exception {
     Mockito.doNothing().
-        when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class));
+        when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class),
+        Mockito.anySet());
     Process[] mockProcesses = new Process[4];
     for (int i = 0; i < 4; i++) {
       mockProcesses[i] = Mockito.mock(Process.class);
-      Mockito.doReturn(mockProcesses[i]).when(scheduler).startExecutorProcess(i);
+      Set<PackingPlan.InstancePlan> instances
+          = (i == 0) ?  null : PackingTestUtils.testContainerPlan(i).getInstances();
+      Mockito.doReturn(mockProcesses[i]).when(scheduler).startExecutorProcess(i, instances);
     }
 
     PackingPlan packingPlan = Mockito.mock(PackingPlan.class);
@@ -92,9 +109,12 @@
         // id 2 was not in the container plan
         continue;
       }
-      Mockito.verify(scheduler).startExecutor(i);
-      Mockito.verify(scheduler).startExecutorProcess(i);
-      Mockito.verify(scheduler).startExecutorMonitor(i, mockProcesses[i]);
+
+      Set<PackingPlan.InstancePlan> instances
+          = (i == 0) ?  null : PackingTestUtils.testContainerPlan(i).getInstances();
+      Mockito.verify(scheduler).startExecutor(i, instances);
+      Mockito.verify(scheduler).startExecutorProcess(i, instances);
+      Mockito.verify(scheduler).startExecutorMonitor(i, mockProcesses[i], instances);
     }
   }
 
@@ -105,12 +125,15 @@
 
     //verify plan is deployed and containers are created
     Mockito.doNothing().
-        when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class));
+        when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class),
+        Mockito.anySet());
     Process mockProcessTM = Mockito.mock(Process.class);
-    Mockito.doReturn(mockProcessTM).when(scheduler).startExecutorProcess(0);
+    Mockito.doReturn(mockProcessTM).when(scheduler).startExecutorProcess(
+        0, null);
 
     Process mockProcessWorker1 = Mockito.mock(Process.class);
-    Mockito.doReturn(mockProcessWorker1).when(scheduler).startExecutorProcess(1);
+    Mockito.doReturn(mockProcessWorker1).when(scheduler).startExecutorProcess(
+        1,  PackingTestUtils.testContainerPlan(1).getInstances());
 
     PackingPlan packingPlan = Mockito.mock(PackingPlan.class);
     Set<PackingPlan.ContainerPlan> containers = new HashSet<>();
@@ -118,24 +141,30 @@
     Mockito.when(packingPlan.getContainers()).thenReturn(containers);
     Assert.assertTrue(scheduler.onSchedule(packingPlan));
 
-    Mockito.verify(scheduler, Mockito.times(2)).startExecutor(Mockito.anyInt());
+    Mockito.verify(scheduler, Mockito.times(2)).startExecutor(Mockito.anyInt(),
+        Mockito.anySet());
 
     //now verify add container adds new container
     Process mockProcessWorker2 = Mockito.mock(Process.class);
-    Mockito.doReturn(mockProcessWorker2).when(scheduler).startExecutorProcess(3);
+    Mockito.doReturn(mockProcessWorker2).when(scheduler).startExecutorProcess(
+        3, PackingTestUtils.testContainerPlan(3).getInstances());
     containers.clear();
     containers.add(PackingTestUtils.testContainerPlan(3));
     scheduler.addContainers(containers);
-    Mockito.verify(scheduler).startExecutor(3);
+    Mockito.verify(scheduler).startExecutor(3,
+        PackingTestUtils.testContainerPlan(3).getInstances());
 
     Process mockProcess = Mockito.mock(Process.class);
-    Mockito.doReturn(mockProcess).when(scheduler).startExecutorProcess(Mockito.anyInt());
+    Mockito.doReturn(mockProcess).when(scheduler).startExecutorProcess(
+        Mockito.anyInt(), Mockito.anySet());
     containers.clear();
     containers.add(PackingTestUtils.testContainerPlan(4));
     containers.add(PackingTestUtils.testContainerPlan(5));
     scheduler.addContainers(containers);
-    Mockito.verify(scheduler).startExecutor(4);
-    Mockito.verify(scheduler).startExecutor(5);
+    Mockito.verify(scheduler).startExecutor(4,
+        PackingTestUtils.testContainerPlan(4).getInstances());
+    Mockito.verify(scheduler).startExecutor(5,
+        PackingTestUtils.testContainerPlan(5).getInstances());
   }
 
   /**
@@ -147,13 +176,17 @@
 
     //verify plan is deployed and containers are created
     Mockito.doNothing().
-        when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class));
+        when(scheduler).startExecutorMonitor(Mockito.anyInt(),
+        Mockito.any(Process.class), Mockito.anySet());
 
     Process[] processes = new Process[LOCAL_NUM_CONTAINER];
     Set<PackingPlan.ContainerPlan> existingContainers = new HashSet<>();
     for (int i = 0; i < LOCAL_NUM_CONTAINER; i++) {
       processes[i] = Mockito.mock(Process.class);
-      Mockito.doReturn(processes[i]).when(scheduler).startExecutorProcess(i);
+      Set<PackingPlan.InstancePlan> instances
+          = (i == 0) ?  null : PackingTestUtils.testContainerPlan(i).getInstances();
+      Mockito.doReturn(processes[i]).when(scheduler)
+          .startExecutorProcess(i,  instances);
       if (i > 0) {
         // ignore the container for TMaster. existing containers simulate the containers created
         // by packing plan
@@ -165,7 +198,8 @@
     Mockito.when(packingPlan.getContainers()).thenReturn(existingContainers);
     Assert.assertTrue(scheduler.onSchedule(packingPlan));
     verifyIdsOfLaunchedContainers(0, 1, 2, 3, 4, 5);
-    Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(Mockito.anyInt());
+    Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(
+        Mockito.anyInt(), Mockito.anySet());
 
     Set<PackingPlan.ContainerPlan> containersToRemove = new HashSet<>();
     PackingPlan.ContainerPlan containerToRemove =
@@ -175,7 +209,8 @@
     verifyIdsOfLaunchedContainers(0, 1, 2, 3, 4);
     Mockito.verify(processes[LOCAL_NUM_CONTAINER - 1]).destroy();
     // verify no new process restarts
-    Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(Mockito.anyInt());
+    Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(
+        Mockito.anyInt(), Mockito.anySet());
 
     containersToRemove.clear();
     containersToRemove.add(PackingTestUtils.testContainerPlan(1));
@@ -185,7 +220,8 @@
     Mockito.verify(processes[1]).destroy();
     Mockito.verify(processes[2]).destroy();
     // verify no new process restarts
-    Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(Mockito.anyInt());
+    Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(
+        Mockito.anyInt(), Mockito.anySet());
   }
 
   private void verifyIdsOfLaunchedContainers(int... ids) {
@@ -249,17 +285,21 @@
     int exitValue = 1;
     Process containerExecutor = Mockito.mock(Process.class);
     Mockito.doReturn(exitValue).when(containerExecutor).exitValue();
-    Mockito.doNothing().when(scheduler).startExecutor(Mockito.anyInt());
+    Mockito.doNothing().when(scheduler).startExecutor(
+        Mockito.anyInt(), Mockito.anySet());
 
     // Start the process
     scheduler.getProcessToContainer().put(containerExecutor, containerId);
-    scheduler.startExecutorMonitor(containerId, containerExecutor);
+    scheduler.startExecutorMonitor(
+        containerId, containerExecutor,
+        PackingTestUtils.testContainerPlan(containerId).getInstances());
 
     // Shut down the MonitorService
     scheduler.getMonitorService().shutdown();
     scheduler.getMonitorService().awaitTermination(MAX_WAITING_SECOND, TimeUnit.SECONDS);
     // The dead process should be restarted
-    Mockito.verify(scheduler).startExecutor(containerId);
+    Mockito.verify(scheduler).startExecutor(containerId,
+        PackingTestUtils.testContainerPlan(containerId).getInstances());
     Assert.assertFalse(scheduler.isTopologyKilled());
   }
 
@@ -270,20 +310,22 @@
 
     Process containerExecutor = Mockito.mock(Process.class);
     Mockito.doReturn(exitValue).when(containerExecutor).exitValue();
-    Mockito.doNothing().when(scheduler).startExecutor(Mockito.anyInt());
+    Mockito.doNothing().when(scheduler).startExecutor(Mockito.anyInt(), Mockito.anySet());
 
     // Set the killed flag and the dead process should not be restarted
     scheduler.onKill(Scheduler.KillTopologyRequest.getDefaultInstance());
 
     // Start the process
     scheduler.getProcessToContainer().put(containerExecutor, containerId);
-    scheduler.startExecutorMonitor(containerId, containerExecutor);
+    scheduler.startExecutorMonitor(containerId, containerExecutor,
+        PackingTestUtils.testContainerPlan(containerId).getInstances());
 
     // Shut down the MonitorService
     scheduler.getMonitorService().shutdown();
     scheduler.getMonitorService().awaitTermination(MAX_WAITING_SECOND, TimeUnit.SECONDS);
     // The dead process should not be restarted
-    Mockito.verify(scheduler, Mockito.never()).startExecutor(Mockito.anyInt());
+    Mockito.verify(scheduler, Mockito.never())
+        .startExecutor(Mockito.anyInt(), Mockito.anySet());
     Assert.assertTrue(scheduler.isTopologyKilled());
   }
 }
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/mesos/MesosSchedulerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/mesos/MesosSchedulerTest.java
index 2684b3a..52bcdca 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/mesos/MesosSchedulerTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/mesos/MesosSchedulerTest.java
@@ -138,7 +138,7 @@
     Assert.assertEquals(CPU, container.cpu, 0.01);
     Assert.assertEquals(MEM, ByteAmount.fromMegabytes(((Double) container.memInMB).longValue()));
     Assert.assertEquals(DISK, ByteAmount.fromMegabytes(((Double) container.diskInMB).longValue()));
-    Assert.assertEquals(SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR, container.ports);
+    Assert.assertEquals(SchedulerUtils.ExecutorPort.getRequiredPorts().size(), container.ports);
     Assert.assertEquals(2, container.dependencies.size());
     Assert.assertTrue(container.dependencies.contains(CORE_PACKAGE_URI));
     Assert.assertTrue(container.dependencies.contains(TOPOLOGY_PACKAGE_URI));
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/mesos/framework/LaunchableTaskTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/mesos/framework/LaunchableTaskTest.java
index 0365252..8c7de7e 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/mesos/framework/LaunchableTaskTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/mesos/framework/LaunchableTaskTest.java
@@ -110,7 +110,7 @@
     container.cpu = CPU;
     container.diskInMB = DISK;
     container.memInMB = MEM;
-    container.ports = SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR;
+    container.ports = SchedulerUtils.ExecutorPort.getRequiredPorts().size();
     container.shell = true;
     container.retries = Integer.MAX_VALUE;
     container.dependencies = new ArrayList<>();
@@ -122,7 +122,7 @@
 
     // List of free ports
     List<Integer> freePorts = new ArrayList<>();
-    for (int i = 0; i < SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR; i++) {
+    for (int i = 0; i < SchedulerUtils.ExecutorPort.getRequiredPorts().size(); i++) {
       freePorts.add(i);
     }
 
diff --git a/heron/simulator/src/java/BUILD b/heron/simulator/src/java/BUILD
index 39da8d2..221495d 100644
--- a/heron/simulator/src/java/BUILD
+++ b/heron/simulator/src/java/BUILD
@@ -6,7 +6,7 @@
 
 simulator_deps_files = \
     heron_java_proto_files() + [
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/common/src/java:basics-java",
         "//heron/common/src/java:config-java",
         "//heron/common/src/java:utils-java",
diff --git a/heron/simulator/tests/java/BUILD b/heron/simulator/tests/java/BUILD
index 25d1497..12db81a 100644
--- a/heron/simulator/tests/java/BUILD
+++ b/heron/simulator/tests/java/BUILD
@@ -4,7 +4,7 @@
     name = "simulator-tests",
     srcs = glob(["**/*.java"]),
     deps =  heron_java_proto_files() + [
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/common/src/java:basics-java",
         "//heron/common/src/java:config-java",
         "//heron/common/src/java:utils-java",
diff --git a/heron/spi/src/java/BUILD b/heron/spi/src/java/BUILD
index 80283b2..33311ac 100644
--- a/heron/spi/src/java/BUILD
+++ b/heron/spi/src/java/BUILD
@@ -27,7 +27,7 @@
     ]),
     javacopts = DOCLINT_HTML_AND_SYNTAX,
     deps = [
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/api/src/java:classification",
         "//heron/common/src/java:basics-java",
         "//heron/common/src/java:config-java",
@@ -58,7 +58,7 @@
         "//heron/common/src/java:basics-java",
         "//heron/common/src/java:config-java",
         "//heron/common/src/java:utils-java",
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "@com_google_guava_guava//jar",
     ]
 
@@ -78,7 +78,7 @@
     ]
 
 packing_deps_files = [
-    "//heron/api/src/java:api-java",
+    "//heron/api/src/java:api-java-low-level",
     ":common-spi-java",
     "//heron/api/src/java:classification",
     "//heron/common/src/java:basics-java",
diff --git a/heron/spi/tests/java/com/twitter/heron/spi/utils/ShellUtilsTest.java b/heron/spi/tests/java/com/twitter/heron/spi/utils/ShellUtilsTest.java
index ee3695f..14a4d60 100644
--- a/heron/spi/tests/java/com/twitter/heron/spi/utils/ShellUtilsTest.java
+++ b/heron/spi/tests/java/com/twitter/heron/spi/utils/ShellUtilsTest.java
@@ -23,15 +23,12 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 public class ShellUtilsTest {
 
-  private static final Logger LOG = Logger.getLogger(ShellUtilsTest.class.getName());
-
   private static String generateRandomLongString(int size) {
     StringBuilder builder = new StringBuilder();
     Random random = new Random();
diff --git a/heron/spi/tests/java/com/twitter/heron/spi/utils/UploaderUtilsTest.java b/heron/spi/tests/java/com/twitter/heron/spi/utils/UploaderUtilsTest.java
index af22c36..54694e9 100644
--- a/heron/spi/tests/java/com/twitter/heron/spi/utils/UploaderUtilsTest.java
+++ b/heron/spi/tests/java/com/twitter/heron/spi/utils/UploaderUtilsTest.java
@@ -14,12 +14,19 @@
 
 package com.twitter.heron.spi.utils;
 
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.fail;
 
 public class UploaderUtilsTest {
 
@@ -55,4 +62,42 @@
         UploaderUtils.generateFilename(topologyName, role, tag, version, extension);
     Assert.assertTrue(customizedFilename.endsWith(extension));
   }
+
+  @Test
+  public void testCopyToOutputStream() throws Exception {
+    String fileContent = "temp file test content";
+    String prefix = "myTestFile";
+    String suffix = ".tmp";
+    File tempFile = null;
+    try {
+      // create temp file
+      tempFile = File.createTempFile(prefix, suffix);
+
+      // write content to temp file
+      writeContentToFile(tempFile.getAbsolutePath(), fileContent);
+
+      // copy file content to output stream
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      UploaderUtils.copyToOutputStream(tempFile.getAbsolutePath(), out);
+      Assert.assertEquals(fileContent, new String(out.toByteArray()));
+    } finally {
+      if (tempFile != null) {
+        tempFile.deleteOnExit();
+      }
+    }
+  }
+
+  @Test(expected = FileNotFoundException.class)
+  public void testCopyToOutputStreamWithInvalidFile() throws Exception {
+    UploaderUtils.copyToOutputStream("invalid_file_name", new ByteArrayOutputStream());
+  }
+
+  private void writeContentToFile(String fileName, String content) {
+    try (BufferedWriter bw = new BufferedWriter(new FileWriter(fileName))) {
+      bw.write(content);
+    } catch (IOException e) {
+      fail("Unexpected IOException has been thrown so unit test fails. Error message: "
+          + e.getMessage());
+    }
+  }
 }
diff --git a/heron/statemgrs/src/java/BUILD b/heron/statemgrs/src/java/BUILD
index 82e81d6..4757c4f 100644
--- a/heron/statemgrs/src/java/BUILD
+++ b/heron/statemgrs/src/java/BUILD
@@ -12,11 +12,11 @@
 
 localfs_deps_files = \
     common_deps_files + [
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/common/src/java:basics-java",
         "//heron/spi/src/java:heron-spi",
     ]
-    
+
 zookeeper_deps_files = \
     localfs_deps_files + [
         "@org_apache_curator_curator_client//jar",
@@ -31,7 +31,7 @@
     srcs = glob(
         ["**/*.java"],
     ),
-    deps = zookeeper_deps_files, 
+    deps = zookeeper_deps_files,
 )
 
 java_library(
diff --git a/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java b/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java
index acb58eb..d677019 100644
--- a/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java
+++ b/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java
@@ -426,6 +426,7 @@
     Config config = Config.newBuilder()
         .put(Key.STATEMGR_ROOT_PATH, "/storm/heron/states")
         .put(Key.STATEMGR_CONNECTION_STRING, zookeeperHostname)
+        .put(Key.SCHEDULER_IS_SERVICE, false)
         .build();
     CuratorStateManager stateManager = new CuratorStateManager();
     stateManager.doMain(args, config);
diff --git a/heron/tools/apiserver/src/java/BUILD b/heron/tools/apiserver/src/java/BUILD
index efd161e..91f3ee4 100644
--- a/heron/tools/apiserver/src/java/BUILD
+++ b/heron/tools/apiserver/src/java/BUILD
@@ -6,7 +6,7 @@
   "//heron/spi/src/java:heron-spi",
   "//heron/common/src/java:basics-java",
   "//heron/common/src/java:utils-java",
-  "//heron/api/src/java:api-java"
+  "//heron/api/src/java:api-java-low-level"
 ]
 
 scheduler_deps_files = [
diff --git a/heron/tools/cli/src/python/cdefs.py b/heron/tools/cli/src/python/cdefs.py
index ac4fed0..85e9afd 100644
--- a/heron/tools/cli/src/python/cdefs.py
+++ b/heron/tools/cli/src/python/cdefs.py
@@ -14,7 +14,6 @@
 ''' cdefs.py '''
 import os
 
-import heron.common.src.python.utils.log as Log
 import heron.tools.cli.src.python.cliconfig as cliconfig
 import heron.tools.common.src.python.utils.config as config
 
@@ -50,6 +49,5 @@
   '''
   config_path = config.get_heron_cluster_conf_dir(cluster, config_path)
   if not os.path.isdir(config_path):
-    Log.error("Cluster config directory \'%s\' does not exist", config_path)
     return False
   return True
diff --git a/heron/tools/cli/src/python/main.py b/heron/tools/cli/src/python/main.py
index 426dc94..7db01e9 100644
--- a/heron/tools/cli/src/python/main.py
+++ b/heron/tools/cli/src/python/main.py
@@ -221,6 +221,7 @@
 
   # check if the cluster config directory exists
   if not cdefs.check_direct_mode_cluster_definition(cluster, config_path):
+    Log.error("Cluster config directory \'%s\' does not exist", config_path)
     return dict()
 
   config_path = config.get_heron_cluster_conf_dir(cluster, config_path)
diff --git a/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploaderTest.java b/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploaderTest.java
index 85db4f4..18e9ed1 100644
--- a/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploaderTest.java
+++ b/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploaderTest.java
@@ -30,13 +30,14 @@
 
 public class LocalFileSystemUploaderTest {
 
+  private static final String TOPOLOGY_PACKAGE_FILE_NAME = "some-topology.tar";
+
   private Config config;
   private String fileSystemDirectory;
   private String testTopologyDirectory;
 
   @Before
   public void before() throws Exception {
-
     // form the file system directory using bazel environ files
     fileSystemDirectory = Paths.get(System.getenv("JAVA_RUNFILES"), "topologies").toString();
 
@@ -60,17 +61,17 @@
   }
 
   @Test
-  public void testUploader() throws Exception {
-
+  public void testUploader() {
     // identify the location of the test topology tar file
-    String topologyPackage = Paths.get(testTopologyDirectory, "some-topology.tar").toString();
+    String topologyPackage = Paths.get(testTopologyDirectory,
+        TOPOLOGY_PACKAGE_FILE_NAME).toString();
 
-    Config newconfig = Config.newBuilder()
+    Config newConfig = Config.newBuilder()
         .putAll(config).put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage).build();
 
     // create the uploader and load the package
     LocalFileSystemUploader uploader = new LocalFileSystemUploader();
-    uploader.initialize(newconfig);
+    uploader.initialize(newConfig);
     Assert.assertNotNull(uploader.uploadPackage());
 
     // verify if the file exists
@@ -79,33 +80,32 @@
   }
 
   @Test(expected = UploaderException.class)
-  public void testSourceNotExists() throws Exception {
-
+  public void testSourceNotExists() {
     // identify the location of the test topology tar file
     String topologyPackage = Paths.get(
         testTopologyDirectory, "doesnot-exist-topology.tar").toString();
 
-    Config newconfig = Config.newBuilder()
+    Config newConfig = Config.newBuilder()
         .putAll(config).put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage).build();
 
     // create the uploader and load the package
     LocalFileSystemUploader uploader = new LocalFileSystemUploader();
-    uploader.initialize(newconfig);
+    uploader.initialize(newConfig);
     uploader.uploadPackage();
   }
 
   @Test
-  public void testUndo() throws Exception {
-
+  public void testUndo() {
     // identify the location of the test topology tar file
-    String topologyPackage = Paths.get(testTopologyDirectory, "some-topology.tar").toString();
+    String topologyPackage = Paths.get(testTopologyDirectory,
+        TOPOLOGY_PACKAGE_FILE_NAME).toString();
 
-    Config newconfig = Config.newBuilder()
+    Config newConfig = Config.newBuilder()
         .putAll(config).put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage).build();
 
     // create the uploader and load the package
     LocalFileSystemUploader uploader = new LocalFileSystemUploader();
-    uploader.initialize(newconfig);
+    uploader.initialize(newConfig);
     Assert.assertNotNull(uploader.uploadPackage());
 
     // verify if the file exists
@@ -113,7 +113,95 @@
     Assert.assertTrue(new File(destFile).isFile());
 
     // now undo the file
-    uploader.undo();
+    Assert.assertTrue(uploader.undo());
     Assert.assertFalse(new File(destFile).isFile());
   }
+
+  @Test
+  public void testUseDefaultFileSystemDirectoryWhenNotSet() {
+    // identify the location of the test topology tar file
+    String topologyPackage = Paths.get(testTopologyDirectory,
+        TOPOLOGY_PACKAGE_FILE_NAME).toString();
+
+    // set file system directory as null
+    Config newConfig = Config.newBuilder()
+        .putAll(config)
+        .put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage)
+        .put(LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.value(), null)
+        .build();
+
+    // create the uploader
+    LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+    uploader.initialize(newConfig);
+
+    // get default file system directory
+    String defaultFileSystemDirectory = LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.getDefaultString();
+
+    String destDirectory = uploader.getTopologyDirectory();
+    String destFile = uploader.getTopologyFile();
+
+    // verify usage of default file system directory for destination directory and file
+    Assert.assertEquals(destDirectory, defaultFileSystemDirectory);
+    Assert.assertTrue(destFile.contains(defaultFileSystemDirectory));
+  }
+
+  @Test
+  public void testUploadPackageWhenTopologyFileAlreadyExists() {
+    // identify the location of the test topology tar file
+    String topologyPackage = Paths.get(testTopologyDirectory,
+        TOPOLOGY_PACKAGE_FILE_NAME).toString();
+
+    Config newConfig = Config.newBuilder()
+        .putAll(config).put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage).build();
+
+    // create the uploader and load the package
+    LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+    uploader.initialize(newConfig);
+    Assert.assertNotNull(uploader.uploadPackage());
+
+    // verify if the file exists
+    String destFile = uploader.getTopologyFile();
+    Assert.assertTrue(new File(destFile).isFile());
+
+    // load same package again by overriding existing one
+    Assert.assertNotNull(uploader.uploadPackage());
+    String destFile2 = uploader.getTopologyFile();
+    Assert.assertTrue(new File(destFile2).isFile());
+
+    // verify that existing file is overridden
+    Assert.assertEquals(destFile, destFile2);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testUploadPackageWhenFileSystemDirectoryIsInvalid() {
+    // identify the location of the test topology tar file
+    String topologyPackage = Paths.get(testTopologyDirectory,
+        TOPOLOGY_PACKAGE_FILE_NAME).toString();
+
+    String invalidFileSystemDirectory = "invalid%path";
+
+    // set invalid file system directory
+    Config newConfig = Config.newBuilder()
+        .putAll(config)
+        .put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage)
+        .put(LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.value(), invalidFileSystemDirectory).build();
+
+    // create the uploader and load the package
+    LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+    uploader.initialize(newConfig);
+    uploader.uploadPackage();
+  }
+
+  @Test
+  public void testGetUri() {
+    LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+    Assert.assertEquals(uploader.getUri("testFileName").toString(), "file://testFileName");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetUriWhenDestFileNameIsInvalid() {
+    LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+    uploader.getUri("invalid_%_DestFilePath");
+  }
+
 }
diff --git a/integration_test/src/java/BUILD b/integration_test/src/java/BUILD
index ca48984..279cc8a 100644
--- a/integration_test/src/java/BUILD
+++ b/integration_test/src/java/BUILD
@@ -14,6 +14,7 @@
     ),
     deps = [
         "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//storm-compatibility/src/java:storm-compatibility-java",
         "//heron/proto:proto_topology_java",
         "//third_party/java:jackson",
@@ -31,6 +32,7 @@
     ),
     deps = [
         "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//storm-compatibility/src/java:storm-compatibility-java",
         "//third_party/java:hadoop-core",
         "//third_party/java:jackson",
@@ -46,6 +48,7 @@
     ),
     deps = [
         "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//storm-compatibility/src/java:storm-compatibility-java",
         "@com_googlecode_json_simple_json_simple//jar",
         "@commons_cli_commons_cli//jar",
@@ -61,6 +64,7 @@
     ),
     deps = [
         "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//storm-compatibility/src/java:storm-compatibility-java",
         ":common",
         ":core"
@@ -74,6 +78,7 @@
     ),
     deps = [
         "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//storm-compatibility/src/java:storm-compatibility-java",
         "@commons_cli_commons_cli//jar",
         "@com_googlecode_json_simple_json_simple//jar",
@@ -96,6 +101,7 @@
     ),
     deps = [
         "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//storm-compatibility/src/java:storm-compatibility-java",
         ":common",
         ":core"
diff --git a/storm-compatibility-examples/src/java/BUILD b/storm-compatibility-examples/src/java/BUILD
index da237dc..0f5ef31 100644
--- a/storm-compatibility-examples/src/java/BUILD
+++ b/storm-compatibility-examples/src/java/BUILD
@@ -4,7 +4,7 @@
     name='heron-storm-compatibility-examples-unshaded',
     srcs = glob(["**/*.java"]),
     deps = [
-        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/common/src/java:basics-java",
         "//storm-compatibility/src/java:storm-compatibility-java",
     ],
diff --git a/storm-compatibility/src/java/BUILD b/storm-compatibility/src/java/BUILD
index 91ae2d7..e5e3409 100644
--- a/storm-compatibility/src/java/BUILD
+++ b/storm-compatibility/src/java/BUILD
@@ -5,7 +5,7 @@
 load("/tools/rules/javadoc", "java_doc")
 
 storm_deps_files = [
-    "//heron/api/src/java:api-java",
+    "//heron/api/src/java:api-java-low-level",
     "//heron/common/src/java:basics-java",
     "//heron/simulator/src/java:simulator-java",
     "//heron/proto:proto_topology_java",
diff --git a/storm-compatibility/src/java/backtype/storm/StormSubmitter.java b/storm-compatibility/src/java/backtype/storm/StormSubmitter.java
index 7d68d5e..c99e058 100644
--- a/storm-compatibility/src/java/backtype/storm/StormSubmitter.java
+++ b/storm-compatibility/src/java/backtype/storm/StormSubmitter.java
@@ -53,7 +53,8 @@
       Map stormConfig,
       StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
 
-    // First do config translation
+    // First do topology config translation. Bolt and Spout config translation is handled
+    // in their own DeclarerImpl classes.
     com.twitter.heron.api.Config heronConfig = ConfigUtils.translateConfig(stormConfig);
 
     // Now submit a heron topology
diff --git a/storm-compatibility/src/java/backtype/storm/topology/BoltDeclarerImpl.java b/storm-compatibility/src/java/backtype/storm/topology/BoltDeclarerImpl.java
index f162588..ae0bc78 100644
--- a/storm-compatibility/src/java/backtype/storm/topology/BoltDeclarerImpl.java
+++ b/storm-compatibility/src/java/backtype/storm/topology/BoltDeclarerImpl.java
@@ -18,11 +18,13 @@
 
 package backtype.storm.topology;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import backtype.storm.grouping.CustomStreamGrouping;
 import backtype.storm.grouping.CustomStreamGroupingDelegate;
 import backtype.storm.tuple.Fields;
+import backtype.storm.utils.ConfigUtils;
 import backtype.storm.utils.Utils;
 
 public class BoltDeclarerImpl implements BoltDeclarer {
@@ -35,13 +37,18 @@
   @Override
   @SuppressWarnings({"rawtypes", "unchecked"})
   public BoltDeclarer addConfigurations(Map conf) {
-    delegate.addConfigurations((Map<String, Object>) conf);
+    // Translate config to heron config and then apply.
+    Map<String, Object> heronConf = ConfigUtils.translateComponentConfig(conf);
+    delegate.addConfigurations(heronConf);
     return this;
   }
 
   @Override
   public BoltDeclarer addConfiguration(String config, Object value) {
-    delegate.addConfiguration(config, value);
+    Map<String, Object> configMap = new HashMap<String, Object>();
+    configMap.put(config, value);
+
+    addConfigurations(configMap);
     return this;
   }
 
diff --git a/storm-compatibility/src/java/backtype/storm/topology/SpoutDeclarerImpl.java b/storm-compatibility/src/java/backtype/storm/topology/SpoutDeclarerImpl.java
index 7cf4092..29c5313 100644
--- a/storm-compatibility/src/java/backtype/storm/topology/SpoutDeclarerImpl.java
+++ b/storm-compatibility/src/java/backtype/storm/topology/SpoutDeclarerImpl.java
@@ -18,8 +18,11 @@
 
 package backtype.storm.topology;
 
+import java.util.HashMap;
 import java.util.Map;
 
+import backtype.storm.utils.ConfigUtils;
+
 public class SpoutDeclarerImpl implements SpoutDeclarer {
   private com.twitter.heron.api.topology.SpoutDeclarer delegate;
 
@@ -30,13 +33,18 @@
   @Override
   @SuppressWarnings({"rawtypes", "unchecked"})
   public SpoutDeclarer addConfigurations(Map conf) {
-    delegate.addConfigurations((Map<String, Object>) conf);
+    // Translate config to heron config and then apply.
+    Map<String, Object> heronConf = ConfigUtils.translateComponentConfig(conf);
+    delegate.addConfigurations(heronConf);
     return this;
   }
 
   @Override
   public SpoutDeclarer addConfiguration(String config, Object value) {
-    delegate.addConfiguration(config, value);
+    Map<String, Object> configMap = new HashMap<String, Object>();
+    configMap.put(config, value);
+
+    addConfigurations(configMap);
     return this;
   }
 
diff --git a/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java b/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java
index 25cbf7a..18e25de 100644
--- a/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java
+++ b/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java
@@ -39,57 +39,31 @@
   @SuppressWarnings({"rawtypes", "unchecked"})
   public static Config translateConfig(Map stormConfig) {
     Config heronConfig = new Config((Map<String, Object>) stormConfig);
+
     // Look at serialization stuff first
     doSerializationTranslation(heronConfig);
 
     // Now look at supported apis
-    if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
-      heronConfig.put(backtype.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS,
-          heronConfig.get(backtype.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS).toString());
-    }
-    if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_WORKERS)) {
-      Integer nWorkers = Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_WORKERS));
-      com.twitter.heron.api.Config.setNumStmgrs(heronConfig, nWorkers);
-    }
-    if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_ACKER_EXECUTORS)) {
-      Integer nAckers =
-          Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_ACKER_EXECUTORS));
-      if (nAckers > 0) {
-        com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
-                 com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE);
-      } else {
-        com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
-                 com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
-      }
-    } else {
-      com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
-               com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
-    }
-    if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
-      Integer nSecs =
-          Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
-      com.twitter.heron.api.Config.setMessageTimeoutSecs(heronConfig, nSecs);
-    }
-    if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_MAX_SPOUT_PENDING)) {
-      Integer nPending =
-          Utils.getInt(
-              heronConfig.get(backtype.storm.Config.TOPOLOGY_MAX_SPOUT_PENDING).toString());
-      com.twitter.heron.api.Config.setMaxSpoutPending(heronConfig, nPending);
-    }
-    if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS)) {
-      Integer tSecs =
-          Utils.getInt(
-              heronConfig.get(backtype.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS).toString());
-      com.twitter.heron.api.Config.setTickTupleFrequency(heronConfig, tSecs);
-    }
-    if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_DEBUG)) {
-      Boolean dBg =
-          Boolean.parseBoolean(heronConfig.get(backtype.storm.Config.TOPOLOGY_DEBUG).toString());
-      com.twitter.heron.api.Config.setDebug(heronConfig, dBg);
-    }
+    doStormTranslation(heronConfig);
 
     doTaskHooksTranslation(heronConfig);
 
+    doTopologyLevelTranslation(heronConfig);
+
+    return heronConfig;
+  }
+
+  /**
+   * Translate storm config to heron config for components
+   * @param stormConfig the storm config
+   * @return a heron config
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public static Config translateComponentConfig(Map stormConfig) {
+    Config heronConfig = new Config((Map<String, Object>) stormConfig);
+
+    doStormTranslation(heronConfig);
+
     return heronConfig;
   }
 
@@ -139,4 +113,66 @@
       heronConfig.setAutoTaskHooks(translationHooks);
     }
   }
+
+  /**
+   * Translate storm config into heron config. This funciton is used by both topology
+   * and component level config translations. Therefore NO config should be generated
+   * when a key does NOT exist if the key is for both topology and component.
+   * Otherwise the component config might overwrite the topolgy config with a wrong value.
+   * @param heron the heron config object to receive the results.
+   */
+  private static void doStormTranslation(Config heronConfig) {
+    if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
+      heronConfig.put(backtype.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS,
+          heronConfig.get(backtype.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS).toString());
+    }
+    if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_WORKERS)) {
+      Integer nWorkers = Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_WORKERS));
+      com.twitter.heron.api.Config.setNumStmgrs(heronConfig, nWorkers);
+    }
+
+    if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
+      Integer nSecs =
+          Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
+      com.twitter.heron.api.Config.setMessageTimeoutSecs(heronConfig, nSecs);
+    }
+    if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_MAX_SPOUT_PENDING)) {
+      Integer nPending =
+          Utils.getInt(
+              heronConfig.get(backtype.storm.Config.TOPOLOGY_MAX_SPOUT_PENDING).toString());
+      com.twitter.heron.api.Config.setMaxSpoutPending(heronConfig, nPending);
+    }
+    if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS)) {
+      Integer tSecs =
+          Utils.getInt(
+              heronConfig.get(backtype.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS).toString());
+      com.twitter.heron.api.Config.setTickTupleFrequency(heronConfig, tSecs);
+    }
+    if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_DEBUG)) {
+      Boolean dBg =
+          Boolean.parseBoolean(heronConfig.get(backtype.storm.Config.TOPOLOGY_DEBUG).toString());
+      com.twitter.heron.api.Config.setDebug(heronConfig, dBg);
+    }
+  }
+
+  /**
+   * Translate topology config.
+   * @param heron the heron config object to receive the results.
+   */
+  private static void doTopologyLevelTranslation(Config heronConfig) {
+    if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_ACKER_EXECUTORS)) {
+      Integer nAckers =
+          Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_ACKER_EXECUTORS));
+      if (nAckers > 0) {
+        com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+                 com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE);
+      } else {
+        com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+                 com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
+      }
+    } else {
+      com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+               com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
+    }
+  }
 }
diff --git a/storm-compatibility/src/java/org/apache/storm/StormSubmitter.java b/storm-compatibility/src/java/org/apache/storm/StormSubmitter.java
index 24cb0d8..4936067 100644
--- a/storm-compatibility/src/java/org/apache/storm/StormSubmitter.java
+++ b/storm-compatibility/src/java/org/apache/storm/StormSubmitter.java
@@ -53,7 +53,8 @@
       Map stormConfig,
       StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
 
-    // First do config translation
+    // First do topology config translation. Bolt and Spout config translation is handled
+    // in their own DeclarerImpl classes.
     com.twitter.heron.api.Config heronConfig = ConfigUtils.translateConfig(stormConfig);
 
     // Now submit a heron topology
diff --git a/storm-compatibility/src/java/org/apache/storm/topology/BoltDeclarerImpl.java b/storm-compatibility/src/java/org/apache/storm/topology/BoltDeclarerImpl.java
index f4aaa65..8cbf926 100644
--- a/storm-compatibility/src/java/org/apache/storm/topology/BoltDeclarerImpl.java
+++ b/storm-compatibility/src/java/org/apache/storm/topology/BoltDeclarerImpl.java
@@ -18,11 +18,13 @@
 
 package org.apache.storm.topology;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.storm.grouping.CustomStreamGrouping;
 import org.apache.storm.grouping.CustomStreamGroupingDelegate;
 import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.Utils;
 
 public class BoltDeclarerImpl implements BoltDeclarer {
@@ -35,13 +37,18 @@
   @Override
   @SuppressWarnings({"rawtypes", "unchecked"})
   public BoltDeclarer addConfigurations(Map conf) {
-    delegate.addConfigurations((Map<String, Object>) conf);
+    // Translate config to heron config and then apply.
+    Map<String, Object> heronConf = ConfigUtils.translateComponentConfig(conf);
+    delegate.addConfigurations(heronConf);
     return this;
   }
 
   @Override
   public BoltDeclarer addConfiguration(String config, Object value) {
-    delegate.addConfiguration(config, value);
+    Map<String, Object> configMap = new HashMap<String, Object>();
+    configMap.put(config, value);
+
+    addConfigurations(configMap);
     return this;
   }
 
diff --git a/storm-compatibility/src/java/org/apache/storm/topology/SpoutDeclarerImpl.java b/storm-compatibility/src/java/org/apache/storm/topology/SpoutDeclarerImpl.java
index 28dcc90..f6e8675 100644
--- a/storm-compatibility/src/java/org/apache/storm/topology/SpoutDeclarerImpl.java
+++ b/storm-compatibility/src/java/org/apache/storm/topology/SpoutDeclarerImpl.java
@@ -18,8 +18,11 @@
 
 package org.apache.storm.topology;
 
+import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.storm.utils.ConfigUtils;
+
 public class SpoutDeclarerImpl implements SpoutDeclarer {
   private com.twitter.heron.api.topology.SpoutDeclarer delegate;
 
@@ -30,13 +33,18 @@
   @Override
   @SuppressWarnings({"rawtypes", "unchecked"})
   public SpoutDeclarer addConfigurations(Map conf) {
-    delegate.addConfigurations((Map<String, Object>) conf);
+    // Translate config to heron config and then apply.
+    Map<String, Object> heronConf = ConfigUtils.translateComponentConfig(conf);
+    delegate.addConfigurations(heronConf);
     return this;
   }
 
   @Override
   public SpoutDeclarer addConfiguration(String config, Object value) {
-    delegate.addConfiguration(config, value);
+    Map<String, Object> configMap = new HashMap<String, Object>();
+    configMap.put(config, value);
+
+    addConfigurations(configMap);
     return this;
   }
 
diff --git a/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java b/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java
index 910b2a7..d7b23a2 100644
--- a/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java
@@ -32,7 +32,7 @@
   }
 
   /**
-   * Translate storm config to heron config
+   * Translate storm config to heron config for topology
    * @param stormConfig the storm config
    * @return a heron config
    */
@@ -43,57 +43,26 @@
     doSerializationTranslation(heronConfig);
 
     // Now look at supported apis
-    if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
-      heronConfig.put(org.apache.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS,
-          heronConfig.get(org.apache.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS).toString());
-    }
-    if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_WORKERS)) {
-      Integer nWorkers = Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_WORKERS));
-      com.twitter.heron.api.Config.setNumStmgrs(heronConfig, nWorkers);
-    }
-    if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_ACKER_EXECUTORS)) {
-      Integer nAckers =
-          Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_ACKER_EXECUTORS));
-      if (nAckers > 0) {
-        com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
-                 com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE);
-      } else {
-        com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
-                 com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
-      }
-    } else {
-      com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
-               com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
-    }
-    if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
-      Integer nSecs =
-          Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
-      com.twitter.heron.api.Config.setMessageTimeoutSecs(heronConfig, nSecs);
-    }
-    if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_MAX_SPOUT_PENDING)) {
-      Integer nPending =
-          Utils.getInt(
-              heronConfig.get(org.apache.storm.Config.TOPOLOGY_MAX_SPOUT_PENDING).toString());
-      com.twitter.heron.api.Config.setMaxSpoutPending(heronConfig, nPending);
-    }
-    if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS)) {
-      Integer tSecs =
-          Utils.getInt(
-              heronConfig.get(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS).toString());
-      com.twitter.heron.api.Config.setTickTupleFrequency(heronConfig, tSecs);
-    }
-    if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_DEBUG)) {
-      Boolean dBg =
-          Boolean.parseBoolean(heronConfig.get(org.apache.storm.Config.TOPOLOGY_DEBUG).toString());
-      com.twitter.heron.api.Config.setDebug(heronConfig, dBg);
-    }
-    if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_ENVIRONMENT)) {
-      com.twitter.heron.api.Config.setEnvironment(heronConfig,
-          (Map) heronConfig.get(org.apache.storm.Config.TOPOLOGY_ENVIRONMENT));
-    }
+    doStormTranslation(heronConfig);
 
     doTaskHooksTranslation(heronConfig);
 
+    doTopologyLevelTranslation(heronConfig);
+
+    return heronConfig;
+  }
+
+  /**
+   * Translate storm config to heron config for components
+   * @param stormConfig the storm config
+   * @return a heron config
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public static Config translateComponentConfig(Map stormConfig) {
+    Config heronConfig = new Config((Map<String, Object>) stormConfig);
+
+    doStormTranslation(heronConfig);
+
     return heronConfig;
   }
 
@@ -144,4 +113,69 @@
       heronConfig.setAutoTaskHooks(translationHooks);
     }
   }
+
+  /**
+   * Translate storm config into heron config. This funciton is used by both topology
+   * and component level config translations. Therefore NO config should be generated
+   * when a key does NOT exist if the key is for both topology and component.
+   * Otherwise the component config might overwrite the topolgy config with a wrong value.
+   * @param heron the heron config object to receive the results.
+   */
+  private static void doStormTranslation(Config heronConfig) {
+    if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
+      heronConfig.put(org.apache.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS,
+          heronConfig.get(org.apache.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS).toString());
+    }
+    if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_WORKERS)) {
+      Integer nWorkers = Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_WORKERS));
+      com.twitter.heron.api.Config.setNumStmgrs(heronConfig, nWorkers);
+    }
+    if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
+      Integer nSecs =
+          Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
+      com.twitter.heron.api.Config.setMessageTimeoutSecs(heronConfig, nSecs);
+    }
+    if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_MAX_SPOUT_PENDING)) {
+      Integer nPending =
+          Utils.getInt(
+              heronConfig.get(org.apache.storm.Config.TOPOLOGY_MAX_SPOUT_PENDING).toString());
+      com.twitter.heron.api.Config.setMaxSpoutPending(heronConfig, nPending);
+    }
+    if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS)) {
+      Integer tSecs =
+          Utils.getInt(
+              heronConfig.get(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS).toString());
+      com.twitter.heron.api.Config.setTickTupleFrequency(heronConfig, tSecs);
+    }
+    if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_DEBUG)) {
+      Boolean dBg =
+          Boolean.parseBoolean(heronConfig.get(org.apache.storm.Config.TOPOLOGY_DEBUG).toString());
+      com.twitter.heron.api.Config.setDebug(heronConfig, dBg);
+    }
+    if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_ENVIRONMENT)) {
+      com.twitter.heron.api.Config.setEnvironment(heronConfig,
+          (Map) heronConfig.get(org.apache.storm.Config.TOPOLOGY_ENVIRONMENT));
+    }
+  }
+
+  /**
+   * Translate topology config.
+   * @param heron the heron config object to receive the results.
+   */
+  private static void doTopologyLevelTranslation(Config heronConfig) {
+    if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_ACKER_EXECUTORS)) {
+      Integer nAckers =
+          Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_ACKER_EXECUTORS));
+      if (nAckers > 0) {
+        com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+                 com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE);
+      } else {
+        com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+                 com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
+      }
+    } else {
+      com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+               com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
+    }
+  }
 }
diff --git a/third_party/java/BUILD b/third_party/java/BUILD
index 9645eea..9d9053f 100644
--- a/third_party/java/BUILD
+++ b/third_party/java/BUILD
@@ -96,6 +96,7 @@
         "@com_esotericsoftware_reflectasm//jar",
         "@com_esotericsoftware_minlog//jar",
         "@org_objenesis_objenesis//jar",
+        "@org_objectweb_asm//jar",
     ],
 )
 
diff --git a/website/config.yaml b/website/config.yaml
index b616119..a3f6d47 100755
--- a/website/config.yaml
+++ b/website/config.yaml
@@ -20,9 +20,9 @@
   author: Twitter, Inc.
   description: A realtime, distributed, fault-tolerant stream processing engine from Twitter
   versions:
-    heron: 0.16.4
+    heron: 0.16.5
     bazel: 0.5.4
-    heronpy: 0.16.4
+    heronpy: 0.16.5
   assets:
     favicon:
       small: /img/favicon-16x16.png
diff --git a/website/content/docs/operators/deployment/uploaders/localfs.md b/website/content/docs/operators/deployment/uploaders/localfs.md
index 4e9e2a9..bd5411a 100644
--- a/website/content/docs/operators/deployment/uploaders/localfs.md
+++ b/website/content/docs/operators/deployment/uploaders/localfs.md
@@ -27,7 +27,7 @@
 * `heron.uploader.localfs.file.system.directory` --- Provides the name of the directory where
 the topology jar should be uploaded. The name of the directory should be unique per cluster
 You could use the Heron environment variables `${CLUSTER}` that will be substituted by cluster
-name.
+name. If this is not set, `${HOME}/.herondata/repository/${CLUSTER}/${ROLE}/${TOPOLOGY}` will be set as default.
 
 ### Example Local File System Uploader Configuration