Merge branch 'master' into karthik/rmyamlbinary
diff --git a/.travis.yml b/.travis.yml
index a4ef75a..ee0e203 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -11,7 +11,6 @@
   apt:
     sources:
       - ubuntu-toolchain-r-test
-      - george-edison55-precise-backports # cmake 3.2.3 / doxygen 1.8.3
 
     packages:
       - gcc-4.8
@@ -23,8 +22,6 @@
       - pkg-config
       - zip
       - zlib1g-dev
-      - cmake
-      - cmake-data
 
 env:
   - CC=gcc-4.8 CXX=g++-4.8 CPP=cpp-4.8 CXXCPP=cpp-4.8
diff --git a/examples/src/java/com/twitter/heron/examples/api/ComponentJVMOptionsTopology.java b/examples/src/java/com/twitter/heron/examples/api/ComponentJVMOptionsTopology.java
index 75338db..33bd0a0 100644
--- a/examples/src/java/com/twitter/heron/examples/api/ComponentJVMOptionsTopology.java
+++ b/examples/src/java/com/twitter/heron/examples/api/ComponentJVMOptionsTopology.java
@@ -67,6 +67,11 @@
     conf.setContainerRamRequested(ByteAmount.fromGigabytes(2));
     conf.setContainerCpuRequested(2);
 
+    // Specify the size of ram padding to per container.
+    // Notice, this config will be considered as a hint,
+    // and it's up to the packing algorithm to determine whether to apply this hint
+    conf.setContainerRamPadding(ByteAmount.fromGigabytes(2));
+
     if (args != null && args.length > 0) {
       conf.setNumStmgrs(2);
       HeronSubmitter.submitTopology(args[0], conf, builder.createTopology());
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/IntegerProcessingTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/IntegerProcessingTopology.java
index a24f013..ba07289 100644
--- a/examples/src/java/com/twitter/heron/examples/streamlet/IntegerProcessingTopology.java
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/IntegerProcessingTopology.java
@@ -35,8 +35,8 @@
   }
 
   // Heron resources to be applied to the topology
-  private static final float CPU = 2.0f;
-  private static final long GIGABYTES_OF_RAM = 6;
+  private static final float CPU = 1.5f;
+  private static final int GIGABYTES_OF_RAM = 8;
   private static final int NUM_CONTAINERS = 2;
 
   /**
@@ -49,7 +49,6 @@
     Streamlet<Integer> zeroes = builder.newSource(() -> 0);
 
     builder.newSource(() -> ThreadLocalRandom.current().nextInt(1, 11))
-        .setNumPartitions(2)
         .setName("random-ints")
         .map(i -> i + 1)
         .setName("add-one")
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/WindowedWordCountTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/WindowedWordCountTopology.java
index 8bfa62e..703d205 100644
--- a/examples/src/java/com/twitter/heron/examples/streamlet/WindowedWordCountTopology.java
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/WindowedWordCountTopology.java
@@ -84,7 +84,6 @@
 
     Config config = new Config.Builder()
         .setNumContainers(topologyParallelism)
-        .useKryoSerializer()
         .build();
 
     // Fetches the topology name from the first command-line argument
diff --git a/heron/api/src/java/com/twitter/heron/api/Config.java b/heron/api/src/java/com/twitter/heron/api/Config.java
index b231dd6..b281e54 100644
--- a/heron/api/src/java/com/twitter/heron/api/Config.java
+++ b/heron/api/src/java/com/twitter/heron/api/Config.java
@@ -275,6 +275,11 @@
    */
   public static final String TOPOLOGY_TIMER_EVENTS = "topology.timer.events";
 
+  /**
+   * Enable Remote debugging for java heron instances
+   */
+  public static final String TOPOLOGY_REMOTE_DEBUGGING_ENABLE = "topology.remote.debugging.enable";
+
   private static final long serialVersionUID = 2550967708478837032L;
   // We maintain a list of all user exposed vars
   private static Set<String> apiVars = new HashSet<>();
@@ -310,6 +315,7 @@
     apiVars.add(TOPOLOGY_ADDITIONAL_CLASSPATH);
     apiVars.add(TOPOLOGY_UPDATE_DEACTIVATE_WAIT_SECS);
     apiVars.add(TOPOLOGY_UPDATE_REACTIVATE_WAIT_SECS);
+    apiVars.add(TOPOLOGY_REMOTE_DEBUGGING_ENABLE);
   }
 
   public Config() {
@@ -688,4 +694,8 @@
     }
     timers.put(name, Pair.of(interval, task));
   }
+
+  public void setTopologyRemoteDebugging(boolean isOn) {
+    this.put(Config.TOPOLOGY_REMOTE_DEBUGGING_ENABLE, String.valueOf(isOn));
+  }
 }
diff --git a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java
index d15bd8c..374f0a7 100644
--- a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java
+++ b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java
@@ -52,10 +52,8 @@
 import com.twitter.heron.api.tuple.Fields;
 import com.twitter.heron.api.tuple.Tuple;
 import com.twitter.heron.api.tuple.Values;
-import com.twitter.heron.api.utils.TupleUtils;
 import com.twitter.heron.api.windowing.Event;
 import com.twitter.heron.api.windowing.EvictionPolicy;
-import com.twitter.heron.api.windowing.TimerEvent;
 import com.twitter.heron.api.windowing.TimestampExtractor;
 import com.twitter.heron.api.windowing.TriggerPolicy;
 import com.twitter.heron.api.windowing.TupleWindowImpl;
@@ -221,17 +219,15 @@
         maxLagMs = DEFAULT_MAX_LAG_MS;
       }
       // watermark interval
-      int watermarkInterval;
+      long watermarkIntervalMs;
       if (topoConf.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) {
-        watermarkInterval = ((Number) topoConf.get(WindowingConfigs
+        watermarkIntervalMs = ((Number) topoConf.get(WindowingConfigs
             .TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue();
       } else {
-        watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
+        watermarkIntervalMs = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
       }
-      // Use tick tuple to perodically generate watermarks
-      Config.setTickTupleFrequencyMs(topoConf, watermarkInterval);
-      waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager,
-          maxLagMs, getComponentStreams(context));
+      waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkIntervalMs,
+          maxLagMs, getComponentStreams(context), topoConf);
     } else {
       if (topoConf.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) {
         throw new IllegalArgumentException(
@@ -304,9 +300,8 @@
         return new WatermarkTimeTriggerPolicy<>(slidingIntervalDurationMs, manager,
             evictionPolicy, manager);
       } else {
-        // set tick tuple frequency in milliseconds for timer in TimeTriggerPolicy
-        Config.setTickTupleFrequencyMs(topoConf, slidingIntervalDurationMs);
-        return new TimeTriggerPolicy<>(slidingIntervalDurationMs, manager, evictionPolicy);
+        return new TimeTriggerPolicy<>(slidingIntervalDurationMs, manager,
+            evictionPolicy, topoConf);
       }
     }
   }
@@ -352,30 +347,22 @@
 
   @Override
   public void execute(Tuple input) {
-    if (TupleUtils.isTick(input)) {
-      long currTime = System.currentTimeMillis();
-      windowManager.add(new TimerEvent<>(input, currTime));
-      if (isTupleTs()) {
-        waterMarkEventGenerator.run();
+    if (isTupleTs()) {
+      long ts = timestampExtractor.extractTimestamp(input);
+      if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
+        windowManager.add(input, ts);
+      } else {
+        if (lateTupleStream != null) {
+          windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
+        } else {
+          LOG.info(String.format(
+              "Received a late tuple %s with ts %d. This will not be " + "processed"
+                  + ".", input, ts));
+        }
+        windowedOutputCollector.ack(input);
       }
     } else {
-      if (isTupleTs()) {
-        long ts = timestampExtractor.extractTimestamp(input);
-        if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
-          windowManager.add(input, ts);
-        } else {
-          if (lateTupleStream != null) {
-            windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
-          } else {
-            LOG.info(String.format(
-                "Received a late tuple %s with ts %d. This will not be " + "processed"
-                    + ".", input, ts));
-          }
-          windowedOutputCollector.ack(input);
-        }
-      } else {
-        windowManager.add(input);
-      }
+      windowManager.add(input);
     }
   }
 
diff --git a/heron/api/src/java/com/twitter/heron/api/utils/TopologyUtils.java b/heron/api/src/java/com/twitter/heron/api/utils/TopologyUtils.java
index 9a487f4..cf52f7c 100644
--- a/heron/api/src/java/com/twitter/heron/api/utils/TopologyUtils.java
+++ b/heron/api/src/java/com/twitter/heron/api/utils/TopologyUtils.java
@@ -290,4 +290,10 @@
 
     throw new IllegalStateException("Failed to find topology defn file");
   }
+
+  public static boolean getTopologyRemoteDebuggingEnabled(TopologyAPI.Topology topology) {
+    List<TopologyAPI.Config.KeyValue> topologyConfig = topology.getTopologyConfig().getKvsList();
+    return Boolean.parseBoolean(TopologyUtils.getConfigWithDefault(
+        topologyConfig, Config.TOPOLOGY_REMOTE_DEBUGGING_ENABLE, "false"));
+  }
 }
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/Event.java b/heron/api/src/java/com/twitter/heron/api/windowing/Event.java
index e26c917..6c227d3 100644
--- a/heron/api/src/java/com/twitter/heron/api/windowing/Event.java
+++ b/heron/api/src/java/com/twitter/heron/api/windowing/Event.java
@@ -63,10 +63,4 @@
    * @return true if this is a watermark event
    */
   boolean isWatermark();
-
-  /**
-   * If this is a timer event or not.  Timer events use Tick Tuples to trigger
-   * @return true if this a timer event
-   */
-  boolean isTimer();
 }
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/EventImpl.java b/heron/api/src/java/com/twitter/heron/api/windowing/EventImpl.java
index 182d4b7..f5dc839 100644
--- a/heron/api/src/java/com/twitter/heron/api/windowing/EventImpl.java
+++ b/heron/api/src/java/com/twitter/heron/api/windowing/EventImpl.java
@@ -60,11 +60,6 @@
   }
 
   @Override
-  public boolean isTimer() {
-    return false;
-  }
-
-  @Override
   public String toString() {
     return "EventImpl{" + "event=" + event + ", ts=" + ts + '}';
   }
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/TimerEvent.java b/heron/api/src/java/com/twitter/heron/api/windowing/TimerEvent.java
deleted file mode 100644
index 027dd24..0000000
--- a/heron/api/src/java/com/twitter/heron/api/windowing/TimerEvent.java
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright 2017 Twitter. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.twitter.heron.api.windowing;
-
-import java.io.Serializable;
-
-/**
- * Timer event used to trigger actions in windowing that needs to occur on a set frequency
- */
-public class TimerEvent<T extends Serializable> extends EventImpl<T> {
-  private static final long serialVersionUID = -9174292711796600228L;
-
-  public TimerEvent(T event, long ts) {
-    super(event, ts);
-  }
-
-  @Override
-  public boolean isTimer() {
-    return true;
-  }
-
-  @Override
-  public String toString() {
-    return "TimerEvent{} " + super.toString();
-  }
-}
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/WaterMarkEventGenerator.java b/heron/api/src/java/com/twitter/heron/api/windowing/WaterMarkEventGenerator.java
index 19a5560..ba0ac25 100644
--- a/heron/api/src/java/com/twitter/heron/api/windowing/WaterMarkEventGenerator.java
+++ b/heron/api/src/java/com/twitter/heron/api/windowing/WaterMarkEventGenerator.java
@@ -34,10 +34,12 @@
 package com.twitter.heron.api.windowing;
 
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.twitter.heron.api.Config;
 import com.twitter.heron.api.generated.TopologyAPI;
 
 /**
@@ -47,28 +49,35 @@
  * any tuple coming with an earlier timestamp can be considered as late events.
  */
 public class WaterMarkEventGenerator<T extends Serializable> {
+  private final Map<TopologyAPI.StreamId, Long> streamToTs;
   private final WindowManager<T> windowManager;
+  private long watermarkIntervalMs;
   private final int eventTsLag;
   private final Set<TopologyAPI.StreamId> inputStreams;
-  private final Map<TopologyAPI.StreamId, Long> streamToTs;
+  private  Map<String, Object> topoConf;
   private volatile long lastWaterMarkTs;
-  private boolean started = false;
 
   /**
    * Creates a new WatermarkEventGenerator.
    *
    * @param windowManager The window manager this generator will submit watermark events to
    * interval
+   * @param watermarkIntervalMs the interval at which watermarks should be emitted
    * @param eventTsLagMs The max allowed lag behind the last watermark event before an event is
    * considered late
    * @param inputStreams The input streams this generator is expected to handle
+   * @param topoConf topology configurations
    */
-  public WaterMarkEventGenerator(WindowManager<T> windowManager, int eventTsLagMs,
-                                 Set<TopologyAPI.StreamId> inputStreams) {
-    this.windowManager = windowManager;
+  public WaterMarkEventGenerator(WindowManager<T> windowManager, long watermarkIntervalMs,
+                                 int eventTsLagMs,
+                                 Set<TopologyAPI.StreamId> inputStreams,
+                                 Map<String, Object> topoConf) {
     streamToTs = new ConcurrentHashMap<>();
+    this.windowManager = windowManager;
+    this.watermarkIntervalMs = watermarkIntervalMs;
     this.eventTsLag = eventTsLagMs;
     this.inputStreams = inputStreams;
+    this.topoConf = topoConf;
   }
 
   /**
@@ -85,12 +94,10 @@
   }
 
   public void run() {
-    if (started) {
-      long waterMarkTs = computeWaterMarkTs();
-      if (waterMarkTs > lastWaterMarkTs) {
-        this.windowManager.add(new WaterMarkEvent<>(waterMarkTs));
-        lastWaterMarkTs = waterMarkTs;
-      }
+    long waterMarkTs = computeWaterMarkTs();
+    if (waterMarkTs > lastWaterMarkTs) {
+      this.windowManager.add(new WaterMarkEvent<>(waterMarkTs));
+      lastWaterMarkTs = waterMarkTs;
     }
   }
 
@@ -110,6 +117,11 @@
   }
 
   public void start() {
-    started = true;
+    Config.registerTopologyTimerEvents(
+        topoConf,
+        "WaterMarkEventGeneratorTimer",
+        Duration.ofMillis(watermarkIntervalMs),
+        () -> run()
+    );
   }
 }
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/WindowManager.java b/heron/api/src/java/com/twitter/heron/api/windowing/WindowManager.java
index fe05d1b..84ac2ca 100644
--- a/heron/api/src/java/com/twitter/heron/api/windowing/WindowManager.java
+++ b/heron/api/src/java/com/twitter/heron/api/windowing/WindowManager.java
@@ -44,7 +44,6 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.logging.Logger;
 
 import com.twitter.heron.api.windowing.EvictionPolicy.Action;
@@ -84,7 +83,6 @@
   private final List<T> expiredEvents;
   private final Set<Event<T>> prevWindowEvents;
   private final AtomicInteger eventsSinceLastExpiry;
-  private final ReentrantLock lock;
 
   /**
    * Constructs a {@link WindowManager}
@@ -100,8 +98,6 @@
     expiredEvents = new ArrayList<>();
     prevWindowEvents = new HashSet<>();
     eventsSinceLastExpiry = new AtomicInteger();
-    lock = new ReentrantLock(true);
-
   }
 
   /**
@@ -150,8 +146,6 @@
     // watermark events are not added to the queue.
     if (windowEvent.isWatermark()) {
       LOG.fine(String.format("Got watermark event with ts %d", windowEvent.getTimestamp()));
-    } else if (windowEvent.isTimer()) {
-      LOG.fine(String.format("Got timer event with ts %d", windowEvent.getTimestamp()));
     } else {
       queue.add(windowEvent);
     }
@@ -166,18 +160,14 @@
   public boolean onTrigger() {
     List<Event<T>> windowEvents = null;
     List<T> expired = null;
-    try {
-      lock.lock();
-            /*
-             * scan the entire window to handle out of order events in
-             * the case of time based windows.
-             */
-      windowEvents = scanEvents(true);
-      expired = new ArrayList<>(expiredEvents);
-      expiredEvents.clear();
-    } finally {
-      lock.unlock();
-    }
+
+    /*
+     * scan the entire window to handle out of order events in
+     * the case of time based windows.
+     */
+    windowEvents = scanEvents(true);
+    expired = new ArrayList<>(expiredEvents);
+    expiredEvents.clear();
     List<T> events = new ArrayList<>();
     List<T> newEvents = new ArrayList<>();
     for (Event<T> event : windowEvents) {
@@ -239,25 +229,22 @@
     LOG.fine(String.format("Scan events, eviction policy %s", evictionPolicy));
     List<T> eventsToExpire = new ArrayList<>();
     List<Event<T>> eventsToProcess = new ArrayList<>();
-    try {
-      lock.lock();
-      Iterator<Event<T>> it = queue.iterator();
-      while (it.hasNext()) {
-        Event<T> windowEvent = it.next();
-        Action action = evictionPolicy.evict(windowEvent);
-        if (action == EXPIRE) {
-          eventsToExpire.add(windowEvent.get());
-          it.remove();
-        } else if (!fullScan || action == STOP) {
-          break;
-        } else if (action == PROCESS) {
-          eventsToProcess.add(windowEvent);
-        }
+
+    Iterator<Event<T>> it = queue.iterator();
+    while (it.hasNext()) {
+      Event<T> windowEvent = it.next();
+      Action action = evictionPolicy.evict(windowEvent);
+      if (action == EXPIRE) {
+        eventsToExpire.add(windowEvent.get());
+        it.remove();
+      } else if (!fullScan || action == STOP) {
+        break;
+      } else if (action == PROCESS) {
+        eventsToProcess.add(windowEvent);
       }
-      expiredEvents.addAll(eventsToExpire);
-    } finally {
-      lock.unlock();
     }
+    expiredEvents.addAll(eventsToExpire);
+
     eventsSinceLastExpiry.set(0);
     LOG.fine(String.format("[%d] events expired from window.", eventsToExpire.size()));
     if (!eventsToExpire.isEmpty()) {
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/evictors/CountEvictionPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/evictors/CountEvictionPolicy.java
index 2a60a44..869ee4d 100644
--- a/heron/api/src/java/com/twitter/heron/api/windowing/evictors/CountEvictionPolicy.java
+++ b/heron/api/src/java/com/twitter/heron/api/windowing/evictors/CountEvictionPolicy.java
@@ -76,7 +76,7 @@
 
   @Override
   public void track(Event<T> event) {
-    if (!event.isWatermark() && !event.isTimer()) {
+    if (!event.isWatermark()) {
       currentCount.incrementAndGet();
     }
   }
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java
index 3b154b9..9fb6791 100644
--- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java
+++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java
@@ -65,7 +65,7 @@
 
   @Override
   public void track(Event<T> event) {
-    if (started && !event.isWatermark() && !event.isTimer()) {
+    if (started && !event.isWatermark()) {
       if (currentCount.incrementAndGet() >= count) {
         evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis()));
         handler.onTrigger();
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java
index 3999d06..e07b328 100644
--- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java
+++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java
@@ -33,7 +33,10 @@
 package com.twitter.heron.api.windowing.triggers;
 
 import java.io.Serializable;
+import java.time.Duration;
+import java.util.Map;
 
+import com.twitter.heron.api.Config;
 import com.twitter.heron.api.windowing.DefaultEvictionContext;
 import com.twitter.heron.api.windowing.Event;
 import com.twitter.heron.api.windowing.EvictionPolicy;
@@ -49,25 +52,24 @@
   private long duration;
   private final TriggerHandler handler;
   private final EvictionPolicy<T, ?> evictionPolicy;
-  private boolean started = false;
+  private Map<String, Object> topoConf;
 
 
   public TimeTriggerPolicy(long millis, TriggerHandler handler) {
-    this(millis, handler, null);
+    this(millis, handler, null, new Config());
   }
 
   public TimeTriggerPolicy(long millis, TriggerHandler handler, EvictionPolicy<T, ?>
-      evictionPolicy) {
+      evictionPolicy, Map<String, Object> topoConf) {
     this.duration = millis;
     this.handler = handler;
     this.evictionPolicy = evictionPolicy;
+    this.topoConf = topoConf;
   }
 
   @Override
   public void track(Event<T> event) {
-    if (started && event.isTimer()) {
-      triggerTask();
-    }
+
   }
 
   @Override
@@ -77,7 +79,8 @@
 
   @Override
   public void start() {
-    started = true;
+    Config.registerTopologyTimerEvents(this.topoConf, "TimeTriggerPolicyTimer",
+        Duration.ofMillis(this.duration), () -> triggerTask());
   }
 
   @Override
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Config.java b/heron/api/src/java/com/twitter/heron/streamlet/Config.java
index a6ee37a..5f3590b 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Config.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Config.java
@@ -16,7 +16,6 @@
 
 import java.io.Serializable;
 
-import com.twitter.heron.common.basics.ByteAmount;
 import com.twitter.heron.streamlet.impl.KryoSerializer;
 
 /**
@@ -27,8 +26,11 @@
  */
 public final class Config implements Serializable {
   private static final long serialVersionUID = 6204498077403076352L;
-
-  private com.twitter.heron.api.Config heronConfig;
+  private final com.twitter.heron.api.Config heronConfig;
+  private final int numContainers;
+  private final DeliverySemantics deliverySemantics;
+  private final Serializer serializer;
+  private final Resources resources;
 
   public enum DeliverySemantics {
     ATMOST_ONCE,
@@ -36,7 +38,16 @@
     EFFECTIVELY_ONCE
   }
 
+  public enum Serializer {
+    KRYO,
+    JAVA
+  }
+
   private Config(Builder builder) {
+    numContainers = builder.numContainers;
+    deliverySemantics = builder.deliverySemantics;
+    serializer = builder.serializer;
+    resources = builder.resources;
     heronConfig = builder.config;
   }
 
@@ -49,6 +60,22 @@
     return heronConfig;
   }
 
+  public int getNumContainers() {
+    return numContainers;
+  }
+
+  public DeliverySemantics getDeliverySemantics() {
+    return deliverySemantics;
+  }
+
+  public Serializer getSerializer() {
+    return serializer;
+  }
+
+  public Resources getResources() {
+    return resources;
+  }
+
   private static com.twitter.heron.api.Config.TopologyReliabilityMode translateSemantics(
       DeliverySemantics semantics) {
     switch (semantics) {
@@ -65,36 +92,47 @@
 
   public static class Builder {
     private com.twitter.heron.api.Config config;
+    private int numContainers;
+    private Serializer serializer;
+    private DeliverySemantics deliverySemantics;
+    private Resources resources;
 
     public Builder() {
+      serializer = Serializer.KRYO;
+      numContainers = 1;
+      deliverySemantics = DeliverySemantics.ATMOST_ONCE;
+      resources = Resources.defaultResources();
       config = new com.twitter.heron.api.Config();
     }
 
     /**
      * Sets the number of containers to run this topology
-     * @param numContainers The number of containers to distribute this topology
+     * @param containers The number of containers to distribute this topology
      */
-    public Builder setNumContainers(int numContainers) {
-      config.setNumStmgrs(numContainers);
+    public Builder setNumContainers(int containers) {
+      numContainers = containers;
+      config.setNumStmgrs(containers);
       return this;
     }
 
     /**
      * Sets resources used per container by this topology
-     * @param resources The resource to dedicate per container
+     * @param containerResources The resource to dedicate per container
      */
-    public Builder setContainerResources(Resources resources) {
+    public Builder setContainerResources(Resources containerResources) {
+      resources = containerResources;
       config.setContainerCpuRequested(resources.getCpu());
-      config.setContainerRamRequested(ByteAmount.fromBytes(resources.getRam()));
+      config.setContainerRamRequested(resources.getRam());
       return this;
     }
 
     /**
      * Sets the delivery semantics of the topology
-     * @param semantic The delivery semantic to be enforced
+     * @param semantics The delivery semantic to be enforced
      */
-    public Builder setDeliverySemantics(DeliverySemantics semantic) {
-      config.setTopologyReliabilityMode(Config.translateSemantics(semantic));
+    public Builder setDeliverySemantics(DeliverySemantics semantics) {
+      deliverySemantics = semantics;
+      config.setTopologyReliabilityMode(Config.translateSemantics(semantics));
       return this;
     }
 
@@ -108,20 +146,29 @@
       return this;
     }
 
-    /**
-     * Sets the topology to use the Kryo serializer for serializing
-     * streamlet elements
-     */
-    public Builder useKryoSerializer() {
-      try {
-        config.setSerializationClassName(new KryoSerializer().getClass().getName());
-      } catch (NoClassDefFoundError e) {
-        throw new RuntimeException("Linking with kryo is needed because useKryoSerializer is used");
+    private void applySerializer(Serializer topologySerializer) {
+      if (topologySerializer == Serializer.KRYO) {
+        try {
+          config.setSerializationClassName(KryoSerializer.class.getName());
+        } catch (NoClassDefFoundError e) {
+          throw new RuntimeException(
+              "Linking with Kryo is needed because setTopologySerializer is used");
+        }
       }
+    }
+
+    /**
+     * Sets the topology to use the specified topologySerializer for serializing
+     * streamlet elements
+     * @param topologySerializer The topologySerializer to be used in this topology
+     */
+    public Builder setTopologySerializer(Serializer topologySerializer) {
+      serializer = topologySerializer;
       return this;
     }
 
     public Config build() {
+      applySerializer(serializer);
       return new Config(this);
     }
   }
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Resources.java b/heron/api/src/java/com/twitter/heron/streamlet/Resources.java
index 1deffcf..2e60151 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Resources.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Resources.java
@@ -16,6 +16,8 @@
 
 import java.io.Serializable;
 
+import com.twitter.heron.common.basics.ByteAmount;
+
 /**
  * Resources needed by the topology are encapsulated in this class.
  * Currently we deal with CPU and RAM. Others can be added later.
@@ -23,7 +25,7 @@
 public final class Resources implements Serializable {
   private static final long serialVersionUID = 630451253428388496L;
   private float cpu;
-  private long ram;
+  private ByteAmount ram;
 
   private Resources(Builder builder) {
     this.cpu = builder.cpu;
@@ -39,10 +41,22 @@
     return cpu;
   }
 
-  public long getRam() {
+  public ByteAmount getRam() {
     return ram;
   }
 
+  public long getRamBytes() {
+    return ram.asBytes();
+  }
+
+  public long getRamMegabytes() {
+    return ram.asMegabytes();
+  }
+
+  public long getRamGigabytes() {
+    return ram.asGigabytes();
+  }
+
   @Override
   public String toString() {
     return String.format("{ CPU: %s RAM: %s }", String.valueOf(cpu), String.valueOf(ram));
@@ -50,11 +64,11 @@
 
   public static class Builder {
     private float cpu;
-    private long ram;
+    private ByteAmount ram;
 
     public Builder() {
       this.cpu = 1.0f;
-      this.ram = 104857600;
+      this.ram = ByteAmount.fromBytes(104857600);
     }
 
     /**
@@ -62,7 +76,7 @@
      * @param nram The number of megabytes of RAM
      */
     public Builder setRamInMB(long nram) {
-      this.ram = nram * 1024;
+      this.ram = ByteAmount.fromMegabytes(nram);
       return this;
     }
 
@@ -71,7 +85,7 @@
      * @param nram The number of gigabytes of RAM
      */
     public Builder setRamInGB(long nram) {
-      this.ram = nram * 1024 * 1024;
+      this.ram = ByteAmount.fromGigabytes(nram);
       return this;
     }
 
@@ -89,7 +103,7 @@
      * @param containerRam The number of bytes of RAM
      */
     public Builder setRam(long containerRam) {
-      this.ram = containerRam;
+      this.ram = ByteAmount.fromBytes(containerRam);
       return this;
     }
 
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/SerializablePredicate.java b/heron/api/src/java/com/twitter/heron/streamlet/SerializablePredicate.java
index aec84d7..324d240 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/SerializablePredicate.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/SerializablePredicate.java
@@ -19,7 +19,7 @@
 
 /**
  * All user supplied transformation functions have to be serializable.
- * Thus all Strealmet transformation definitions take Serializable
+ * Thus all Streamlet transformation definitions take Serializable
  * Functions as their input. We simply decorate java.util. function
  * definitions with a Serializable tag to ensure that any supplied
  * lambda functions automatically become serializable.
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Sink.java b/heron/api/src/java/com/twitter/heron/streamlet/Sink.java
index beb3e96..1e2bef1 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Sink.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Sink.java
@@ -18,7 +18,7 @@
 
 /**
  * Sink is how Streamlet's end. The put method
- * invokation consumes the tuple into say external database/cache, etc.
+ * invocation consumes the tuple into say external database/cache, etc.
  * setup/cleanup is where the sink can do any one time setup work, like
  * establishing/closing connection to sources, etc.
  */
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Source.java b/heron/api/src/java/com/twitter/heron/streamlet/Source.java
index fe2330a..b87717b 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Source.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Source.java
@@ -19,7 +19,7 @@
 
 /**
  * Source is how Streamlet's originate. The get method
- * invokation returns new element that form the tuples of the streamlet.
+ * invocation returns new element that form the tuples of the streamlet.
  * setup/cleanup is where the generator can do any one time setup work, like
  * establishing/closing connection to sources, etc.
  */
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Streamlet.java b/heron/api/src/java/com/twitter/heron/streamlet/Streamlet.java
index 882c813..4d66eb8 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Streamlet.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Streamlet.java
@@ -34,7 +34,7 @@
  * Streamlet. One can think of a transformation attaching itself to the stream and processing
  * each tuple as they go by. Thus the parallelism of any operator is implicitly determined
  * by the number of partitions of the stream that it is operating on. If a particular
- * tranformation wants to operate at a different parallelism, one can repartition the
+ * transformation wants to operate at a different parallelism, one can repartition the
  * Streamlet before doing the transformation.
  */
 @InterfaceStability.Evolving
@@ -180,7 +180,7 @@
       T identity, SerializableBiFunction<T, R, ? extends T> reduceFn);
 
   /**
-   * Returns a new Streamlet thats the union of this and the ‘other’ streamlet. Essentially
+   * Returns a new Streamlet that is the union of this and the ‘other’ streamlet. Essentially
    * the new streamlet will contain tuples belonging to both Streamlets
   */
   Streamlet<R> union(Streamlet<? extends R> other);
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/com/twitter/heron/streamlet/impl/StreamletImpl.java
index 606ea0c..ca88eb2 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/impl/StreamletImpl.java
@@ -66,7 +66,7 @@
  * Streamlet. One can think of a transformation attaching itself to the stream and processing
  * each tuple as they go by. Thus the parallelism of any operator is implicitly determined
  * by the number of partitions of the stream that it is operating on. If a particular
- * tranformation wants to operate at a different parallelism, one can repartition the
+ * transformation wants to operate at a different parallelism, one can repartition the
  * Streamlet before doing the transformation.
  */
 public abstract class StreamletImpl<R> implements Streamlet<R> {
@@ -109,8 +109,8 @@
    */
   @Override
   public Streamlet<R> setName(String sName) {
-    if (sName == null || sName.isEmpty()) {
-      throw new IllegalArgumentException("Streamlet name cannot be null/empty");
+    if (sName == null || sName.trim().isEmpty()) {
+      throw new IllegalArgumentException("Streamlet name cannot be null/blank");
     }
     this.name = sName;
     return this;
diff --git a/heron/api/tests/java/com/twitter/heron/api/windowing/WaterMarkEventGeneratorTest.java b/heron/api/tests/java/com/twitter/heron/api/windowing/WaterMarkEventGeneratorTest.java
index 01ef89a..12ceed7 100644
--- a/heron/api/tests/java/com/twitter/heron/api/windowing/WaterMarkEventGeneratorTest.java
+++ b/heron/api/tests/java/com/twitter/heron/api/windowing/WaterMarkEventGeneratorTest.java
@@ -23,6 +23,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import com.twitter.heron.api.Config;
 import com.twitter.heron.api.generated.TopologyAPI;
 
 import static org.junit.Assert.*;
@@ -48,8 +49,8 @@
       }
     };
     // set watermark interval to a high value and trigger manually to fix timing issues
-    waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 5, Collections
-        .singleton(streamId("s1")));
+    waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 5L, 5, Collections
+        .singleton(streamId("s1")), new Config());
     waterMarkEventGenerator.start();
   }
 
@@ -77,7 +78,8 @@
     Set<TopologyAPI.StreamId> streams = new HashSet<>();
     streams.add(streamId("s1"));
     streams.add(streamId("s2"));
-    waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 5, streams);
+    waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 5L,
+        5, streams, new Config());
     waterMarkEventGenerator.start();
 
     waterMarkEventGenerator.track(streamId("s1"), 100);
diff --git a/heron/api/tests/java/com/twitter/heron/api/windowing/WindowManagerTest.java b/heron/api/tests/java/com/twitter/heron/api/windowing/WindowManagerTest.java
index a5a2ccd..5c6be6e 100644
--- a/heron/api/tests/java/com/twitter/heron/api/windowing/WindowManagerTest.java
+++ b/heron/api/tests/java/com/twitter/heron/api/windowing/WindowManagerTest.java
@@ -26,6 +26,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import com.twitter.heron.api.Config;
 import com.twitter.heron.api.windowing.evictors.CountEvictionPolicy;
 import com.twitter.heron.api.windowing.evictors.TimeEvictionPolicy;
 import com.twitter.heron.api.windowing.evictors.WatermarkCountEvictionPolicy;
@@ -225,7 +226,7 @@
          * Set it to a large value and trigger manually.
           */
     TriggerPolicy<Integer, ?> triggerPolicy = new TimeTriggerPolicy<Integer>(Duration.ofDays(1)
-        .toMillis(), windowManager, evictionPolicy);
+        .toMillis(), windowManager, evictionPolicy, new Config());
     triggerPolicy.start();
     windowManager.setTriggerPolicy(triggerPolicy);
     long now = System.currentTimeMillis();
diff --git a/heron/api/tests/java/com/twitter/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/com/twitter/heron/streamlet/impl/StreamletImplTest.java
index 069b6e2..d609787 100644
--- a/heron/api/tests/java/com/twitter/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/com/twitter/heron/streamlet/impl/StreamletImplTest.java
@@ -18,11 +18,14 @@
 import java.util.List;
 import java.util.Set;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 import org.junit.Before;
 import org.junit.Test;
 
 import com.twitter.heron.api.topology.TopologyBuilder;
+import com.twitter.heron.common.basics.ByteAmount;
+import com.twitter.heron.streamlet.Config;
 import com.twitter.heron.streamlet.Context;
 import com.twitter.heron.streamlet.Resources;
 import com.twitter.heron.streamlet.SerializableTransformer;
@@ -260,15 +263,69 @@
 
   @Test
   public void testResourcesBuilder() {
-    Resources defaultResoures = Resources.defaultResources();
-    assertEquals(0, Float.compare(defaultResoures.getCpu(), 1.0f));
-    assertEquals(defaultResoures.getRam(), 104857600);
+    Resources defaultResources = Resources.defaultResources();
+    assertEquals(0, Float.compare(defaultResources.getCpu(), 1.0f));
+    assertEquals(defaultResources.getRam(), ByteAmount.fromMegabytes(100));
 
-    Resources res2 = new Resources.Builder()
+    Resources nonDefaultResources = new Resources.Builder()
         .setCpu(5.1f)
         .setRamInGB(20)
         .build();
-    assertEquals(0, Float.compare(res2.getCpu(), 5.1f));
-    assertEquals(res2.getRam(), 20 * 1024 * 1024);
+    assertEquals(0, Float.compare(nonDefaultResources.getCpu(), 5.1f));
+    assertEquals(nonDefaultResources.getRam(), ByteAmount.fromGigabytes(20));
   }
+
+  @Test
+  public void testConfigBuilder() {
+    Config defaultConfig = Config.defaultConfig();
+    assertEquals(defaultConfig.getDeliverySemantics(), Config.DeliverySemantics.ATMOST_ONCE);
+    assertEquals(defaultConfig.getNumContainers(), 1);
+    assertEquals(0, Float.compare(defaultConfig.getResources().getCpu(), 1.0f));
+    assertEquals(defaultConfig.getResources().getRam(), ByteAmount.fromMegabytes(100));
+    assertEquals(defaultConfig.getSerializer(), Config.Serializer.KRYO);
+
+    Resources nonDefaultResources = new Resources.Builder()
+        .setCpu(3.1f)
+        .setRamInMB(2500)
+        .build();
+
+    Config nonDefaultConfig = new Config.Builder()
+        .setContainerResources(nonDefaultResources)
+        .setDeliverySemantics(Config.DeliverySemantics.EFFECTIVELY_ONCE)
+        .setNumContainers(8)
+        .setTopologySerializer(Config.Serializer.JAVA)
+        .setUserConfig("key", "value")
+        .build();
+    assertEquals(nonDefaultConfig.getNumContainers(), 8);
+    assertEquals(nonDefaultConfig.getDeliverySemantics(),
+        Config.DeliverySemantics.EFFECTIVELY_ONCE);
+    assertEquals(0, Float.compare(nonDefaultConfig.getResources().getCpu(), 3.1f));
+    assertEquals(nonDefaultConfig.getResources().getRam(), ByteAmount.fromMegabytes(2500));
+    assertEquals(nonDefaultConfig.getSerializer(), Config.Serializer.JAVA);
+
+    Config multiSetConfig = new Config.Builder()
+        .setTopologySerializer(Config.Serializer.JAVA)
+        .setTopologySerializer(Config.Serializer.KRYO)
+        .build();
+    assertEquals(multiSetConfig.getSerializer(), Config.Serializer.KRYO);
+  }
+
+  @Test
+  public void testSetNameWithInvalidValues() {
+    Streamlet<Double> sample = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+    Function<String, Streamlet<Double>> function = sample::setName;
+    testByFunction(function, null);
+    testByFunction(function, "");
+    testByFunction(function, "  ");
+  }
+
+  private void testByFunction(Function<String, Streamlet<Double>> function, String sName) {
+    try {
+      function.apply(sName);
+      fail("Should have thrown an IllegalArgumentException because streamlet name is invalid");
+    } catch (IllegalArgumentException e) {
+      assertEquals("Streamlet name cannot be null/blank", e.getMessage());
+    }
+  }
+
 }
diff --git a/heron/config/src/yaml/conf/aurora/scheduler.yaml b/heron/config/src/yaml/conf/aurora/scheduler.yaml
index a4aab63..24d3f31 100644
--- a/heron/config/src/yaml/conf/aurora/scheduler.yaml
+++ b/heron/config/src/yaml/conf/aurora/scheduler.yaml
@@ -10,6 +10,15 @@
 # Invoke the IScheduler as a library directly
 heron.scheduler.is.service:                  False
 
+####################################################################
+# Following are Aurora-specific
+####################################################################
+# The maximum retry attempts when trying to kill an Aurora job
+heron.scheduler.job.max.kill.attempts:       5
+
+# The interval in ms between two retry-attempts to kill an Aurora job
+heron.scheduler.job.kill.retry.interval.ms:  2000
+
 # Aurora Controller Class
 # heron.class.scheduler.aurora.controller.cli:     False
 
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index d901d20..23f5fc3 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -43,19 +43,37 @@
 
 Log = log.Log
 
+# pylint: disable=too-many-lines
+
 def print_usage():
   print(
-      "Usage: ./heron-executor <shardid> <topname> <topid> <topdefnfile>"
-      " <state_manager_connection> <state_manager_root> <tmaster_binary> <stmgr_binary>"
-      " <metricsmgr_classpath> <instance_jvm_opts_in_base64> <classpath>"
-      " <master_port> <tmaster_controller_port> <tmaster_stats_port> <heron_internals_config_file>"
-      " <override_config_file> <component_ram_map> <component_jvm_opts_in_base64> <pkg_type>"
-      " <topology_bin_file> <heron_java_home> <shell-port> <heron_shell_binary> <metricsmgr_port>"
-      " <cluster> <role> <environ> <instance_classpath> <metrics_sinks_config_file>"
-      " <scheduler_classpath> <scheduler_port> <python_instance_binary>"
-      " <metricscachemgr_classpath> <metricscachemgr_masterport> <metricscachemgr_statsport>"
-      " <is_stateful> <ckptmgr_classpath> <ckptmgr_port> <stateful_config_file> "
-      " <healthmgr_mode> <healthmgr_classpath> <cpp_instance_binary>")
+      "Usage: ./heron-executor --shard=<shardid> --topology-name=<topname>"
+      " --topology-id=<topid> --topology-defn-file=<topdefnfile>"
+      " --state-manager-connection=<state_manager_connection>"
+      " --state-manager-root=<state_manager_root> --tmaster-binary=<tmaster_binary>"
+      " --stmgr-binary=<stmgr_binary> --metrics-manager-classpath=<metricsmgr_classpath>"
+      " --instance-jvm-opts=<instance_jvm_opts_in_base64> --classpath=<classpath>"
+      " --master-port=<master_port> --tmaster-controller-port=<tmaster_controller_port>"
+      " --tmaster-stats-port=<tmaster_stats_port>"
+      " --heron-internals-config-file=<heron_internals_config_file>"
+      " --override-config-file=<override_config_file> --component-ram-map=<component_ram_map>"
+      " --component-jvm-opts=<component_jvm_opts_in_base64> --pkg-type=<pkg_type>"
+      " --topology-binary-file=<topology_bin_file> --heron-java-home=<heron_java_home>"
+      " --shell-port=<shell-port> --heron-shell-binary=<heron_shell_binary>"
+      " --metrics-manager-port=<metricsmgr_port>"
+      " --cluster=<cluster> --role=<role> --environment=<environ>"
+      " --instance-classpath=<instance_classpath>"
+      " --metrics-sinks-config-file=<metrics_sinks_config_file>"
+      " --scheduler-classpath=<scheduler_classpath> --scheduler-port=<scheduler_port>"
+      " --python-instance-binary=<python_instance_binary>"
+      " --metricscache-manager-classpath=<metricscachemgr_classpath>"
+      " --metricscache-manager-master-port=<metricscachemgr_masterport>"
+      " --metricscache-manager-stats-port=<metricscachemgr_statsport>"
+      " --is-stateful=<is_stateful> --checkpoint-manager-classpath=<ckptmgr_classpath>"
+      " --checkpoint-manager-port=<ckptmgr_port> --stateful-config-file=<stateful_config_file>"
+      " --health-manager-mode=<healthmgr_mode> --health-manager-classpath=<healthmgr_classpath>"
+      " --cpp-instance-binary=<cpp_instance_binary>"
+      " --jvm-remote-debugger-ports=<comma_seperated_port_list>")
 
 def id_map(prefix, container_plans, add_zero_id=False):
   ids = {}
@@ -219,7 +237,9 @@
     self.health_manager_mode = parsed_args.health_manager_mode
     self.health_manager_classpath = '%s:%s'\
         % (self.scheduler_classpath, parsed_args.health_manager_classpath)
-
+    self.jvm_remote_debugger_ports = \
+      parsed_args.jvm_remote_debugger_ports.split(",") \
+        if parsed_args.jvm_remote_debugger_ports else None
 
   def __init__(self, args, shell_env):
     self.init_parsed_args(args)
@@ -293,6 +313,8 @@
     parser.add_argument("--stateful-config-file", required=True)
     parser.add_argument("--health-manager-mode", required=True)
     parser.add_argument("--health-manager-classpath", required=True)
+    parser.add_argument("--jvm-remote-debugger-ports", required=False,
+                        help="ports to be used by a remote debugger for JVM instances")
 
     parsed_args, unknown_args = parser.parse_known_args(args[1:])
 
@@ -508,6 +530,10 @@
             java_version.startswith("1.5"):
       java_metasize_param = 'PermSize'
 
+    if self.jvm_remote_debugger_ports and \
+            (len(instance_info) > len(self.jvm_remote_debugger_ports)):
+      Log.warn("Not enough remote debugger ports for all instances!")
+
     for (instance_id, component_name, global_task_id, component_index) in instance_info:
       total_jvm_size = int(self.component_ram_map[component_name] / (1024 * 1024))
       heap_size_mb = total_jvm_size - code_cache_size_mb - java_metasize_mb
@@ -542,6 +568,12 @@
                       '-XX:ParallelGCThreads=4',
                       '-Xloggc:log-files/gc.%s.log' % instance_id]
 
+      remote_debugger_port = None
+      if self.jvm_remote_debugger_ports:
+        remote_debugger_port = self.jvm_remote_debugger_ports.pop()
+        instance_cmd.append('-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=%s'
+                            % remote_debugger_port)
+
       instance_args = ['-topology_name', self.topology_name,
                        '-topology_id', self.topology_id,
                        '-instance_id', instance_id,
@@ -553,10 +585,13 @@
                        '-metricsmgr_port', self.metrics_manager_port,
                        '-system_config_file', self.heron_internals_config_file,
                        '-override_config_file', self.override_config_file]
+      if remote_debugger_port:
+        instance_args += ['-remote_debugger_port', remote_debugger_port]
 
       instance_cmd = instance_cmd + self.instance_jvm_opts.split()
       if component_name in self.component_jvm_opts:
         instance_cmd = instance_cmd + self.component_jvm_opts[component_name].split()
+
       instance_cmd.extend(['-Djava.net.preferIPv4Stack=true',
                            '-cp',
                            '%s:%s' % (self.instance_classpath, self.classpath),
diff --git a/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java b/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java
index b8b5be6..7dc3aa0 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java
@@ -201,15 +201,13 @@
     stmgrPortOption.setRequired(true);
     options.addOption(stmgrPortOption);
 
-    Option metricsmgrPortOption
-        = new Option(
+    Option metricsmgrPortOption = new Option(
         CommandLineOptions.METRICS_MGR_PORT_OPTION, true, "Metrics Manager Port");
     metricsmgrPortOption.setType(Integer.class);
     metricsmgrPortOption.setRequired(true);
     options.addOption(metricsmgrPortOption);
 
-    Option systemConfigFileOption
-        = new Option(
+    Option systemConfigFileOption = new Option(
         CommandLineOptions.SYSTEM_CONFIG_FILE, true, "Heron Internals Config Filename");
     systemConfigFileOption.setType(String.class);
     systemConfigFileOption.setRequired(true);
@@ -222,6 +220,11 @@
     overrideConfigFileOption.setRequired(true);
     options.addOption(overrideConfigFileOption);
 
+    Option remoteDebuggerPortOption = new Option(
+        CommandLineOptions.REMOTE_DEBUGGER_PORT, true, "Remote Debugger Port");
+    remoteDebuggerPortOption.setType(Integer.class);
+    options.addOption(remoteDebuggerPortOption);
+
     CommandLineParser parser = new DefaultParser();
     HelpFormatter formatter = new HelpFormatter();
     CommandLine cmd = null;
@@ -257,6 +260,12 @@
     String overrideConfigFile
         = commandLine.getOptionValue(CommandLineOptions.OVERRIDE_CONFIG_FILE);
 
+    Integer remoteDebuggerPort = null;
+    if (commandLine.hasOption(CommandLineOptions.REMOTE_DEBUGGER_PORT)) {
+      remoteDebuggerPort = Integer.parseInt(
+          commandLine.getOptionValue(CommandLineOptions.REMOTE_DEBUGGER_PORT));
+    }
+
     SystemConfig systemConfig = SystemConfig.newBuilder(true)
         .putAll(systemConfigFile, true)
         .putAll(overrideConfigFile, true)
@@ -266,8 +275,13 @@
     SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG, systemConfig);
 
     // Create the protobuf Instance
-    PhysicalPlans.InstanceInfo instanceInfo = PhysicalPlans.InstanceInfo.newBuilder().
-        setTaskId(taskId).setComponentIndex(componentIndex).setComponentName(componentName).build();
+    PhysicalPlans.InstanceInfo.Builder instanceInfoBuilder
+        = PhysicalPlans.InstanceInfo.newBuilder().setTaskId(taskId)
+        .setComponentIndex(componentIndex).setComponentName(componentName);
+    if (remoteDebuggerPort != null) {
+      instanceInfoBuilder.setRemoteDebuggerPort(remoteDebuggerPort);
+    }
+    PhysicalPlans.InstanceInfo instanceInfo = instanceInfoBuilder.build();
 
     PhysicalPlans.Instance instance = PhysicalPlans.Instance.newBuilder().
         setInstanceId(instanceId).setStmgrId(streamId).setInfo(instanceInfo).build();
@@ -285,11 +299,17 @@
             systemConfig.getHeronLoggingMaximumFiles()));
     LoggingHelper.addLoggingHandler(new ErrorReportLoggingHandler());
 
-    LOG.info("\nStarting instance " + instanceId + " for topology " + topologyName
+    String logMsg = "\nStarting instance " + instanceId + " for topology " + topologyName
         + " and topologyId " + topologyId + " for component " + componentName
         + " with taskId " + taskId + " and componentIndex " + componentIndex
         + " and stmgrId " + streamId + " and stmgrPort " + streamPort
-        + " and metricsManagerPort " + metricsPort);
+        + " and metricsManagerPort " + metricsPort;
+
+    if (remoteDebuggerPort != null) {
+      logMsg += " and remoteDebuggerPort " + remoteDebuggerPort;
+    }
+
+    LOG.info(logMsg);
 
     LOG.info("System Config: " + systemConfig);
 
diff --git a/heron/metricscachemgr/src/java/com/twitter/heron/metricscachemgr/MetricsCacheManager.java b/heron/metricscachemgr/src/java/com/twitter/heron/metricscachemgr/MetricsCacheManager.java
index 779771f..395c8c8 100644
--- a/heron/metricscachemgr/src/java/com/twitter/heron/metricscachemgr/MetricsCacheManager.java
+++ b/heron/metricscachemgr/src/java/com/twitter/heron/metricscachemgr/MetricsCacheManager.java
@@ -17,6 +17,7 @@
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -348,6 +349,10 @@
     LOG.info("Loops terminated. MetricsCache Manager exits.");
   }
 
+  /**
+   * start statemgr_client, metricscache_server, http_server
+   * @throws Exception
+   */
   public void start() throws Exception {
     // 1. Do prepare work
     // create an instance of state manager
@@ -367,18 +372,23 @@
       // initialize the statemgr
       statemgr.initialize(config);
 
-      statemgr.setMetricsCacheLocation(metricsCacheLocation, topologyName);
-      LOG.info("metricsCacheLocation " + metricsCacheLocation.toString());
-      LOG.info("topologyName " + topologyName.toString());
+      Boolean b = statemgr.setMetricsCacheLocation(metricsCacheLocation, topologyName)
+          .get(5000, TimeUnit.MILLISECONDS);
+      if (b != null && b) {
+        LOG.info("metricsCacheLocation " + metricsCacheLocation.toString());
+        LOG.info("topologyName " + topologyName.toString());
 
-      LOG.info("Starting Metrics Cache HTTP Server");
-      metricsCacheManagerHttpServer.start();
+        LOG.info("Starting Metrics Cache HTTP Server");
+        metricsCacheManagerHttpServer.start();
 
-      // 2. The MetricsCacheServer would run in the main thread
-      // We do it in the final step since it would await the main thread
-      LOG.info("Starting Metrics Cache Server");
-      metricsCacheManagerServer.start();
-      metricsCacheManagerServerLoop.loop();
+        // 2. The MetricsCacheServer would run in the main thread
+        // We do it in the final step since it would await the main thread
+        LOG.info("Starting Metrics Cache Server");
+        metricsCacheManagerServer.start();
+        metricsCacheManagerServerLoop.loop();
+      } else {
+        throw new RuntimeException("Failed to set metricscahe location.");
+      }
     } finally {
       // 3. Do post work basing on the result
       // Currently nothing to do here
diff --git a/heron/proto/physical_plan.proto b/heron/proto/physical_plan.proto
index cc72fe4..e11af00 100644
--- a/heron/proto/physical_plan.proto
+++ b/heron/proto/physical_plan.proto
@@ -33,6 +33,8 @@
   required int32 component_index = 2; // specific to this component
   required string component_name = 3;
   repeated string params = 4;
+  // the port a remote debugger can be attached to for this instance.  Currently JVM instances only
+  optional int32 remote_debugger_port = 5;
 }
 
 message Instance {
diff --git a/heron/scheduler-core/src/java/BUILD b/heron/scheduler-core/src/java/BUILD
index 31598b9..c8b7545 100644
--- a/heron/scheduler-core/src/java/BUILD
+++ b/heron/scheduler-core/src/java/BUILD
@@ -9,6 +9,7 @@
    "//heron/api/src/java:classification",
    "@commons_cli_commons_cli//jar",
    "@com_google_guava_guava//jar",
+   "@org_apache_commons_commons_lang3//jar",
 ]
 
 spi_deps_files = [
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/ExecutorFlag.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/ExecutorFlag.java
index 0d5f416..0f7b890 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/ExecutorFlag.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/ExecutorFlag.java
@@ -56,7 +56,8 @@
   CheckpointManagerPort("checkpoint-manager-port"),
   StatefulConfigFile("stateful-config-file"),
   HealthManagerMode("health-manager-mode"),
-  HealthManagerClasspath("health-manager-classpath");
+  HealthManagerClasspath("health-manager-classpath"),
+  JvmRemoteDebuggerPorts("jvm-remote-debugger-ports");
 
   private final String name;
 
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/RuntimeManagerRunner.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/RuntimeManagerRunner.java
index fe4f366..20de1ee 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/RuntimeManagerRunner.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/RuntimeManagerRunner.java
@@ -150,7 +150,8 @@
 
     if (!schedulerClient.killTopology(killTopologyRequest)) {
       throw new TopologyRuntimeManagementException(
-          String.format("Failed to kill topology '%s' with scheduler", topologyName));
+          String.format("Failed to kill topology '%s' with scheduler, "
+              + "please re-try the kill command later", topologyName));
     }
 
     // clean up the state of the topology in state manager
@@ -176,7 +177,7 @@
     if (!changeDetected(currentPlan, changeRequests)) {
       throw new TopologyRuntimeManagementException(
           String.format("The component parallelism request (%s) is the same as the "
-          + "current topology parallelism. Not taking action.", newParallelism));
+              + "current topology parallelism. Not taking action.", newParallelism));
     }
 
     PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
@@ -199,7 +200,7 @@
     if (!schedulerClient.updateTopology(updateTopologyRequest)) {
       throw new TopologyRuntimeManagementException(String.format(
           "Failed to update topology with Scheduler, updateTopologyRequest="
-          + updateTopologyRequest));
+              + updateTopologyRequest));
     }
 
     // Clean the connection when we are done.
@@ -242,25 +243,25 @@
     result = statemgr.deletePhysicalPlan(topologyName);
     if (result == null || !result) {
       throw new TopologyRuntimeManagementException(
-        "Failed to clear physical plan. Check whether TMaster set it correctly.");
+          "Failed to clear physical plan. Check whether TMaster set it correctly.");
     }
 
     result = statemgr.deleteSchedulerLocation(topologyName);
     if (result == null || !result) {
       throw new TopologyRuntimeManagementException(
-        "Failed to clear scheduler location. Check whether Scheduler set it correctly.");
+          "Failed to clear scheduler location. Check whether Scheduler set it correctly.");
     }
 
     result = statemgr.deleteLocks(topologyName);
     if (result == null || !result) {
       throw new TopologyRuntimeManagementException(
-        "Failed to delete locks. It's possible that the topology never created any.");
+          "Failed to delete locks. It's possible that the topology never created any.");
     }
 
     result = statemgr.deleteExecutionState(topologyName);
     if (result == null || !result) {
       throw new TopologyRuntimeManagementException(
-        "Failed to clear execution state");
+          "Failed to clear execution state");
     }
 
     // Set topology def at last since we determine whether a topology is running
@@ -268,7 +269,7 @@
     result = statemgr.deleteTopology(topologyName);
     if (result == null || !result) {
       throw new TopologyRuntimeManagementException(
-        "Failed to clear topology definition");
+          "Failed to clear topology definition");
     }
 
     LOG.fine("Cleaned up topology state");
@@ -346,7 +347,7 @@
   }
 
   private static boolean changeDetected(PackingPlans.PackingPlan currentProtoPlan,
-                                 Map<String, Integer> changeRequests) {
+                                        Map<String, Integer> changeRequests) {
     PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
     PackingPlan currentPlan = deserializer.fromProto(currentProtoPlan);
     for (String component : changeRequests.keySet()) {
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java
index ea89870..358604a 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java
@@ -59,7 +59,8 @@
     SCHEDULER_PORT("scheduler", true),
     METRICS_CACHE_MASTER_PORT("metrics-cache-m", true),
     METRICS_CACHE_STATS_PORT("metrics-cache-s", true),
-    CHECKPOINT_MANAGER_PORT("ckptmgr", true);
+    CHECKPOINT_MANAGER_PORT("ckptmgr", true),
+    JVM_REMOTE_DEBUGGER_PORTS("jvm-remote-debugger", false);
 
     private final String name;
     private final boolean required;
@@ -230,6 +231,9 @@
         ExecutorPort.METRICS_CACHE_STATS_PORT, ports);
     String ckptmgrPort = ExecutorPort.getPort(
         ExecutorPort.CHECKPOINT_MANAGER_PORT, ports);
+    String remoteDebuggerPorts = ExecutorPort.getPort(
+        ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS, ports
+    );
 
     List<String> commands = new ArrayList<>();
     commands.add(createCommandArg(ExecutorFlag.TopologyName, topology.getName()));
@@ -310,6 +314,9 @@
     commands.add(createCommandArg(ExecutorFlag.HealthManagerMode, healthMgrMode));
     commands.add(createCommandArg(ExecutorFlag.HealthManagerClasspath,
         Context.healthMgrClassPath(config)));
+    if (remoteDebuggerPorts != null) {
+      commands.add(createCommandArg(ExecutorFlag.JvmRemoteDebuggerPorts, remoteDebuggerPorts));
+    }
 
     return commands.toArray(new String[commands.size()]);
   }
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraContext.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraContext.java
index 02cd2f3..4c1bcf2 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraContext.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraContext.java
@@ -22,6 +22,9 @@
 public final class AuroraContext extends Context {
   public static final String JOB_LINK_TEMPLATE = "heron.scheduler.job.link.template";
   public static final String JOB_TEMPLATE = "heron.scheduler.job.template";
+  public static final String JOB_MAX_KILL_ATTEMPTS = "heron.scheduler.job.max.kill.attempts";
+  public static final String JOB_KILL_RETRY_INTERVAL_MS =
+      "heron.scheduler.job.kill.retry.interval.ms";
 
   private AuroraContext() {
   }
@@ -34,4 +37,12 @@
     return config.getStringValue(JOB_TEMPLATE,
         new File(Context.heronConf(config), "heron.aurora").getPath());
   }
+
+  public static int getJobMaxKillAttempts(Config config) {
+    return config.getIntegerValue(JOB_MAX_KILL_ATTEMPTS, 5);
+  }
+
+  public static long getJobKillRetryIntervalMs(Config config) {
+    return config.getLongValue(JOB_KILL_RETRY_INTERVAL_MS, 2000);
+  }
 }
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java
index 9e4ccaf..daf28c1 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java
@@ -69,12 +69,22 @@
     return cliController.killJob();
   }
 
+  private StMgr searchContainer(Integer id) {
+    String prefix = "stmgr-" + id;
+    for (StMgr sm : stateMgrAdaptor.getPhysicalPlan(topologyName).getStmgrsList()) {
+      if (sm.getId().equals(prefix)) {
+        return sm;
+      }
+    }
+    return null;
+  }
+
   // Restart an aurora container
   @Override
   public boolean restart(Integer containerId) {
     // there is no backpressure for container 0, delegate to aurora client
     if (containerId == null || containerId == 0) {
-      cliController.restart(containerId);
+      return cliController.restart(containerId);
     }
 
     if (stateMgrAdaptor == null) {
@@ -82,18 +92,24 @@
       return false;
     }
 
-    int index = containerId - 1; // stmgr container starts from 1
-    StMgr contaienrInfo = stateMgrAdaptor.getPhysicalPlan(topologyName).getStmgrs(index);
-    String host = contaienrInfo.getHostName();
-    int port = contaienrInfo.getShellPort();
-    String url = "http://" + host + ":" + port + "/killexecutor";
+    StMgr sm = searchContainer(containerId);
+    if (sm == null) {
+      LOG.warning("container not found in pplan " + containerId);
+      return false;
+    }
+
+    String url = "http://" + sm.getHostName() + ":" + sm.getShellPort() + "/killexecutor";
     String payload = "secret=" + stateMgrAdaptor.getExecutionState(topologyName).getTopologyId();
     LOG.info("sending `kill container` to " + url + "; payload: " + payload);
 
     HttpURLConnection con = NetworkUtils.getHttpConnection(url);
     try {
-      NetworkUtils.sendHttpPostRequest(con, "X", payload.getBytes());
-      return NetworkUtils.checkHttpResponseCode(con, 200);
+      if (NetworkUtils.sendHttpPostRequest(con, "X", payload.getBytes())) {
+        return NetworkUtils.checkHttpResponseCode(con, 200);
+      } else { // if heron-shell command fails, delegate to aurora client
+        LOG.info("heron-shell killexecutor failed; try aurora client ..");
+        return cliController.restart(containerId);
+      }
     } finally {
       con.disconnect();
     }
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java
index 914a797..652f829 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java
@@ -30,6 +30,7 @@
 
 import com.twitter.heron.api.generated.TopologyAPI;
 import com.twitter.heron.api.utils.TopologyUtils;
+import com.twitter.heron.api.utils.Utils;
 import com.twitter.heron.common.basics.FileUtils;
 import com.twitter.heron.proto.scheduler.Scheduler;
 import com.twitter.heron.scheduler.UpdateTopologyManager;
@@ -141,7 +142,27 @@
 
   @Override
   public boolean onKill(Scheduler.KillTopologyRequest request) {
-    return controller.killJob();
+    // The aurora service can be unavailable or unstable for a while,
+    // we will try to kill the job with multiple attempts
+    int attempts = AuroraContext.getJobMaxKillAttempts(config);
+    long retryIntervalMs = AuroraContext.getJobKillRetryIntervalMs(config);
+    LOG.info("Will try " + attempts + " attempts at interval: " + retryIntervalMs + " ms");
+
+    // First attempt
+    boolean res = controller.killJob();
+    attempts--;
+
+    // Failure retry
+    while (!res && attempts > 0) {
+      LOG.warning("Failed to kill the topology. Will retry in " + retryIntervalMs + " ms...");
+      Utils.sleep(retryIntervalMs);
+
+      // Retry the killJob()
+      res = controller.killJob();
+      attempts--;
+    }
+
+    return res;
   }
 
   @Override
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesConstants.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesConstants.java
index c8cb637..7799a77 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesConstants.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesConstants.java
@@ -73,6 +73,9 @@
   public static final String METRICS_CACHE_MASTER_PORT = "6007";
   public static final String METRICS_CACHE_STATS_PORT = "6008";
   public static final String CHECKPOINT_MGR_PORT = "6009";
+  // port number the start with when more than one port needed for remote debugging
+  public static final String JVM_REMOTE_DEBUGGER_PORT = "6010";
+  public static final String JVM_REMOTE_DEBUGGER_PORT_NAME = "remote-debugger";
 
   public static final Map<ExecutorPort, String> EXECUTOR_PORTS = new HashMap<>();
   static {
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java
index dc436b6..21da118 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java
@@ -15,6 +15,7 @@
 package com.twitter.heron.scheduler.kubernetes;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -23,6 +24,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.regex.Matcher;
+import java.util.stream.Collectors;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -32,6 +34,7 @@
 import com.google.common.base.Optional;
 import com.google.common.primitives.Ints;
 
+import com.twitter.heron.api.utils.TopologyUtils;
 import com.twitter.heron.common.basics.FileUtils;
 import com.twitter.heron.proto.scheduler.Scheduler;
 import com.twitter.heron.scheduler.TopologyRuntimeManagementException;
@@ -165,11 +168,17 @@
 
     String[] deploymentConfs =
         new String[Ints.checkedCast(Runtime.numContainers(runtimeConfiguration))];
+
     for (int i = 0; i < Runtime.numContainers(runtimeConfiguration); i++) {
-
-      deploymentConfs[i] = buildKubernetesPodSpec(mapper, i, containerResource);
+      Optional<PackingPlan.ContainerPlan> container = packing.getContainer(i);
+      Set<PackingPlan.InstancePlan> instancePlans;
+      if (container.isPresent()) {
+        instancePlans = container.get().getInstances();
+      } else {
+        instancePlans = new HashSet<>();
+      }
+      deploymentConfs[i] = buildKubernetesPodSpec(mapper, i, containerResource, instancePlans);
     }
-
     return deploymentConfs;
   }
 
@@ -183,7 +192,8 @@
    */
   protected String buildKubernetesPodSpec(ObjectMapper mapper,
                                           Integer containerIndex,
-                                          Resource containerResource) {
+                                          Resource containerResource,
+                                          Set<PackingPlan.InstancePlan> instancePlans) {
     ObjectNode instance = mapper.createObjectNode();
 
     instance.put(KubernetesConstants.API_VERSION, KubernetesConstants.API_VERSION_1);
@@ -192,7 +202,8 @@
 
     instance.set(KubernetesConstants.API_SPEC, getContainerSpec(mapper,
         containerIndex,
-        containerResource));
+        containerResource,
+        instancePlans));
 
     return instance.toString();
   }
@@ -304,7 +315,7 @@
       setImagePullPolicyIfPresent(containerInfo);
 
       // Port info -- all the same
-      containerInfo.set(KubernetesConstants.PORTS, getPorts(mapper));
+      containerInfo.set(KubernetesConstants.PORTS, getPorts(mapper, containerPlan.getInstances()));
 
       // In order for the container to run with the correct index, we're copying the base
       // configuration for container with index 0, and replacing the container index with
@@ -350,7 +361,8 @@
    */
   protected ObjectNode getContainerSpec(ObjectMapper mapper,
                                         int containerIndex,
-                                        Resource containerResource) {
+                                        Resource containerResource,
+                                        Set<PackingPlan.InstancePlan> instancePlans) {
 
     ObjectNode containerSpec = mapper.createObjectNode();
     ArrayNode containerList = mapper.createArrayNode();
@@ -382,10 +394,10 @@
     setImagePullPolicyIfPresent(containerInfo);
 
     // Port information for this container
-    containerInfo.set(KubernetesConstants.PORTS, getPorts(mapper));
+    containerInfo.set(KubernetesConstants.PORTS, getPorts(mapper, instancePlans));
 
     // Heron command for the container
-    String[] command = getExecutorCommand(containerIndex);
+    String[] command = getExecutorCommand(containerIndex, instancePlans.size());
     ArrayNode commandsArray = mapper.createArrayNode();
     for (int i = 0; i < command.length; i++) {
       commandsArray.add(command[i]);
@@ -410,12 +422,22 @@
     return containerSpec;
   }
 
+  private List<Integer> getRemoteDebuggerPorts(int numberOfInstances) {
+    List<Integer> ports = new LinkedList<>();
+    for (int i = 0; i < numberOfInstances; i++) {
+      ports.add(Integer.parseInt(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT, 10) + i);
+    }
+
+    return ports;
+  }
+
   /**
    * Get the ports the container will need to expose so other containers can access its services
    *
    * @param mapper
    */
-  protected ArrayNode getPorts(ObjectMapper mapper) {
+  protected ArrayNode getPorts(ObjectMapper mapper,
+                               Set<PackingPlan.InstancePlan> instancePlans) {
     ArrayNode ports = mapper.createArrayNode();
 
     for (Map.Entry<ExecutorPort, String> entry
@@ -428,6 +450,18 @@
       ports.add(port);
     }
 
+    // if remote debugger enabled
+    if (TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtimeConfiguration))) {
+      List<Integer> portsForRemoteDebugging = getRemoteDebuggerPorts(instancePlans.size());
+
+      for (int i = 0; i < portsForRemoteDebugging.size(); i++) {
+        ObjectNode port = mapper.createObjectNode();
+        port.put(KubernetesConstants.DOCKER_CONTAINER_PORT, portsForRemoteDebugging.get(i));
+        port.put(KubernetesConstants.PORT_NAME, KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME
+            + "-" + String.valueOf(i));
+      }
+    }
+
     return ports;
   }
 
@@ -452,10 +486,17 @@
    *
    * @param containerIndex
    */
-  protected String[] getExecutorCommand(int containerIndex) {
+  protected String[] getExecutorCommand(int containerIndex, int numInstances) {
+    if (TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtimeConfiguration))) {
+      KubernetesConstants.EXECUTOR_PORTS.put(ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS,
+          String.join(",", getRemoteDebuggerPorts(numInstances)
+              .stream().map(Object::toString)
+              .collect(Collectors.toList())));
+    }
     String[] executorCommand =
         SchedulerUtils.getExecutorCommand(configuration, runtimeConfiguration,
         containerIndex, KubernetesConstants.EXECUTOR_PORTS);
+
     String[] command = {
         "sh",
         "-c",
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
index fb98bf2..7b598ce 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
@@ -32,9 +32,11 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 
+import com.twitter.heron.api.utils.TopologyUtils;
 import com.twitter.heron.common.basics.SysUtils;
 import com.twitter.heron.proto.scheduler.Scheduler;
 import com.twitter.heron.scheduler.UpdateTopologyManager;
+import com.twitter.heron.scheduler.utils.Runtime;
 import com.twitter.heron.scheduler.utils.SchedulerUtils;
 import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
 import com.twitter.heron.spi.common.Config;
@@ -80,9 +82,9 @@
    * Start executor process via running an async shell process
    */
   @VisibleForTesting
-  protected Process startExecutorProcess(int container) {
+  protected Process startExecutorProcess(int container, Set<PackingPlan.InstancePlan> instances) {
     return ShellUtils.runASyncProcess(
-        getExecutorCommand(container),
+        getExecutorCommand(container, instances),
         new File(LocalContext.workingDirectory(config)),
         Integer.toString(container));
   }
@@ -91,25 +93,27 @@
    * Start the executor for the given container
    */
   @VisibleForTesting
-  protected void startExecutor(final int container) {
+  protected void startExecutor(final int container, Set<PackingPlan.InstancePlan> instances) {
     LOG.info("Starting a new executor for container: " + container);
 
     // create a process with the executor command and topology working directory
-    final Process containerExecutor = startExecutorProcess(container);
+    final Process containerExecutor = startExecutorProcess(container, instances);
 
     // associate the process and its container id
     processToContainer.put(containerExecutor, container);
     LOG.info("Started the executor for container: " + container);
 
     // add the container for monitoring
-    startExecutorMonitor(container, containerExecutor);
+    startExecutorMonitor(container, containerExecutor, instances);
   }
 
   /**
    * Start the monitor of a given executor
    */
   @VisibleForTesting
-  protected void startExecutorMonitor(final int container, final Process containerExecutor) {
+  protected void startExecutorMonitor(final int container,
+                                      final Process containerExecutor,
+                                      Set<PackingPlan.InstancePlan> instances) {
     // add the container for monitoring
     Runnable r = new Runnable() {
       @Override
@@ -130,7 +134,7 @@
           }
           LOG.log(Level.INFO, "Trying to restart container {0}", container);
           // restart the container
-          startExecutor(processToContainer.remove(containerExecutor));
+          startExecutor(processToContainer.remove(containerExecutor), instances);
         } catch (InterruptedException e) {
           if (!isTopologyKilled) {
             LOG.log(Level.SEVERE, "Process is interrupted: ", e);
@@ -142,7 +146,8 @@
     monitorService.submit(r);
   }
 
-  private String[] getExecutorCommand(int container) {
+
+  private String[] getExecutorCommand(int container,  Set<PackingPlan.InstancePlan> instances) {
     Map<ExecutorPort, String> ports = new HashMap<>();
     for (ExecutorPort executorPort : ExecutorPort.getRequiredPorts()) {
       int port = SysUtils.getFreePort();
@@ -152,8 +157,22 @@
       ports.put(executorPort, String.valueOf(port));
     }
 
-    String[] executorCmd = SchedulerUtils.getExecutorCommand(config, runtime, container, ports);
+    if (TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtime))
+        && instances != null) {
+      List<String> remoteDebuggingPorts = new LinkedList<>();
+      int portsForRemoteDebugging = instances.size();
+      for (int i = 0; i < portsForRemoteDebugging; i++) {
+        int port = SysUtils.getFreePort();
+        if (port == -1) {
+          throw new RuntimeException("Failed to find available ports for executor");
+        }
+        remoteDebuggingPorts.add(String.valueOf(port));
+      }
+      ports.put(ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS,
+          String.join(",", remoteDebuggingPorts));
+    }
 
+    String[] executorCmd = SchedulerUtils.getExecutorCommand(config, runtime, container, ports);
     LOG.info("Executor command line: " + Arrays.toString(executorCmd));
     return executorCmd;
   }
@@ -167,11 +186,11 @@
 
     synchronized (processToContainer) {
       LOG.info("Starting executor for TMaster");
-      startExecutor(0);
+      startExecutor(0, null);
 
       // for each container, run its own executor
       for (PackingPlan.ContainerPlan container : packing.getContainers()) {
-        startExecutor(container.getId());
+        startExecutor(container.getId(), container.getInstances());
       }
     }
 
@@ -276,7 +295,7 @@
           throw new RuntimeException(String.format("Found active container for %s, "
               + "cannot launch a duplicate container.", container.getId()));
         }
-        startExecutor(container.getId());
+        startExecutor(container.getId(), container.getInstances());
       }
     }
   }
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesSchedulerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesSchedulerTest.java
index 1e06d6d..5940d8b 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesSchedulerTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesSchedulerTest.java
@@ -69,7 +69,7 @@
   public static void beforeClass() throws Exception {
     scheduler = Mockito.spy(KubernetesScheduler.class);
     Mockito.doReturn(EXECUTOR_CMD).when(scheduler)
-        .getExecutorCommand(Mockito.anyInt());
+        .getExecutorCommand(Mockito.anyInt(), Mockito.anyInt());
   }
 
   @AfterClass
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java
index 484ad8f..badaaa9 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java
@@ -25,13 +25,14 @@
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import com.twitter.heron.api.generated.TopologyAPI;
 import com.twitter.heron.proto.scheduler.Scheduler;
 import com.twitter.heron.spi.common.Config;
 import com.twitter.heron.spi.common.Key;
 import com.twitter.heron.spi.packing.PackingPlan;
 import com.twitter.heron.spi.utils.PackingTestUtils;
 
-
+@SuppressWarnings("unchecked")
 public class LocalSchedulerTest {
   private static final String TOPOLOGY_NAME = "testTopology";
   private static final int MAX_WAITING_SECOND = 10;
@@ -46,6 +47,19 @@
     Mockito.when(config.getStringValue(Key.TOPOLOGY_NAME)).thenReturn(TOPOLOGY_NAME);
 
     runtime = Mockito.mock(Config.class);
+
+    scheduler.initialize(config, runtime);
+    Mockito.when(runtime.get(Key.TOPOLOGY_DEFINITION)).thenReturn(TopologyAPI.Topology
+        .newBuilder()
+        .setId("a")
+        .setName("a")
+        .setState(TopologyAPI.TopologyState.RUNNING)
+        .setTopologyConfig(
+            TopologyAPI.Config.newBuilder()
+                .addKvs(TopologyAPI.Config.KeyValue.newBuilder()
+                    .setKey(com.twitter.heron.api.Config.TOPOLOGY_REMOTE_DEBUGGING_ENABLE)
+                    .setValue("false"))).build());
+
     scheduler.initialize(config, runtime);
   }
 
@@ -71,11 +85,14 @@
   @Test
   public void testOnSchedule() throws Exception {
     Mockito.doNothing().
-        when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class));
+        when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class),
+        Mockito.anySet());
     Process[] mockProcesses = new Process[4];
     for (int i = 0; i < 4; i++) {
       mockProcesses[i] = Mockito.mock(Process.class);
-      Mockito.doReturn(mockProcesses[i]).when(scheduler).startExecutorProcess(i);
+      Set<PackingPlan.InstancePlan> instances
+          = (i == 0) ?  null : PackingTestUtils.testContainerPlan(i).getInstances();
+      Mockito.doReturn(mockProcesses[i]).when(scheduler).startExecutorProcess(i, instances);
     }
 
     PackingPlan packingPlan = Mockito.mock(PackingPlan.class);
@@ -92,9 +109,12 @@
         // id 2 was not in the container plan
         continue;
       }
-      Mockito.verify(scheduler).startExecutor(i);
-      Mockito.verify(scheduler).startExecutorProcess(i);
-      Mockito.verify(scheduler).startExecutorMonitor(i, mockProcesses[i]);
+
+      Set<PackingPlan.InstancePlan> instances
+          = (i == 0) ?  null : PackingTestUtils.testContainerPlan(i).getInstances();
+      Mockito.verify(scheduler).startExecutor(i, instances);
+      Mockito.verify(scheduler).startExecutorProcess(i, instances);
+      Mockito.verify(scheduler).startExecutorMonitor(i, mockProcesses[i], instances);
     }
   }
 
@@ -105,12 +125,15 @@
 
     //verify plan is deployed and containers are created
     Mockito.doNothing().
-        when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class));
+        when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class),
+        Mockito.anySet());
     Process mockProcessTM = Mockito.mock(Process.class);
-    Mockito.doReturn(mockProcessTM).when(scheduler).startExecutorProcess(0);
+    Mockito.doReturn(mockProcessTM).when(scheduler).startExecutorProcess(
+        0, null);
 
     Process mockProcessWorker1 = Mockito.mock(Process.class);
-    Mockito.doReturn(mockProcessWorker1).when(scheduler).startExecutorProcess(1);
+    Mockito.doReturn(mockProcessWorker1).when(scheduler).startExecutorProcess(
+        1,  PackingTestUtils.testContainerPlan(1).getInstances());
 
     PackingPlan packingPlan = Mockito.mock(PackingPlan.class);
     Set<PackingPlan.ContainerPlan> containers = new HashSet<>();
@@ -118,24 +141,30 @@
     Mockito.when(packingPlan.getContainers()).thenReturn(containers);
     Assert.assertTrue(scheduler.onSchedule(packingPlan));
 
-    Mockito.verify(scheduler, Mockito.times(2)).startExecutor(Mockito.anyInt());
+    Mockito.verify(scheduler, Mockito.times(2)).startExecutor(Mockito.anyInt(),
+        Mockito.anySet());
 
     //now verify add container adds new container
     Process mockProcessWorker2 = Mockito.mock(Process.class);
-    Mockito.doReturn(mockProcessWorker2).when(scheduler).startExecutorProcess(3);
+    Mockito.doReturn(mockProcessWorker2).when(scheduler).startExecutorProcess(
+        3, PackingTestUtils.testContainerPlan(3).getInstances());
     containers.clear();
     containers.add(PackingTestUtils.testContainerPlan(3));
     scheduler.addContainers(containers);
-    Mockito.verify(scheduler).startExecutor(3);
+    Mockito.verify(scheduler).startExecutor(3,
+        PackingTestUtils.testContainerPlan(3).getInstances());
 
     Process mockProcess = Mockito.mock(Process.class);
-    Mockito.doReturn(mockProcess).when(scheduler).startExecutorProcess(Mockito.anyInt());
+    Mockito.doReturn(mockProcess).when(scheduler).startExecutorProcess(
+        Mockito.anyInt(), Mockito.anySet());
     containers.clear();
     containers.add(PackingTestUtils.testContainerPlan(4));
     containers.add(PackingTestUtils.testContainerPlan(5));
     scheduler.addContainers(containers);
-    Mockito.verify(scheduler).startExecutor(4);
-    Mockito.verify(scheduler).startExecutor(5);
+    Mockito.verify(scheduler).startExecutor(4,
+        PackingTestUtils.testContainerPlan(4).getInstances());
+    Mockito.verify(scheduler).startExecutor(5,
+        PackingTestUtils.testContainerPlan(5).getInstances());
   }
 
   /**
@@ -147,13 +176,17 @@
 
     //verify plan is deployed and containers are created
     Mockito.doNothing().
-        when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class));
+        when(scheduler).startExecutorMonitor(Mockito.anyInt(),
+        Mockito.any(Process.class), Mockito.anySet());
 
     Process[] processes = new Process[LOCAL_NUM_CONTAINER];
     Set<PackingPlan.ContainerPlan> existingContainers = new HashSet<>();
     for (int i = 0; i < LOCAL_NUM_CONTAINER; i++) {
       processes[i] = Mockito.mock(Process.class);
-      Mockito.doReturn(processes[i]).when(scheduler).startExecutorProcess(i);
+      Set<PackingPlan.InstancePlan> instances
+          = (i == 0) ?  null : PackingTestUtils.testContainerPlan(i).getInstances();
+      Mockito.doReturn(processes[i]).when(scheduler)
+          .startExecutorProcess(i,  instances);
       if (i > 0) {
         // ignore the container for TMaster. existing containers simulate the containers created
         // by packing plan
@@ -165,7 +198,8 @@
     Mockito.when(packingPlan.getContainers()).thenReturn(existingContainers);
     Assert.assertTrue(scheduler.onSchedule(packingPlan));
     verifyIdsOfLaunchedContainers(0, 1, 2, 3, 4, 5);
-    Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(Mockito.anyInt());
+    Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(
+        Mockito.anyInt(), Mockito.anySet());
 
     Set<PackingPlan.ContainerPlan> containersToRemove = new HashSet<>();
     PackingPlan.ContainerPlan containerToRemove =
@@ -175,7 +209,8 @@
     verifyIdsOfLaunchedContainers(0, 1, 2, 3, 4);
     Mockito.verify(processes[LOCAL_NUM_CONTAINER - 1]).destroy();
     // verify no new process restarts
-    Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(Mockito.anyInt());
+    Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(
+        Mockito.anyInt(), Mockito.anySet());
 
     containersToRemove.clear();
     containersToRemove.add(PackingTestUtils.testContainerPlan(1));
@@ -185,7 +220,8 @@
     Mockito.verify(processes[1]).destroy();
     Mockito.verify(processes[2]).destroy();
     // verify no new process restarts
-    Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(Mockito.anyInt());
+    Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(
+        Mockito.anyInt(), Mockito.anySet());
   }
 
   private void verifyIdsOfLaunchedContainers(int... ids) {
@@ -249,17 +285,21 @@
     int exitValue = 1;
     Process containerExecutor = Mockito.mock(Process.class);
     Mockito.doReturn(exitValue).when(containerExecutor).exitValue();
-    Mockito.doNothing().when(scheduler).startExecutor(Mockito.anyInt());
+    Mockito.doNothing().when(scheduler).startExecutor(
+        Mockito.anyInt(), Mockito.anySet());
 
     // Start the process
     scheduler.getProcessToContainer().put(containerExecutor, containerId);
-    scheduler.startExecutorMonitor(containerId, containerExecutor);
+    scheduler.startExecutorMonitor(
+        containerId, containerExecutor,
+        PackingTestUtils.testContainerPlan(containerId).getInstances());
 
     // Shut down the MonitorService
     scheduler.getMonitorService().shutdown();
     scheduler.getMonitorService().awaitTermination(MAX_WAITING_SECOND, TimeUnit.SECONDS);
     // The dead process should be restarted
-    Mockito.verify(scheduler).startExecutor(containerId);
+    Mockito.verify(scheduler).startExecutor(containerId,
+        PackingTestUtils.testContainerPlan(containerId).getInstances());
     Assert.assertFalse(scheduler.isTopologyKilled());
   }
 
@@ -270,20 +310,22 @@
 
     Process containerExecutor = Mockito.mock(Process.class);
     Mockito.doReturn(exitValue).when(containerExecutor).exitValue();
-    Mockito.doNothing().when(scheduler).startExecutor(Mockito.anyInt());
+    Mockito.doNothing().when(scheduler).startExecutor(Mockito.anyInt(), Mockito.anySet());
 
     // Set the killed flag and the dead process should not be restarted
     scheduler.onKill(Scheduler.KillTopologyRequest.getDefaultInstance());
 
     // Start the process
     scheduler.getProcessToContainer().put(containerExecutor, containerId);
-    scheduler.startExecutorMonitor(containerId, containerExecutor);
+    scheduler.startExecutorMonitor(containerId, containerExecutor,
+        PackingTestUtils.testContainerPlan(containerId).getInstances());
 
     // Shut down the MonitorService
     scheduler.getMonitorService().shutdown();
     scheduler.getMonitorService().awaitTermination(MAX_WAITING_SECOND, TimeUnit.SECONDS);
     // The dead process should not be restarted
-    Mockito.verify(scheduler, Mockito.never()).startExecutor(Mockito.anyInt());
+    Mockito.verify(scheduler, Mockito.never())
+        .startExecutor(Mockito.anyInt(), Mockito.anySet());
     Assert.assertTrue(scheduler.isTopologyKilled());
   }
 }
diff --git a/heron/spi/tests/java/com/twitter/heron/spi/utils/ShellUtilsTest.java b/heron/spi/tests/java/com/twitter/heron/spi/utils/ShellUtilsTest.java
index ee3695f..14a4d60 100644
--- a/heron/spi/tests/java/com/twitter/heron/spi/utils/ShellUtilsTest.java
+++ b/heron/spi/tests/java/com/twitter/heron/spi/utils/ShellUtilsTest.java
@@ -23,15 +23,12 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 public class ShellUtilsTest {
 
-  private static final Logger LOG = Logger.getLogger(ShellUtilsTest.class.getName());
-
   private static String generateRandomLongString(int size) {
     StringBuilder builder = new StringBuilder();
     Random random = new Random();
diff --git a/heron/spi/tests/java/com/twitter/heron/spi/utils/UploaderUtilsTest.java b/heron/spi/tests/java/com/twitter/heron/spi/utils/UploaderUtilsTest.java
index af22c36..54694e9 100644
--- a/heron/spi/tests/java/com/twitter/heron/spi/utils/UploaderUtilsTest.java
+++ b/heron/spi/tests/java/com/twitter/heron/spi/utils/UploaderUtilsTest.java
@@ -14,12 +14,19 @@
 
 package com.twitter.heron.spi.utils;
 
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.fail;
 
 public class UploaderUtilsTest {
 
@@ -55,4 +62,42 @@
         UploaderUtils.generateFilename(topologyName, role, tag, version, extension);
     Assert.assertTrue(customizedFilename.endsWith(extension));
   }
+
+  @Test
+  public void testCopyToOutputStream() throws Exception {
+    String fileContent = "temp file test content";
+    String prefix = "myTestFile";
+    String suffix = ".tmp";
+    File tempFile = null;
+    try {
+      // create temp file
+      tempFile = File.createTempFile(prefix, suffix);
+
+      // write content to temp file
+      writeContentToFile(tempFile.getAbsolutePath(), fileContent);
+
+      // copy file content to output stream
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      UploaderUtils.copyToOutputStream(tempFile.getAbsolutePath(), out);
+      Assert.assertEquals(fileContent, new String(out.toByteArray()));
+    } finally {
+      if (tempFile != null) {
+        tempFile.deleteOnExit();
+      }
+    }
+  }
+
+  @Test(expected = FileNotFoundException.class)
+  public void testCopyToOutputStreamWithInvalidFile() throws Exception {
+    UploaderUtils.copyToOutputStream("invalid_file_name", new ByteArrayOutputStream());
+  }
+
+  private void writeContentToFile(String fileName, String content) {
+    try (BufferedWriter bw = new BufferedWriter(new FileWriter(fileName))) {
+      bw.write(content);
+    } catch (IOException e) {
+      fail("Unexpected IOException has been thrown so unit test fails. Error message: "
+          + e.getMessage());
+    }
+  }
 }
diff --git a/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java b/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java
index acb58eb..8bbfe30 100644
--- a/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java
+++ b/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java
@@ -31,6 +31,8 @@
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.DeleteBuilder;
 import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -337,6 +339,15 @@
   public ListenableFuture<Boolean> setMetricsCacheLocation(
       TopologyMaster.MetricsCacheLocation location,
       String topologyName) {
+    client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
+      @Override
+      public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
+        if (arg1 != ConnectionState.CONNECTED) {
+          // if not the first time successful connection, fail fast
+          throw new RuntimeException("Unexpected state change to: " + arg1.name());
+        }
+      }
+    });
     return createNode(
         StateLocation.METRICSCACHE_LOCATION, topologyName, location.toByteArray(), true);
   }
@@ -426,6 +437,7 @@
     Config config = Config.newBuilder()
         .put(Key.STATEMGR_ROOT_PATH, "/storm/heron/states")
         .put(Key.STATEMGR_CONNECTION_STRING, zookeeperHostname)
+        .put(Key.SCHEDULER_IS_SERVICE, false)
         .build();
     CuratorStateManager stateManager = new CuratorStateManager();
     stateManager.doMain(args, config);
diff --git a/heron/uploaders/src/java/com/twitter/heron/uploader/localfs/LocalFileSystemContext.java b/heron/uploaders/src/java/com/twitter/heron/uploader/localfs/LocalFileSystemContext.java
index 7fc0db8..2d09221 100644
--- a/heron/uploaders/src/java/com/twitter/heron/uploader/localfs/LocalFileSystemContext.java
+++ b/heron/uploaders/src/java/com/twitter/heron/uploader/localfs/LocalFileSystemContext.java
@@ -18,8 +18,10 @@
 import com.twitter.heron.spi.common.Context;
 
 public class LocalFileSystemContext extends Context {
-  public static String fileSystemDirectory(Config config) {
+
+  public static String getFileSystemDirectory(Config config) {
     return config.getStringValue(LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.value(),
         LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.getDefaultString());
   }
+
 }
diff --git a/heron/uploaders/src/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploader.java b/heron/uploaders/src/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploader.java
index e4ddf69..a7cbebc 100644
--- a/heron/uploaders/src/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploader.java
+++ b/heron/uploaders/src/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploader.java
@@ -43,7 +43,7 @@
   public void initialize(Config ipconfig) {
     this.config = ipconfig;
 
-    this.destTopologyDirectory = LocalFileSystemContext.fileSystemDirectory(config);
+    this.destTopologyDirectory = LocalFileSystemContext.getFileSystemDirectory(config);
 
     // name of the destination file is the same as the base name of the topology package file
     String fileName =
diff --git a/heron/uploaders/tests/java/BUILD b/heron/uploaders/tests/java/BUILD
index 5e50861..c94b1c3 100644
--- a/heron/uploaders/tests/java/BUILD
+++ b/heron/uploaders/tests/java/BUILD
@@ -49,6 +49,7 @@
 java_tests(
   test_classes = [
     "com.twitter.heron.uploader.localfs.LocalFileSystemConfigTest",
+    "com.twitter.heron.uploader.localfs.LocalFileSystemContextTest",
     "com.twitter.heron.uploader.localfs.LocalFileSystemUploaderTest",
   ],
   runtime_deps = [ ":localfs-tests" ],
diff --git a/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemConfigTest.java b/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemConfigTest.java
index 7209551..5806c2e 100644
--- a/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemConfigTest.java
+++ b/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemConfigTest.java
@@ -43,7 +43,7 @@
     Config config = Config.toLocalMode(getDefaultConfig());
 
     Assert.assertEquals(
-        LocalFileSystemContext.fileSystemDirectory(config),
+        LocalFileSystemContext.getFileSystemDirectory(config),
         TokenSub.substitute(config, LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.getDefaultString())
     );
   }
@@ -59,7 +59,7 @@
             .build());
 
     Assert.assertEquals(
-        LocalFileSystemContext.fileSystemDirectory(config),
+        LocalFileSystemContext.getFileSystemDirectory(config),
         overrideDirectory
     );
   }
@@ -84,7 +84,7 @@
 
     Assert.assertEquals(
         Paths.get(uploader.getTopologyFile()).getParent().toString(),
-        LocalFileSystemContext.fileSystemDirectory(config)
+        LocalFileSystemContext.getFileSystemDirectory(config)
     );
   }
 
diff --git a/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemContextTest.java b/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemContextTest.java
new file mode 100644
index 0000000..6ee51ee
--- /dev/null
+++ b/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemContextTest.java
@@ -0,0 +1,62 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.uploader.localfs;
+
+import java.nio.file.Paths;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.common.Key;
+
+public class LocalFileSystemContextTest {
+
+  @Test
+  public void testGetDefaultFileSystemDirectory() {
+    Config config = Config.newBuilder()
+        .put(Key.CLUSTER, "cluster")
+        .put(Key.ROLE, "role")
+        .put(Key.TOPOLOGY_NAME, "topology")
+        .build();
+
+    String actualFileSystemDirectory = LocalFileSystemContext.getFileSystemDirectory(config);
+
+    // get default file system directory
+    String defaultFileSystemDirectory = Paths.get("${HOME}",
+        ".herondata", "repository", "${CLUSTER}", "${ROLE}", "${TOPOLOGY}").toString();
+
+    Assert.assertEquals(defaultFileSystemDirectory, actualFileSystemDirectory);
+  }
+
+  @Test
+  public void testGetFileSystemDirectory() {
+    String fileSystemDirectory = "${HOME}/.herondata/topologies/${CLUSTER}";
+
+    Config config = Config.newBuilder()
+        .put(Key.CLUSTER, "cluster")
+        .put(Key.ROLE, "role")
+        .put(LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.value(), fileSystemDirectory)
+        .build();
+
+    // get actual file system directory set by the user
+    String actualFileSystemDirectory = LocalFileSystemContext.getFileSystemDirectory(config);
+
+    String expectedFileSystemDirectory = Paths.get("${HOME}",
+        ".herondata", "topologies", "${CLUSTER}").toString();
+
+    Assert.assertEquals(expectedFileSystemDirectory, actualFileSystemDirectory);
+  }
+
+}
diff --git a/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploaderTest.java b/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploaderTest.java
index 85db4f4..18e9ed1 100644
--- a/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploaderTest.java
+++ b/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploaderTest.java
@@ -30,13 +30,14 @@
 
 public class LocalFileSystemUploaderTest {
 
+  private static final String TOPOLOGY_PACKAGE_FILE_NAME = "some-topology.tar";
+
   private Config config;
   private String fileSystemDirectory;
   private String testTopologyDirectory;
 
   @Before
   public void before() throws Exception {
-
     // form the file system directory using bazel environ files
     fileSystemDirectory = Paths.get(System.getenv("JAVA_RUNFILES"), "topologies").toString();
 
@@ -60,17 +61,17 @@
   }
 
   @Test
-  public void testUploader() throws Exception {
-
+  public void testUploader() {
     // identify the location of the test topology tar file
-    String topologyPackage = Paths.get(testTopologyDirectory, "some-topology.tar").toString();
+    String topologyPackage = Paths.get(testTopologyDirectory,
+        TOPOLOGY_PACKAGE_FILE_NAME).toString();
 
-    Config newconfig = Config.newBuilder()
+    Config newConfig = Config.newBuilder()
         .putAll(config).put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage).build();
 
     // create the uploader and load the package
     LocalFileSystemUploader uploader = new LocalFileSystemUploader();
-    uploader.initialize(newconfig);
+    uploader.initialize(newConfig);
     Assert.assertNotNull(uploader.uploadPackage());
 
     // verify if the file exists
@@ -79,33 +80,32 @@
   }
 
   @Test(expected = UploaderException.class)
-  public void testSourceNotExists() throws Exception {
-
+  public void testSourceNotExists() {
     // identify the location of the test topology tar file
     String topologyPackage = Paths.get(
         testTopologyDirectory, "doesnot-exist-topology.tar").toString();
 
-    Config newconfig = Config.newBuilder()
+    Config newConfig = Config.newBuilder()
         .putAll(config).put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage).build();
 
     // create the uploader and load the package
     LocalFileSystemUploader uploader = new LocalFileSystemUploader();
-    uploader.initialize(newconfig);
+    uploader.initialize(newConfig);
     uploader.uploadPackage();
   }
 
   @Test
-  public void testUndo() throws Exception {
-
+  public void testUndo() {
     // identify the location of the test topology tar file
-    String topologyPackage = Paths.get(testTopologyDirectory, "some-topology.tar").toString();
+    String topologyPackage = Paths.get(testTopologyDirectory,
+        TOPOLOGY_PACKAGE_FILE_NAME).toString();
 
-    Config newconfig = Config.newBuilder()
+    Config newConfig = Config.newBuilder()
         .putAll(config).put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage).build();
 
     // create the uploader and load the package
     LocalFileSystemUploader uploader = new LocalFileSystemUploader();
-    uploader.initialize(newconfig);
+    uploader.initialize(newConfig);
     Assert.assertNotNull(uploader.uploadPackage());
 
     // verify if the file exists
@@ -113,7 +113,95 @@
     Assert.assertTrue(new File(destFile).isFile());
 
     // now undo the file
-    uploader.undo();
+    Assert.assertTrue(uploader.undo());
     Assert.assertFalse(new File(destFile).isFile());
   }
+
+  @Test
+  public void testUseDefaultFileSystemDirectoryWhenNotSet() {
+    // identify the location of the test topology tar file
+    String topologyPackage = Paths.get(testTopologyDirectory,
+        TOPOLOGY_PACKAGE_FILE_NAME).toString();
+
+    // set file system directory as null
+    Config newConfig = Config.newBuilder()
+        .putAll(config)
+        .put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage)
+        .put(LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.value(), null)
+        .build();
+
+    // create the uploader
+    LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+    uploader.initialize(newConfig);
+
+    // get default file system directory
+    String defaultFileSystemDirectory = LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.getDefaultString();
+
+    String destDirectory = uploader.getTopologyDirectory();
+    String destFile = uploader.getTopologyFile();
+
+    // verify usage of default file system directory for destination directory and file
+    Assert.assertEquals(destDirectory, defaultFileSystemDirectory);
+    Assert.assertTrue(destFile.contains(defaultFileSystemDirectory));
+  }
+
+  @Test
+  public void testUploadPackageWhenTopologyFileAlreadyExists() {
+    // identify the location of the test topology tar file
+    String topologyPackage = Paths.get(testTopologyDirectory,
+        TOPOLOGY_PACKAGE_FILE_NAME).toString();
+
+    Config newConfig = Config.newBuilder()
+        .putAll(config).put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage).build();
+
+    // create the uploader and load the package
+    LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+    uploader.initialize(newConfig);
+    Assert.assertNotNull(uploader.uploadPackage());
+
+    // verify if the file exists
+    String destFile = uploader.getTopologyFile();
+    Assert.assertTrue(new File(destFile).isFile());
+
+    // load same package again by overriding existing one
+    Assert.assertNotNull(uploader.uploadPackage());
+    String destFile2 = uploader.getTopologyFile();
+    Assert.assertTrue(new File(destFile2).isFile());
+
+    // verify that existing file is overridden
+    Assert.assertEquals(destFile, destFile2);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testUploadPackageWhenFileSystemDirectoryIsInvalid() {
+    // identify the location of the test topology tar file
+    String topologyPackage = Paths.get(testTopologyDirectory,
+        TOPOLOGY_PACKAGE_FILE_NAME).toString();
+
+    String invalidFileSystemDirectory = "invalid%path";
+
+    // set invalid file system directory
+    Config newConfig = Config.newBuilder()
+        .putAll(config)
+        .put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage)
+        .put(LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.value(), invalidFileSystemDirectory).build();
+
+    // create the uploader and load the package
+    LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+    uploader.initialize(newConfig);
+    uploader.uploadPackage();
+  }
+
+  @Test
+  public void testGetUri() {
+    LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+    Assert.assertEquals(uploader.getUri("testFileName").toString(), "file://testFileName");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetUriWhenDestFileNameIsInvalid() {
+    LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+    uploader.getUri("invalid_%_DestFilePath");
+  }
+
 }
diff --git a/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java b/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java
index a0bd2b9..18e25de 100644
--- a/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java
+++ b/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java
@@ -48,6 +48,8 @@
 
     doTaskHooksTranslation(heronConfig);
 
+    doTopologyLevelTranslation(heronConfig);
+
     return heronConfig;
   }
 
@@ -113,7 +115,10 @@
   }
 
   /**
-   * Translate storm config into heron config
+   * Translate storm config into heron config. This funciton is used by both topology
+   * and component level config translations. Therefore NO config should be generated
+   * when a key does NOT exist if the key is for both topology and component.
+   * Otherwise the component config might overwrite the topolgy config with a wrong value.
    * @param heron the heron config object to receive the results.
    */
   private static void doStormTranslation(Config heronConfig) {
@@ -125,20 +130,7 @@
       Integer nWorkers = Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_WORKERS));
       com.twitter.heron.api.Config.setNumStmgrs(heronConfig, nWorkers);
     }
-    if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_ACKER_EXECUTORS)) {
-      Integer nAckers =
-          Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_ACKER_EXECUTORS));
-      if (nAckers > 0) {
-        com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
-                 com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE);
-      } else {
-        com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
-                 com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
-      }
-    } else {
-      com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
-               com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
-    }
+
     if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
       Integer nSecs =
           Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
@@ -162,4 +154,25 @@
       com.twitter.heron.api.Config.setDebug(heronConfig, dBg);
     }
   }
+
+  /**
+   * Translate topology config.
+   * @param heron the heron config object to receive the results.
+   */
+  private static void doTopologyLevelTranslation(Config heronConfig) {
+    if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_ACKER_EXECUTORS)) {
+      Integer nAckers =
+          Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_ACKER_EXECUTORS));
+      if (nAckers > 0) {
+        com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+                 com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE);
+      } else {
+        com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+                 com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
+      }
+    } else {
+      com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+               com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
+    }
+  }
 }
diff --git a/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java b/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java
index abd132a..d7b23a2 100644
--- a/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java
@@ -47,6 +47,8 @@
 
     doTaskHooksTranslation(heronConfig);
 
+    doTopologyLevelTranslation(heronConfig);
+
     return heronConfig;
   }
 
@@ -113,7 +115,10 @@
   }
 
   /**
-   * Translate storm config into heron config
+   * Translate storm config into heron config. This funciton is used by both topology
+   * and component level config translations. Therefore NO config should be generated
+   * when a key does NOT exist if the key is for both topology and component.
+   * Otherwise the component config might overwrite the topolgy config with a wrong value.
    * @param heron the heron config object to receive the results.
    */
   private static void doStormTranslation(Config heronConfig) {
@@ -125,20 +130,6 @@
       Integer nWorkers = Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_WORKERS));
       com.twitter.heron.api.Config.setNumStmgrs(heronConfig, nWorkers);
     }
-    if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_ACKER_EXECUTORS)) {
-      Integer nAckers =
-          Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_ACKER_EXECUTORS));
-      if (nAckers > 0) {
-        com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
-                 com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE);
-      } else {
-        com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
-                 com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
-      }
-    } else {
-      com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
-               com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
-    }
     if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
       Integer nSecs =
           Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
@@ -166,4 +157,25 @@
           (Map) heronConfig.get(org.apache.storm.Config.TOPOLOGY_ENVIRONMENT));
     }
   }
+
+  /**
+   * Translate topology config.
+   * @param heron the heron config object to receive the results.
+   */
+  private static void doTopologyLevelTranslation(Config heronConfig) {
+    if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_ACKER_EXECUTORS)) {
+      Integer nAckers =
+          Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_ACKER_EXECUTORS));
+      if (nAckers > 0) {
+        com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+                 com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE);
+      } else {
+        com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+                 com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
+      }
+    } else {
+      com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+               com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
+    }
+  }
 }
diff --git a/website/content/docs/operators/deployment/uploaders/localfs.md b/website/content/docs/operators/deployment/uploaders/localfs.md
index 4e9e2a9..bd5411a 100644
--- a/website/content/docs/operators/deployment/uploaders/localfs.md
+++ b/website/content/docs/operators/deployment/uploaders/localfs.md
@@ -27,7 +27,7 @@
 * `heron.uploader.localfs.file.system.directory` --- Provides the name of the directory where
 the topology jar should be uploaded. The name of the directory should be unique per cluster
 You could use the Heron environment variables `${CLUSTER}` that will be substituted by cluster
-name.
+name. If this is not set, `${HOME}/.herondata/repository/${CLUSTER}/${ROLE}/${TOPOLOGY}` will be set as default.
 
 ### Example Local File System Uploader Configuration