Merge branch 'master' into kanishk/ui-bp-metric
diff --git a/bazel_configure.py b/bazel_configure.py
index 9693fe3..15cdf01 100755
--- a/bazel_configure.py
+++ b/bazel_configure.py
@@ -394,10 +394,11 @@
   env_map['PYTHON2'] = discover_tool('python2.7', 'Python2', 'PYTHON2', '2.7')
 
   if platform == 'Darwin':
-    env_map['AR'] = discover_tool('libtool', 'archiver', 'AR')
+    env_map['LIBTOOL'] = discover_tool('glibtool', 'Libtool', 'LIBTOOL', '2.4.2')
   else:
-    env_map['AR'] = discover_tool('ar', 'archiver', 'AR')
+    env_map['LIBTOOL'] = discover_tool('libtool', 'Libtool', 'LIBTOOL', '2.4.2')
 
+  env_map['AR'] = discover_tool('ar', 'archiver', 'AR')
   env_map['GCOV']= discover_tool('gcov','coverage tool', 'GCOV')
   env_map['DWP'] = discover_tool_default('dwp', 'dwp', 'DWP', '/usr/bin/dwp')
   env_map['NM'] = discover_tool_default('nm', 'nm', 'NM', '/usr/bin/nm')
diff --git a/contrib/kafka-spout/src/java/org/apache/storm/kafka/KafkaSpout.java b/contrib/kafka-spout/src/java/org/apache/storm/kafka/KafkaSpout.java
index 4511b1d..48b8f75 100644
--- a/contrib/kafka-spout/src/java/org/apache/storm/kafka/KafkaSpout.java
+++ b/contrib/kafka-spout/src/java/org/apache/storm/kafka/KafkaSpout.java
@@ -65,7 +65,7 @@
 
   @Override
   @SuppressWarnings("unchecked")
-  public void open(Map<String, Object> conf,
+  public void open(Map conf,
                    final TopologyContext context,
                    final SpoutOutputCollector aCollector) {
     collector = aCollector;
diff --git a/contrib/kafka-spout/src/java/org/apache/storm/kafka/KafkaUtils.java b/contrib/kafka-spout/src/java/org/apache/storm/kafka/KafkaUtils.java
index bd00cd0..1f162a7 100644
--- a/contrib/kafka-spout/src/java/org/apache/storm/kafka/KafkaUtils.java
+++ b/contrib/kafka-spout/src/java/org/apache/storm/kafka/KafkaUtils.java
@@ -286,7 +286,7 @@
         tups = ((StringMultiSchemeWithTopic) kafkaConfig.scheme)
             .deserializeWithTopic(topic, payloadBytes);
       } else {
-        tups = kafkaConfig.scheme.deserialize(payloadBytes);
+        tups = kafkaConfig.scheme.deserialize(payload);
       }
     }
     return tups;
diff --git a/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringKeyValueScheme.java b/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringKeyValueScheme.java
index 6540e39..f82a628 100644
--- a/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringKeyValueScheme.java
+++ b/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringKeyValueScheme.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.kafka;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 
 import com.google.common.collect.ImmutableMap;
@@ -30,7 +31,7 @@
   @Override
   public List<Object> deserializeKeyAndValue(byte[] key, byte[] value) {
     if (key == null) {
-      return deserialize(value);
+      return deserialize(ByteBuffer.wrap(value));
     }
     String keyString = StringScheme.deserializeString(key);
     String valueString = StringScheme.deserializeString(value);
diff --git a/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringMultiSchemeWithTopic.java b/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
index 46daf85..685e3c9 100644
--- a/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
+++ b/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.kafka;
 
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
@@ -32,7 +33,7 @@
   private static final long serialVersionUID = 1985667152241541458L;
 
   @Override
-  public Iterable<List<Object>> deserialize(byte[] bytes) {
+  public Iterable<List<Object>> deserialize(ByteBuffer bytes) {
     throw new UnsupportedOperationException();
   }
 
diff --git a/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringScheme.java b/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringScheme.java
index 55a1f80..219d353 100644
--- a/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringScheme.java
+++ b/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringScheme.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.kafka;
 
+import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
@@ -30,8 +31,10 @@
   public static final String STRING_SCHEME_KEY = "str";
   private static final long serialVersionUID = -1806461122261145598L;
 
-  public List<Object> deserialize(byte[] bytes) {
-    return new Values(deserializeString(bytes));
+  public List<Object> deserialize(ByteBuffer bytes) {
+    byte[] bytearray = new byte[bytes.remaining()];
+    bytes.get(bytearray);
+    return new Values(deserializeString(bytearray));
   }
 
   public static String deserializeString(byte[] bytes) {
diff --git a/heron/api/src/java/com/twitter/heron/api/serializer/KryoSerializer.java b/contrib/kryo-serializer/src/java/com/twitter/heron/serializer/kryo/KryoSerializer.java
similarity index 96%
rename from heron/api/src/java/com/twitter/heron/api/serializer/KryoSerializer.java
rename to contrib/kryo-serializer/src/java/com/twitter/heron/serializer/kryo/KryoSerializer.java
index 0930862..fae5d8f 100644
--- a/heron/api/src/java/com/twitter/heron/api/serializer/KryoSerializer.java
+++ b/contrib/kryo-serializer/src/java/com/twitter/heron/serializer/kryo/KryoSerializer.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.twitter.heron.api.serializer;
+package com.twitter.heron.serializer.kryo;
 
 import java.util.Map;
 
diff --git a/heron/api/src/java/BUILD b/heron/api/src/java/BUILD
index 9745fa8..a9e1933 100644
--- a/heron/api/src/java/BUILD
+++ b/heron/api/src/java/BUILD
@@ -3,12 +3,12 @@
 package(default_visibility = ["//visibility:public"])
 
 load("/tools/rules/heron_deps", "heron_java_proto_files")
+load("/tools/rules/jarjar_rules", "jarjar_binary")
 
 api_deps_files =  \
     heron_java_proto_files() + [
         ":classification",
         "//heron/common/src/java:basics-java",
-        "//third_party/java:kryo",
     ]
 
 java_library(
@@ -18,16 +18,16 @@
 )
 
 java_binary(
-    name = "api-unshaded",
+    name = "api",
     srcs = glob(["com/twitter/heron/api/**/*.java"]),
     deps = api_deps_files,
 )
 
-genrule(
+jarjar_binary(
     name = "heron-api",
-    srcs = [":api-unshaded_deploy.jar"],
-    outs = ["heron-api.jar"],
-    cmd  = "cp $< $@",
+    src = ":api_deploy.jar",
+    shade = "shade.conf",
+    deps = ["@org_sonatype_plugins_jarjar_maven_plugin//jar"]
 )
 
 java_library(
diff --git a/heron/api/src/java/com/twitter/heron/api/HeronTopology.java b/heron/api/src/java/com/twitter/heron/api/HeronTopology.java
index 0699d4f..904c1e3 100644
--- a/heron/api/src/java/com/twitter/heron/api/HeronTopology.java
+++ b/heron/api/src/java/com/twitter/heron/api/HeronTopology.java
@@ -15,11 +15,8 @@
 package com.twitter.heron.api;
 
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 
-import com.google.protobuf.ByteString;
-
 import com.twitter.heron.api.generated.TopologyAPI;
 import com.twitter.heron.api.utils.Utils;
 
@@ -37,26 +34,6 @@
     this.topologyBuilder = topologyBuilder;
   }
 
-  private static TopologyAPI.Config.Builder getConfigBuilder(Config config) {
-    TopologyAPI.Config.Builder cBldr = TopologyAPI.Config.newBuilder();
-    Set<String> apiVars = config.getApiVars();
-    for (String key : config.keySet()) {
-      Object value = config.get(key);
-      TopologyAPI.Config.KeyValue.Builder b = TopologyAPI.Config.KeyValue.newBuilder();
-      b.setKey(key);
-      if (apiVars.contains(key)) {
-        b.setType(TopologyAPI.ConfigValueType.STRING_VALUE);
-        b.setValue(value.toString());
-      } else {
-        b.setType(TopologyAPI.ConfigValueType.JAVA_SERIALIZED_VALUE);
-        b.setSerializedValue(ByteString.copyFrom(Utils.serialize(value)));
-      }
-      cBldr.addKvs(b);
-    }
-
-    return cBldr;
-  }
-
   private static void addDefaultTopologyConfig(Map<String, Object> userConfig) {
     if (!userConfig.containsKey(Config.TOPOLOGY_DEBUG)) {
       userConfig.put(Config.TOPOLOGY_DEBUG, "false");
@@ -96,7 +73,7 @@
     addDefaultTopologyConfig(heronConfig);
     heronConfig.put(Config.TOPOLOGY_NAME, name);
 
-    topologyBuilder.setTopologyConfig(getConfigBuilder(heronConfig));
+    topologyBuilder.setTopologyConfig(Utils.getConfigBuilder(heronConfig));
 
     return topologyBuilder.build();
   }
diff --git a/heron/api/src/java/com/twitter/heron/api/topology/BaseComponentDeclarer.java b/heron/api/src/java/com/twitter/heron/api/topology/BaseComponentDeclarer.java
index aa1059a..6cf70e3 100644
--- a/heron/api/src/java/com/twitter/heron/api/topology/BaseComponentDeclarer.java
+++ b/heron/api/src/java/com/twitter/heron/api/topology/BaseComponentDeclarer.java
@@ -14,9 +14,7 @@
 
 package com.twitter.heron.api.topology;
 
-import java.util.HashMap;
 import java.util.Map;
-import java.util.logging.Logger;
 
 import com.google.protobuf.ByteString;
 
@@ -26,24 +24,22 @@
 
 public abstract class BaseComponentDeclarer<T extends ComponentConfigurationDeclarer<?>>
     extends BaseConfigurationDeclarer<T> {
-  private static final Logger LOG = Logger.getLogger(BaseComponentDeclarer.class.getName());
   private String name;
   private IComponent component;
-  private Map<String, Object> componentConfiguration;
+  private Config componentConfiguration;
 
   public BaseComponentDeclarer(String name, IComponent comp, Number taskParallelism) {
     this.name = name;
     this.component = comp;
-    this.componentConfiguration = comp.getComponentConfiguration();
-    if (this.componentConfiguration == null) {
-      this.componentConfiguration = new HashMap<>();
+    if (comp.getComponentConfiguration() != null) {
+      this.componentConfiguration = new Config(comp.getComponentConfiguration());
+    } else {
+      this.componentConfiguration = new Config();
     }
     if (taskParallelism != null) {
-      this.componentConfiguration.put(Config.TOPOLOGY_COMPONENT_PARALLELISM,
-          taskParallelism.toString());
+      Config.setComponentParallelism(this.componentConfiguration, taskParallelism.intValue());
     } else {
-      this.componentConfiguration.put(Config.TOPOLOGY_COMPONENT_PARALLELISM,
-          "1");
+      Config.setComponentParallelism(this.componentConfiguration, 1);
     }
   }
 
@@ -63,23 +59,6 @@
     bldr.setName(name);
     bldr.setSpec(TopologyAPI.ComponentObjectSpec.JAVA_SERIALIZED_OBJECT);
     bldr.setSerializedObject(ByteString.copyFrom(Utils.serialize(component)));
-
-    TopologyAPI.Config.Builder cBldr = TopologyAPI.Config.newBuilder();
-    for (Map.Entry<String, Object> entry : componentConfiguration.entrySet()) {
-      if (entry.getKey() == null) {
-        LOG.warning("ignore: config key is null");
-        continue;
-      }
-      if (entry.getValue() == null) {
-        LOG.warning("ignore: config key " + entry.getKey() + " has null value");
-        continue;
-      }
-      TopologyAPI.Config.KeyValue.Builder kvBldr = TopologyAPI.Config.KeyValue.newBuilder();
-      kvBldr.setKey(entry.getKey());
-      kvBldr.setValue(entry.getValue().toString());
-      kvBldr.setType(TopologyAPI.ConfigValueType.STRING_VALUE);
-      cBldr.addKvs(kvBldr);
-    }
-    bldr.setConfig(cBldr);
+    bldr.setConfig(Utils.getConfigBuilder(componentConfiguration));
   }
 }
diff --git a/heron/api/src/java/com/twitter/heron/api/utils/DefaultMaxSpoutPendingTuner.java b/heron/api/src/java/com/twitter/heron/api/utils/DefaultMaxSpoutPendingTuner.java
index a66bab2..6cda2c8 100644
--- a/heron/api/src/java/com/twitter/heron/api/utils/DefaultMaxSpoutPendingTuner.java
+++ b/heron/api/src/java/com/twitter/heron/api/utils/DefaultMaxSpoutPendingTuner.java
@@ -14,10 +14,6 @@
 
 package com.twitter.heron.api.utils;
 
-// TODO:- Uncomment this
-// import org.slf4j.Logger;
-// import org.slf4j.LoggerFactory;
-
 /**
  * This is a class that helps to auto tune the max spout pending value
  */
diff --git a/heron/api/src/java/com/twitter/heron/api/utils/Utils.java b/heron/api/src/java/com/twitter/heron/api/utils/Utils.java
index e07d3a9..97579d2 100644
--- a/heron/api/src/java/com/twitter/heron/api/utils/Utils.java
+++ b/heron/api/src/java/com/twitter/heron/api/utils/Utils.java
@@ -25,8 +25,17 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import com.google.protobuf.ByteString;
+
+import com.twitter.heron.api.Config;
+import com.twitter.heron.api.generated.TopologyAPI;
 
 public final class Utils {
+  private static final Logger LOG = Logger.getLogger(Utils.class.getName());
+
   public static final String DEFAULT_STREAM_ID = "default";
 
   private Utils() {
@@ -102,4 +111,39 @@
 
     return ret;
   }
+
+  /**
+   * Converts a Heron Config object into a TopologyAPI.Config.Builder. Config entries with null
+   * keys or values are ignored.
+   *
+   * @param config heron Config object
+   * @return TopologyAPI.Config.Builder with values loaded from config
+   */
+  public static TopologyAPI.Config.Builder getConfigBuilder(Config config) {
+    TopologyAPI.Config.Builder cBldr = TopologyAPI.Config.newBuilder();
+    Set<String> apiVars = config.getApiVars();
+    for (String key : config.keySet()) {
+      if (key == null) {
+        LOG.warning("ignore: null config key found");
+        continue;
+      }
+      Object value = config.get(key);
+      if (value == null) {
+        LOG.warning("ignore: config key " + key + " has null value");
+        continue;
+      }
+      TopologyAPI.Config.KeyValue.Builder b = TopologyAPI.Config.KeyValue.newBuilder();
+      b.setKey(key);
+      if (apiVars.contains(key)) {
+        b.setType(TopologyAPI.ConfigValueType.STRING_VALUE);
+        b.setValue(value.toString());
+      } else {
+        b.setType(TopologyAPI.ConfigValueType.JAVA_SERIALIZED_VALUE);
+        b.setSerializedValue(ByteString.copyFrom(serialize(value)));
+      }
+      cBldr.addKvs(b);
+    }
+
+    return cBldr;
+  }
 }
diff --git a/heron/api/src/java/shade.conf b/heron/api/src/java/shade.conf
new file mode 100644
index 0000000..7ee86d0
--- /dev/null
+++ b/heron/api/src/java/shade.conf
@@ -0,0 +1 @@
+rule com.google** com.twitter.heron.__shaded__.@0
diff --git a/heron/common/src/java/BUILD b/heron/common/src/java/BUILD
index 678421e..03754ec 100644
--- a/heron/common/src/java/BUILD
+++ b/heron/common/src/java/BUILD
@@ -7,6 +7,7 @@
 common_deps_files = \
     heron_java_proto_files() + [
         "//heron/api/src/java:api-java",
+        "//heron/api/src/java:classification",
         "@org_yaml_snakeyaml//jar",
         "@com_google_guava_guava//jar",
     ]
diff --git a/heron/common/src/java/com/twitter/heron/common/utils/metrics/BoltMetrics.java b/heron/common/src/java/com/twitter/heron/common/utils/metrics/BoltMetrics.java
index 246efbf..4ebb6a4 100644
--- a/heron/common/src/java/com/twitter/heron/common/utils/metrics/BoltMetrics.java
+++ b/heron/common/src/java/com/twitter/heron/common/utils/metrics/BoltMetrics.java
@@ -32,7 +32,7 @@
  * 4. Expose methods which could be called externally to change the value of metrics
  */
 
-public class BoltMetrics {
+public class BoltMetrics implements ComponentMetrics {
   private final CountMetric ackCount;
   private final ReducedMetric<MeanReducerState, Number, Double> processLatency;
   private final ReducedMetric<MeanReducerState, Number, Double> failLatency;
diff --git a/heron/common/src/java/com/twitter/heron/common/utils/metrics/ComponentMetrics.java b/heron/common/src/java/com/twitter/heron/common/utils/metrics/ComponentMetrics.java
new file mode 100644
index 0000000..5503d14
--- /dev/null
+++ b/heron/common/src/java/com/twitter/heron/common/utils/metrics/ComponentMetrics.java
@@ -0,0 +1,28 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.common.utils.metrics;
+
+import com.twitter.heron.classification.InterfaceAudience;
+import com.twitter.heron.classification.InterfaceStability;
+
+/**
+ * Interface for common metric actions that both spouts and bolts support
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface ComponentMetrics {
+
+  void serializeDataTuple(String streamId, long latency);
+  void emittedTuple(String streamId);
+}
diff --git a/heron/common/src/java/com/twitter/heron/common/utils/metrics/SpoutMetrics.java b/heron/common/src/java/com/twitter/heron/common/utils/metrics/SpoutMetrics.java
index 998ea75..8ad183f 100644
--- a/heron/common/src/java/com/twitter/heron/common/utils/metrics/SpoutMetrics.java
+++ b/heron/common/src/java/com/twitter/heron/common/utils/metrics/SpoutMetrics.java
@@ -33,7 +33,7 @@
  * 4. Expose methods which could be called externally to change the value of metrics
  */
 
-public class SpoutMetrics {
+public class SpoutMetrics implements ComponentMetrics {
   private final CountMetric ackCount;
   private final ReducedMetric<MeanReducerState, Number, Double> completeLatency;
   private final ReducedMetric<MeanReducerState, Number, Double> failLatency;
diff --git a/heron/common/src/java/com/twitter/heron/common/utils/misc/PhysicalPlanHelper.java b/heron/common/src/java/com/twitter/heron/common/utils/misc/PhysicalPlanHelper.java
index 9427d34..1a72dbf 100644
--- a/heron/common/src/java/com/twitter/heron/common/utils/misc/PhysicalPlanHelper.java
+++ b/heron/common/src/java/com/twitter/heron/common/utils/misc/PhysicalPlanHelper.java
@@ -193,29 +193,26 @@
 
   public void setTopologyContext(MetricsCollector metricsCollector) {
     topologyContext =
-        new TopologyContextImpl(constructConfig(pplan.getTopology().getTopologyConfig(), component),
+        new TopologyContextImpl(mergeConfigs(pplan.getTopology().getTopologyConfig(), component),
             pplan.getTopology(), makeTaskToComponentMap(), myTaskId, metricsCollector);
   }
 
-  private Map<String, Object> constructConfig(TopologyAPI.Config config,
-                                              TopologyAPI.Component acomponent) {
-    Map<String, Object> retval = new HashMap<String, Object>();
+  private Map<String, Object> mergeConfigs(TopologyAPI.Config config,
+                                           TopologyAPI.Component acomponent) {
+    Map<String, Object> map = new HashMap<>();
+    addConfigsToMap(config, map);
+    addConfigsToMap(acomponent.getConfig(), map); // Override any component specific configs
+    return map;
+  }
+
+  private void addConfigsToMap(TopologyAPI.Config config, Map<String, Object> map) {
     for (TopologyAPI.Config.KeyValue kv : config.getKvsList()) {
       if (kv.hasValue()) {
-        retval.put(kv.getKey(), kv.getValue());
+        map.put(kv.getKey(), kv.getValue());
       } else {
-        retval.put(kv.getKey(), Utils.deserialize(kv.getSerializedValue().toByteArray()));
+        map.put(kv.getKey(), Utils.deserialize(kv.getSerializedValue().toByteArray()));
       }
     }
-    // Override any component specific configs
-    for (TopologyAPI.Config.KeyValue kv : acomponent.getConfig().getKvsList()) {
-      if (kv.hasValue()) {
-        retval.put(kv.getKey(), kv.getValue());
-      } else {
-        retval.put(kv.getKey(), Utils.deserialize(kv.getSerializedValue().toByteArray()));
-      }
-    }
-    return retval;
   }
 
   private Map<Integer, String> makeTaskToComponentMap() {
diff --git a/heron/common/src/java/com/twitter/heron/common/utils/misc/SerializeDeSerializeHelper.java b/heron/common/src/java/com/twitter/heron/common/utils/misc/SerializeDeSerializeHelper.java
index 40179de..5b356c4 100644
--- a/heron/common/src/java/com/twitter/heron/common/utils/misc/SerializeDeSerializeHelper.java
+++ b/heron/common/src/java/com/twitter/heron/common/utils/misc/SerializeDeSerializeHelper.java
@@ -15,15 +15,18 @@
 package com.twitter.heron.common.utils.misc;
 
 import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import com.twitter.heron.api.Config;
 import com.twitter.heron.api.serializer.IPluggableSerializer;
-import com.twitter.heron.api.serializer.KryoSerializer;
+import com.twitter.heron.api.serializer.JavaSerializer;
 
 /**
  * Get the serializer according to the serializerClassName
  */
 public final class SerializeDeSerializeHelper {
+  private static final Logger LOG = Logger.getLogger(SerializeDeSerializeHelper.class.getName());
 
   private SerializeDeSerializeHelper() {
   }
@@ -33,7 +36,12 @@
     try {
       String serializerClassName = (String) config.get(Config.TOPOLOGY_SERIALIZER_CLASSNAME);
       if (serializerClassName == null) {
-        serializer = new KryoSerializer();
+        LOG.log(Level.WARNING, "Serializer class name not provided. "
+            + "Fall back to Java serializer. "
+            + "This could cause serious performance degradation. "
+            + "You can specify to use Kryo as serializer. "
+            + "See https://twitter.github.io/heron/docs/developers/serialization/ for details");
+        serializer = new JavaSerializer();
       } else {
         serializer = (IPluggableSerializer) Class.forName(serializerClassName).newInstance();
       }
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index 3415b9f..b8e4ce3 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -209,6 +209,7 @@
     self.processes_to_monitor = {}
 
     self.state_managers = []
+    self.jvm_version = None
 
   @staticmethod
   def parse_args(args):
@@ -412,21 +413,29 @@
     retval = {}
     # TO DO (Karthik) to be moved into keys and defaults files
     code_cache_size_mb = 64
-    perm_gen_size_mb = 128
+    java_metasize_mb = 128
+
+    java_version = self._get_jvm_version()
+    java_metasize_param = 'MetaspaceSize'
+    if java_version.startswith("1.7") or \
+            java_version.startswith("1.6") or \
+            java_version.startswith("1.5"):
+      java_metasize_param = 'PermSize'
+
     for (instance_id, component_name, global_task_id, component_index) in instance_info:
       total_jvm_size = int(self.component_rammap[component_name] / (1024 * 1024))
-      heap_size_mb = total_jvm_size - code_cache_size_mb - perm_gen_size_mb
+      heap_size_mb = total_jvm_size - code_cache_size_mb - java_metasize_mb
       Log.info("component name: %s, ram request: %d, total jvm size: %dM, "
-               "cache size: %dM, perm size: %dM"
+               "cache size: %dM, metaspace size: %dM"
                % (component_name, self.component_rammap[component_name],
-                  total_jvm_size, code_cache_size_mb, perm_gen_size_mb))
+                  total_jvm_size, code_cache_size_mb, java_metasize_mb))
       xmn_size = int(heap_size_mb / 2)
       instance_cmd = [os.path.join(self.heron_java_home, 'bin/java'),
                       '-Xmx%dM' % heap_size_mb,
                       '-Xms%dM' % heap_size_mb,
                       '-Xmn%dM' % xmn_size,
-                      '-XX:MaxPermSize=%dM' % perm_gen_size_mb,
-                      '-XX:PermSize=%dM' % perm_gen_size_mb,
+                      '-XX:Max%s=%dM' % (java_metasize_param, java_metasize_mb),
+                      '-XX:%s=%dM' % (java_metasize_param, java_metasize_mb),
                       '-XX:ReservedCodeCacheSize=%dM' % code_cache_size_mb,
                       '-XX:+CMSScavengeBeforeRemark',
                       '-XX:TargetSurvivorRatio=90',
@@ -466,6 +475,21 @@
       retval[instance_id] = instance_cmd
     return retval
 
+  def _get_jvm_version(self):
+    if not self.jvm_version:
+      cmd = [os.path.join(self.heron_java_home, 'bin/java'),
+             '-cp', self.instance_classpath, 'com.twitter.heron.instance.util.JvmVersion']
+      process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+      (process_stdout, process_stderr) = process.communicate()
+      if process.returncode != 0:
+        Log.error("Failed to determine JVM version. Exiting. Output of %s: %s",
+                  ' '.join(cmd), process_stderr)
+        sys.exit(1)
+
+      self.jvm_version = process_stdout
+      Log.info("Detected JVM version %s" % self.jvm_version)
+    return self.jvm_version
+
   # Returns the processes for each Python Heron Instance
   def _get_python_instance_cmd(self, instance_info):
     # pylint: disable=fixme
diff --git a/heron/executor/tests/python/heron_executor_unittest.py b/heron/executor/tests/python/heron_executor_unittest.py
index 64334d8..f76fe1f 100644
--- a/heron/executor/tests/python/heron_executor_unittest.py
+++ b/heron/executor/tests/python/heron_executor_unittest.py
@@ -62,6 +62,9 @@
     self.processes.append(ProcessInfo(popen, name, cmd))
     return popen
 
+  def _get_jvm_version(self):
+    return "1.8.y.x"
+
 class HeronExecutorTest(unittest.TestCase):
   """Unittest for Heron Executor"""
 
@@ -107,8 +110,8 @@
 
   def get_expected_instance_command(component_name, instance_id, container_id):
     instance_name = "container_%d_%s_%d" % (container_id, component_name, instance_id)
-    return "heron_java_home/bin/java -Xmx320M -Xms320M -Xmn160M -XX:MaxPermSize=128M " \
-           "-XX:PermSize=128M -XX:ReservedCodeCacheSize=64M -XX:+CMSScavengeBeforeRemark " \
+    return "heron_java_home/bin/java -Xmx320M -Xms320M -Xmn160M -XX:MaxMetaspaceSize=128M " \
+           "-XX:MetaspaceSize=128M -XX:ReservedCodeCacheSize=64M -XX:+CMSScavengeBeforeRemark " \
            "-XX:TargetSurvivorRatio=90 -XX:+PrintCommandLineFlags -verbosegc -XX:+PrintGCDetails " \
            "-XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCCause " \
            "-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=100M " \
diff --git a/heron/instance/src/java/BUILD b/heron/instance/src/java/BUILD
index 06f6681..f635b13 100644
--- a/heron/instance/src/java/BUILD
+++ b/heron/instance/src/java/BUILD
@@ -10,7 +10,6 @@
         "//heron/api/src/java:api-java",
         "//heron/api/src/java:classification",
         "//heron/common/src/java:common-java",
-        "//third_party/java:logging", 
     ]
 
 java_library(
@@ -29,10 +28,6 @@
     deps = [
         ":instance-java",
         "//heron/common/src/java:common-java",
-        # Heron does not need it, we supply this,
-        # since we exclude them explicitly in target "heron_binary".
-        # Later we might remove it.
-        "//third_party/java:logging",
     ] + heron_java_proto_files(),
 )
 
diff --git a/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java b/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java
new file mode 100644
index 0000000..890ee9f
--- /dev/null
+++ b/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java
@@ -0,0 +1,143 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.instance;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.protobuf.ByteString;
+
+import com.twitter.heron.api.Config;
+import com.twitter.heron.api.serializer.IPluggableSerializer;
+import com.twitter.heron.common.basics.Communicator;
+import com.twitter.heron.common.utils.metrics.ComponentMetrics;
+import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
+import com.twitter.heron.proto.system.HeronTuples;
+
+/**
+ * Common functionality used by both bolt and spout output collectors
+ */
+public class AbstractOutputCollector {
+  protected final IPluggableSerializer serializer;
+  protected final OutgoingTupleCollection outputter;
+  protected final ComponentMetrics metrics;
+  protected final boolean ackEnabled;
+  private long totalTuplesEmitted;
+  private PhysicalPlanHelper helper;
+
+  public AbstractOutputCollector(IPluggableSerializer serializer,
+                                 PhysicalPlanHelper helper,
+                                 Communicator<HeronTuples.HeronTupleSet> streamOutQueue,
+                                 ComponentMetrics metrics) {
+    this.serializer = serializer;
+    this.metrics = metrics;
+    this.totalTuplesEmitted = 0;
+    updatePhysicalPlanHelper(helper);
+
+    Map<String, Object> config = helper.getTopologyContext().getTopologyConfig();
+    if (config.containsKey(Config.TOPOLOGY_ENABLE_ACKING)
+        && config.get(Config.TOPOLOGY_ENABLE_ACKING) != null) {
+      this.ackEnabled = Boolean.parseBoolean(config.get(Config.TOPOLOGY_ENABLE_ACKING).toString());
+    } else {
+      this.ackEnabled = false;
+    }
+
+    this.outputter = new OutgoingTupleCollection(helper.getMyComponent(), streamOutQueue);
+  }
+
+  public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
+    this.helper = physicalPlanHelper;
+  }
+
+  public PhysicalPlanHelper getPhysicalPlanHelper() {
+    return helper;
+  }
+  /////////////////////////////////////////////////////////
+  // Following public methods are used for querying or
+  // interacting with the internal state of the output collectors
+  /////////////////////////////////////////////////////////
+
+  // Return true if we could offer an item to outQueue
+  public boolean isOutQueuesAvailable() {
+    return outputter.isOutQueuesAvailable();
+  }
+
+  // Return the total data emitted in bytes
+  public long getTotalDataEmittedInBytes() {
+    return outputter.getTotalDataEmittedInBytes();
+  }
+
+  // Flush the tuples to next stage
+  public void sendOutTuples() {
+    outputter.sendOutTuples();
+  }
+
+  // Clean the internal state of the output collector
+  public void clear() {
+    outputter.clear();
+  }
+
+  public long getTotalTuplesEmitted() {
+    return totalTuplesEmitted;
+  }
+
+  protected HeronTuples.HeronDataTuple.Builder initTupleBuilder(String streamId,
+                                                                List<Object> tuple) {
+    // Start constructing the data tuple
+    HeronTuples.HeronDataTuple.Builder builder = HeronTuples.HeronDataTuple.newBuilder();
+
+    // set the key. This is mostly ignored
+    builder.setKey(0);
+
+    List<Integer> customGroupingTargetTaskIds = null;
+    if (!helper.isCustomGroupingEmpty()) {
+      // customGroupingTargetTaskIds will be null if this stream is not CustomStreamGrouping
+      customGroupingTargetTaskIds =
+          helper.chooseTasksForCustomStreamGrouping(streamId, tuple);
+
+      if (customGroupingTargetTaskIds != null) {
+        // It is a CustomStreamGrouping
+        builder.addAllDestTaskIds(customGroupingTargetTaskIds);
+      }
+    }
+
+    // Invoke user-defined emit task hook
+    helper.getTopologyContext().invokeHookEmit(tuple, streamId, customGroupingTargetTaskIds);
+
+    return builder;
+  }
+
+  protected void sendTuple(HeronTuples.HeronDataTuple.Builder bldr,
+                           String streamId, List<Object> tuple) {
+    long tupleSizeInBytes = 0;
+    long startTime = System.nanoTime();
+
+    // Serialize it
+    for (Object obj : tuple) {
+      byte[] b = serializer.serialize(obj);
+      ByteString bstr = ByteString.copyFrom(b);
+      bldr.addValues(bstr);
+      tupleSizeInBytes += b.length;
+    }
+
+    long latency = System.nanoTime() - startTime;
+    metrics.serializeDataTuple(streamId, latency);
+    // submit to outputter
+    outputter.addDataTuple(streamId, bldr, tupleSizeInBytes);
+    totalTuplesEmitted++;
+
+    // Update metrics
+    metrics.emittedTuple(streamId);
+  }
+}
diff --git a/heron/instance/src/java/com/twitter/heron/instance/OutgoingTupleCollection.java b/heron/instance/src/java/com/twitter/heron/instance/OutgoingTupleCollection.java
index bfc896c..e91f427 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/OutgoingTupleCollection.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/OutgoingTupleCollection.java
@@ -32,7 +32,6 @@
   protected final String componentName;
   // We have just one outQueue responsible for both control tuples and data tuples
   private final Communicator<HeronTuples.HeronTupleSet> outQueue;
-  private final SystemConfig systemConfig;
 
   private HeronTuples.HeronDataTupleSet.Builder currentDataTuple;
   private HeronTuples.HeronControlTupleSet.Builder currentControlTuple;
@@ -53,7 +52,7 @@
       Communicator<HeronTuples.HeronTupleSet> outQueue) {
     this.outQueue = outQueue;
     this.componentName = componentName;
-    this.systemConfig =
+    SystemConfig systemConfig =
         (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
 
     // Initialize the values in constructor
diff --git a/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltInstance.java b/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltInstance.java
index 229b59c..1840c03 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltInstance.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltInstance.java
@@ -42,7 +42,7 @@
 
 public class BoltInstance implements IInstance {
 
-  protected final PhysicalPlanHelper helper;
+  protected PhysicalPlanHelper helper;
   protected final IBolt bolt;
   protected final BoltOutputCollectorImpl collector;
   protected final IPluggableSerializer serializer;
@@ -99,6 +99,11 @@
       ((IUpdatable) bolt).update(physicalPlanHelper.getTopologyContext());
     }
     collector.updatePhysicalPlanHelper(physicalPlanHelper);
+
+    // Re-prepare the CustomStreamGrouping since the downstream tasks can change
+    physicalPlanHelper.prepareForCustomStreamGrouping();
+    // Reset the helper
+    helper = physicalPlanHelper;
   }
 
   @Override
diff --git a/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java b/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java
index 10a61e9..ffa58fb 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java
@@ -17,14 +17,10 @@
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import com.google.protobuf.ByteString;
-
-import com.twitter.heron.api.Config;
 import com.twitter.heron.api.bolt.IOutputCollector;
 import com.twitter.heron.api.serializer.IPluggableSerializer;
 import com.twitter.heron.api.tuple.Tuple;
@@ -32,7 +28,7 @@
 import com.twitter.heron.common.utils.metrics.BoltMetrics;
 import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
 import com.twitter.heron.common.utils.tuple.TupleImpl;
-import com.twitter.heron.instance.OutgoingTupleCollection;
+import com.twitter.heron.instance.AbstractOutputCollector;
 import com.twitter.heron.proto.system.HeronTuples;
 
 /**
@@ -53,61 +49,32 @@
  * 2. Pack the tuple and submit the OutgoingTupleCollection's addDataTuple
  * 3. Update the metrics
  */
-public class BoltOutputCollectorImpl implements IOutputCollector {
+public class BoltOutputCollectorImpl extends AbstractOutputCollector implements IOutputCollector {
   private static final Logger LOG = Logger.getLogger(BoltOutputCollectorImpl.class.getName());
 
-  private final IPluggableSerializer serializer;
-  private final OutgoingTupleCollection outputter;
-
   // Reference to update the bolt metrics
   private final BoltMetrics boltMetrics;
-  private PhysicalPlanHelper helper;
 
-  private final boolean ackEnabled;
-
-  public BoltOutputCollectorImpl(IPluggableSerializer serializer,
-                                 PhysicalPlanHelper helper,
-                                 Communicator<HeronTuples.HeronTupleSet> streamOutQueue,
-                                 BoltMetrics boltMetrics) {
+  protected BoltOutputCollectorImpl(IPluggableSerializer serializer,
+                                    PhysicalPlanHelper helper,
+                                    Communicator<HeronTuples.HeronTupleSet> streamOutQueue,
+                                    BoltMetrics boltMetrics) {
+    super(serializer, helper, streamOutQueue, boltMetrics);
+    this.boltMetrics = boltMetrics;
 
     if (helper.getMyBolt() == null) {
       throw new RuntimeException(helper.getMyTaskId() + " is not a bolt ");
     }
-
-    this.serializer = serializer;
-    this.boltMetrics = boltMetrics;
-    updatePhysicalPlanHelper(helper);
-
-    Map<String, Object> config = helper.getTopologyContext().getTopologyConfig();
-    if (config.containsKey(Config.TOPOLOGY_ENABLE_ACKING)
-        && config.get(Config.TOPOLOGY_ENABLE_ACKING) != null) {
-      this.ackEnabled = Boolean.parseBoolean(config.get(Config.TOPOLOGY_ENABLE_ACKING).toString());
-    } else {
-      this.ackEnabled = false;
-    }
-
-    this.outputter = new OutgoingTupleCollection(helper.getMyComponent(), streamOutQueue);
   }
 
-  void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
-    this.helper = physicalPlanHelper;
-  }
-
-  /////////////////////////////////////////////////////////
-  // Following public methods are overrides OutputCollector
-  /////////////////////////////////////////////////////////
-
   @Override
   public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
     return admitBoltTuple(streamId, anchors, tuple);
   }
 
   @Override
-  public void emitDirect(
-      int taskId,
-      String streamId,
-      Collection<Tuple> anchors,
-      List<Object> tuple) {
+  public void emitDirect(int taskId, String streamId,
+                         Collection<Tuple> anchors, List<Object> tuple) {
     throw new RuntimeException("emitDirect not supported");
   }
 
@@ -127,67 +94,23 @@
   }
 
   /////////////////////////////////////////////////////////
-  // Following public methods are used for querying or
-  // interacting internal state of the BoltOutputCollectorImpl
-  /////////////////////////////////////////////////////////
-
-  // Return true we could offer item to outQueue
-  public boolean isOutQueuesAvailable() {
-    return outputter.isOutQueuesAvailable();
-  }
-
-  // Return the total data emitted in bytes
-  public long getTotalDataEmittedInBytes() {
-    return outputter.getTotalDataEmittedInBytes();
-  }
-
-  // Flush the tuples to next stage
-  public void sendOutTuples() {
-    outputter.sendOutTuples();
-  }
-
-  // Clean the internal state of BoltOutputCollectorImpl
-  public void clear() {
-    outputter.clear();
-  }
-
-  /////////////////////////////////////////////////////////
   // Following private methods are internal implementations
   /////////////////////////////////////////////////////////
-  private List<Integer> admitBoltTuple(
-      String streamId,
-      Collection<Tuple> anchors,
-      List<Object> tuple) {
-    if (helper.isTerminatedComponent()) {
+  private List<Integer> admitBoltTuple(String streamId,
+                                       Collection<Tuple> anchors,
+                                       List<Object> tuple) {
+    if (getPhysicalPlanHelper().isTerminatedComponent()) {
       // No need to handle this tuples
       return null;
     }
 
     // Start construct the data tuple
-    HeronTuples.HeronDataTuple.Builder bldr = HeronTuples.HeronDataTuple.newBuilder();
-
-    // set the key. This is mostly ignored
-    bldr.setKey(0);
-
-    List<Integer> customGroupingTargetTaskIds = null;
-    if (!helper.isCustomGroupingEmpty()) {
-      // customGroupingTargetTaskIds will be null if this stream is not CustomStreamGrouping
-      customGroupingTargetTaskIds =
-          helper.chooseTasksForCustomStreamGrouping(streamId, tuple);
-
-      if (customGroupingTargetTaskIds != null) {
-        // It is a CustomStreamGrouping
-        bldr.addAllDestTaskIds(customGroupingTargetTaskIds);
-      }
-    }
-
-    // Invoke user-defined emit task hook
-    helper.getTopologyContext().invokeHookEmit(tuple, streamId, customGroupingTargetTaskIds);
+    HeronTuples.HeronDataTuple.Builder bldr = initTupleBuilder(streamId, tuple);
 
     // Set the anchors for a tuple
     if (anchors != null) {
       // This message is rooted
-      Set<HeronTuples.RootId> mergedRoots = new HashSet<HeronTuples.RootId>();
+      Set<HeronTuples.RootId> mergedRoots = new HashSet<>();
       for (Tuple tpl : anchors) {
         if (tpl instanceof TupleImpl) {
           TupleImpl t = (TupleImpl) tpl;
@@ -199,24 +122,7 @@
       }
     }
 
-    long tupleSizeInBytes = 0;
-    long startTime = System.nanoTime();
-
-    // Serialize it
-    for (Object obj : tuple) {
-      byte[] b = serializer.serialize(obj);
-      ByteString bstr = ByteString.copyFrom(b);
-      bldr.addValues(bstr);
-      tupleSizeInBytes += b.length;
-    }
-
-    long latency = System.nanoTime() - startTime;
-    boltMetrics.serializeDataTuple(streamId, latency);
-    // submit to outputter
-    outputter.addDataTuple(streamId, bldr, tupleSizeInBytes);
-
-    // Update metrics
-    boltMetrics.emittedTuple(streamId);
+    sendTuple(bldr, streamId, tuple);
 
     // TODO:- remove this after changing the api
     return null;
@@ -244,7 +150,7 @@
     }
 
     // Invoke user-defined boltAck task hook
-    helper.getTopologyContext().invokeHookBoltAck(tuple, latency);
+    getPhysicalPlanHelper().getTopologyContext().invokeHookBoltAck(tuple, latency);
 
     boltMetrics.ackedTuple(tuple.getSourceStreamId(), tuple.getSourceComponent(), latency);
   }
@@ -271,7 +177,7 @@
     }
 
     // Invoke user-defined boltFail task hook
-    helper.getTopologyContext().invokeHookBoltFail(tuple, latency);
+    getPhysicalPlanHelper().getTopologyContext().invokeHookBoltFail(tuple, latency);
 
     boltMetrics.failedTuple(tuple.getSourceStreamId(), tuple.getSourceComponent(), latency);
   }
diff --git a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
index 50c1166..b3e1660 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
@@ -117,6 +117,11 @@
       ((IUpdatable) spout).update(physicalPlanHelper.getTopologyContext());
     }
     collector.updatePhysicalPlanHelper(physicalPlanHelper);
+
+    // Re-prepare the CustomStreamGrouping since the downstream tasks can change
+    physicalPlanHelper.prepareForCustomStreamGrouping();
+    // Reset the helper
+    helper = physicalPlanHelper;
   }
 
   @Override
diff --git a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutOutputCollectorImpl.java b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutOutputCollectorImpl.java
index fe04498..39a9754 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutOutputCollectorImpl.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutOutputCollectorImpl.java
@@ -19,21 +19,17 @@
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Queue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import com.google.protobuf.ByteString;
-
-import com.twitter.heron.api.Config;
 import com.twitter.heron.api.serializer.IPluggableSerializer;
 import com.twitter.heron.api.spout.ISpoutOutputCollector;
 import com.twitter.heron.common.basics.Communicator;
-import com.twitter.heron.common.utils.metrics.SpoutMetrics;
+import com.twitter.heron.common.utils.metrics.ComponentMetrics;
 import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
 import com.twitter.heron.common.utils.misc.TupleKeyGenerator;
-import com.twitter.heron.instance.OutgoingTupleCollection;
+import com.twitter.heron.instance.AbstractOutputCollector;
 import com.twitter.heron.proto.system.HeronTuples;
 
 /**
@@ -49,7 +45,8 @@
  * 3. Maintain some statistics, for instance, total tuples emitted.
  * <p>
  */
-public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
+public class SpoutOutputCollectorImpl
+    extends AbstractOutputCollector implements ISpoutOutputCollector {
   private static final Logger LOG = Logger.getLogger(SpoutOutputCollectorImpl.class.getName());
 
   // Map from tuple key to composite object with insertion-order, i.e. ordered by time
@@ -57,61 +54,31 @@
 
   private final TupleKeyGenerator keyGenerator;
 
-  private final SpoutMetrics spoutMetrics;
-  private PhysicalPlanHelper helper;
-
-  private final boolean ackingEnabled;
   // When acking is not enabled, if the spout does an emit with a anchor
   // we need to ack it immediately. This keeps the list of those
   private final Queue<RootTupleInfo> immediateAcks;
 
-  private final IPluggableSerializer serializer;
-  private final OutgoingTupleCollection outputter;
-
-  private long totalTuplesEmitted;
-
-  public SpoutOutputCollectorImpl(IPluggableSerializer serializer,
-                                  PhysicalPlanHelper helper,
-                                  Communicator<HeronTuples.HeronTupleSet> streamOutQueue,
-                                  SpoutMetrics spoutMetrics) {
+  protected SpoutOutputCollectorImpl(IPluggableSerializer serializer,
+                                     PhysicalPlanHelper helper,
+                                     Communicator<HeronTuples.HeronTupleSet> streamOutQueue,
+                                     ComponentMetrics spoutMetrics) {
+    super(serializer, helper, streamOutQueue, spoutMetrics);
     if (helper.getMySpout() == null) {
       throw new RuntimeException(helper.getMyTaskId() + " is not a spout ");
     }
 
-    this.serializer = serializer;
-    this.spoutMetrics = spoutMetrics;
     this.keyGenerator = new TupleKeyGenerator();
-    updatePhysicalPlanHelper(helper);
 
     // with default capacity, load factor and insertion order
-    inFlightTuples = new LinkedHashMap<Long, RootTupleInfo>();
+    inFlightTuples = new LinkedHashMap<>();
 
-    Map<String, Object> config = helper.getTopologyContext().getTopologyConfig();
-    if (config.containsKey(Config.TOPOLOGY_ENABLE_ACKING)
-        && config.get(Config.TOPOLOGY_ENABLE_ACKING) != null) {
-      this.ackingEnabled =
-          Boolean.parseBoolean(config.get(Config.TOPOLOGY_ENABLE_ACKING).toString());
-    } else {
-      this.ackingEnabled = false;
-    }
-
-    if (!ackingEnabled) {
-      immediateAcks = new ArrayDeque<RootTupleInfo>();
+    if (!ackEnabled) {
+      immediateAcks = new ArrayDeque<>();
     } else {
       immediateAcks = null;
     }
-
-    this.outputter = new OutgoingTupleCollection(helper.getMyComponent(), streamOutQueue);
   }
 
-  void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
-    this.helper = physicalPlanHelper;
-  }
-
-  /////////////////////////////////////////////////////////
-  // Following public methods are overrides OutputCollector
-  /////////////////////////////////////////////////////////
-
   @Override
   public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
     return admitSpoutTuple(streamId, tuple, messageId);
@@ -133,40 +100,20 @@
   // Following public methods are used for querying or
   // interacting internal state of the BoltOutputCollectorImpl
   /////////////////////////////////////////////////////////
-
-  // Return true we could offer item to outQueue
-  public boolean isOutQueuesAvailable() {
-    return outputter.isOutQueuesAvailable();
-  }
-
-  // Return the total data emitted in bytes
-  public long getTotalDataEmittedInBytes() {
-    return outputter.getTotalDataEmittedInBytes();
-  }
-
-  // Flush the tuples to next stage
-  public void sendOutTuples() {
-    outputter.sendOutTuples();
-  }
-
-  public long getTotalTuplesEmitted() {
-    return totalTuplesEmitted;
-  }
-
   public int numInFlight() {
     return inFlightTuples.size();
   }
 
-  public Queue<RootTupleInfo> getImmediateAcks() {
+  Queue<RootTupleInfo> getImmediateAcks() {
     return immediateAcks;
   }
 
-  public RootTupleInfo retireInFlight(long rootId) {
+  RootTupleInfo retireInFlight(long rootId) {
     return inFlightTuples.remove(rootId);
   }
 
-  public List<RootTupleInfo> retireExpired(long timeout) {
-    List<RootTupleInfo> retval = new ArrayList<RootTupleInfo>();
+  List<RootTupleInfo> retireExpired(long timeout) {
+    List<RootTupleInfo> retval = new ArrayList<>();
     long curTime = System.nanoTime();
 
     // The LinkedHashMap is ordered by insertion order, i.e. ordered by time
@@ -186,81 +133,41 @@
     return retval;
   }
 
-  // Clean the internal state of BoltOutputCollectorImpl
-  public void clear() {
-    outputter.clear();
-  }
-
   /////////////////////////////////////////////////////////
   // Following private methods are internal implementations
   /////////////////////////////////////////////////////////
 
   private List<Integer> admitSpoutTuple(String streamId, List<Object> tuple, Object messageId) {
     // No need to send tuples if it is already terminated
-    if (helper.isTerminatedComponent()) {
+    if (getPhysicalPlanHelper().isTerminatedComponent()) {
       return null;
     }
 
     // Start construct the data tuple
-    HeronTuples.HeronDataTuple.Builder bldr = HeronTuples.HeronDataTuple.newBuilder();
-
-    // set the key. This is mostly ignored
-    bldr.setKey(0);
-
-    // customGroupingTargetTaskIds will be null if this stream is not CustomStreamGrouping
-    List<Integer> customGroupingTargetTaskIds = null;
-    if (!helper.isCustomGroupingEmpty()) {
-      customGroupingTargetTaskIds =
-          helper.chooseTasksForCustomStreamGrouping(streamId, tuple);
-
-      if (customGroupingTargetTaskIds != null) {
-        // It is a CustomStreamGrouping
-        bldr.addAllDestTaskIds(customGroupingTargetTaskIds);
-      }
-    }
-
-    // Invoke user-defined emit task hook
-    helper.getTopologyContext().invokeHookEmit(tuple, streamId, customGroupingTargetTaskIds);
+    HeronTuples.HeronDataTuple.Builder bldr = initTupleBuilder(streamId, tuple);
 
     if (messageId != null) {
       RootTupleInfo tupleInfo = new RootTupleInfo(streamId, messageId);
-      if (ackingEnabled) {
+      if (ackEnabled) {
         // This message is rooted
-        HeronTuples.RootId.Builder rtbldr = EstablishRootId(tupleInfo);
+        HeronTuples.RootId.Builder rtbldr = establishRootId(tupleInfo);
         bldr.addRoots(rtbldr);
       } else {
         immediateAcks.offer(tupleInfo);
       }
     }
 
-    long tupleSizeInBytes = 0;
-    long startTime = System.nanoTime();
-
-    // Serialize it
-    for (Object obj : tuple) {
-      byte[] b = serializer.serialize(obj);
-      ByteString bstr = ByteString.copyFrom(b);
-      bldr.addValues(bstr);
-      tupleSizeInBytes += b.length;
-    }
-
-    long latency = System.nanoTime() - startTime;
-    spoutMetrics.serializeDataTuple(streamId, latency);
-
-    // submit to outputter
-    outputter.addDataTuple(streamId, bldr, tupleSizeInBytes);
-    totalTuplesEmitted++;
-    spoutMetrics.emittedTuple(streamId);
+    sendTuple(bldr, streamId, tuple);
 
     // TODO:- remove this after changing the api
     return null;
   }
 
-  private HeronTuples.RootId.Builder EstablishRootId(RootTupleInfo tupleInfo) {
+  private HeronTuples.RootId.Builder establishRootId(RootTupleInfo tupleInfo) {
     // This message is rooted
     long rootId = keyGenerator.next();
     HeronTuples.RootId.Builder rtbldr = HeronTuples.RootId.newBuilder();
-    rtbldr.setTaskid(helper.getMyTaskId());
+    rtbldr.setTaskid(getPhysicalPlanHelper().getMyTaskId());
     rtbldr.setKey(rootId);
     inFlightTuples.put(rootId, tupleInfo);
     return rtbldr;
diff --git a/heron/instance/src/java/com/twitter/heron/instance/util/JvmVersion.java b/heron/instance/src/java/com/twitter/heron/instance/util/JvmVersion.java
new file mode 100644
index 0000000..249899d
--- /dev/null
+++ b/heron/instance/src/java/com/twitter/heron/instance/util/JvmVersion.java
@@ -0,0 +1,26 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.instance.util;
+
+/**
+ * When run, prints the value of the {@code java.version} system property to stdout, without a
+ * trailing newline. Used by the executor to get just the JVM version for consistent parsing.
+ */
+public final class JvmVersion {
+  private JvmVersion() { } // private constructor: the class only exposes main()
+
+  public static void main(String[] args) {
+    System.out.print(System.getProperty("java.version"));
+  }
+}
diff --git a/heron/instance/src/java/com/twitter/heron/network/StreamManagerClient.java b/heron/instance/src/java/com/twitter/heron/network/StreamManagerClient.java
index 3072be1..10f2e02 100644
--- a/heron/instance/src/java/com/twitter/heron/network/StreamManagerClient.java
+++ b/heron/instance/src/java/com/twitter/heron/network/StreamManagerClient.java
@@ -67,6 +67,8 @@
 
   private PhysicalPlanHelper helper;
 
+  private long lastNotConnectedLogTime = 0;
+
   public StreamManagerClient(NIOLooper s, String streamManagerHost, int streamManagerPort,
                              String topologyName, String topologyId,
                              PhysicalPlans.Instance instance,
@@ -240,6 +242,8 @@
   }
 
   private void readStreamMessageIfNeeded() {
+    final long lastNotConnectedLogThrottleSeconds = 5;
+
     // If client is not connected, just return
     if (isConnected()) {
       if (isInQueuesAvailable() || helper == null) {
@@ -249,7 +253,13 @@
         stopReading();
       }
     } else {
-      LOG.info("Stop reading due to not yet connected to Stream Manager.");
+      long now = System.currentTimeMillis();
+      if (now - lastNotConnectedLogTime > lastNotConnectedLogThrottleSeconds * 1000) {
+        LOG.info(String.format("Stop reading due to not yet connected to Stream Manager. This "
+            + "message is throttled to emit no more than once every %d seconds.",
+            lastNotConnectedLogThrottleSeconds));
+        lastNotConnectedLogTime = now;
+      }
     }
   }
 
diff --git a/heron/instance/src/python/network/metricsmgr_client.py b/heron/instance/src/python/network/metricsmgr_client.py
index ddf2861..9f092b6 100644
--- a/heron/instance/src/python/network/metricsmgr_client.py
+++ b/heron/instance/src/python/network/metricsmgr_client.py
@@ -72,6 +72,7 @@
       Log.error("Error connecting to Metrics Manager with status: %s" % str(status))
       retry_interval = float(self.sys_config[constants.INSTANCE_RECONNECT_METRICSMGR_INTERVAL_SEC])
       self.looper.register_timer_task_in_sec(self.start_connect, retry_interval)
+      return
     self._send_register_req()
 
   def on_response(self, status, context, response):
diff --git a/heron/instance/tests/java/BUILD b/heron/instance/tests/java/BUILD
index f61fcaf..c7432ae 100644
--- a/heron/instance/tests/java/BUILD
+++ b/heron/instance/tests/java/BUILD
@@ -7,6 +7,7 @@
         "//heron/common/src/java:common-java",
         "//heron/instance/src/java:instance-java",
         "//third_party/java:junit4",
+        "//third_party/java:kryo",
     ]
 
 java_library(
diff --git a/heron/instance/tests/java/com/twitter/heron/grouping/AbstractTupleRoutingTest.java b/heron/instance/tests/java/com/twitter/heron/grouping/AbstractTupleRoutingTest.java
new file mode 100644
index 0000000..ddf7bf9
--- /dev/null
+++ b/heron/instance/tests/java/com/twitter/heron/grouping/AbstractTupleRoutingTest.java
@@ -0,0 +1,277 @@
+// Copyright 2016 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.grouping;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.heron.api.Config;
+import com.twitter.heron.api.generated.TopologyAPI;
+import com.twitter.heron.api.topology.TopologyBuilder;
+import com.twitter.heron.common.basics.Communicator;
+import com.twitter.heron.common.basics.SingletonRegistry;
+import com.twitter.heron.common.basics.SlaveLooper;
+import com.twitter.heron.common.basics.SysUtils;
+import com.twitter.heron.common.basics.WakeableLooper;
+import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
+import com.twitter.heron.instance.InstanceControlMsg;
+import com.twitter.heron.instance.Slave;
+import com.twitter.heron.proto.system.HeronTuples;
+import com.twitter.heron.proto.system.Metrics;
+import com.twitter.heron.proto.system.PhysicalPlans;
+import com.twitter.heron.resource.Constants;
+import com.twitter.heron.resource.TestBolt;
+import com.twitter.heron.resource.TestSpout;
+import com.twitter.heron.resource.UnitTestHelper;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Abstract test to verify that tuples can be routed according to custom logic. Specific tests
+ * should extend this and override the initSpout(..), initBoltA(..) and/or initBoltB(..) methods as
+ * necessary to achieve the desired routing logic.
+ */
+public abstract class AbstractTupleRoutingTest {
+  private WakeableLooper testLooper;
+  private SlaveLooper slaveLooper;
+  private PhysicalPlans.PhysicalPlan physicalPlan;
+  private Communicator<HeronTuples.HeronTupleSet> outStreamQueue;
+  private Communicator<HeronTuples.HeronTupleSet> inStreamQueue;
+  private Communicator<InstanceControlMsg> inControlQueue;
+  private ExecutorService threadsPool;
+  private volatile int tupleReceived;
+  private volatile StringBuilder groupingInitInfo;
+  private Slave slave;
+
+  // Test component info. Topology is SPOUT -> BOLT_A -> BOLT_B
+  protected enum Component {
+    SPOUT("test-spout", "spout-id"),
+    BOLT_A("test-bolt-a", "bolt-a-id"),
+    BOLT_B("test-bolt-b", "bolt-b-id");
+
+    private String name;
+    private String id;
+
+    Component(String name, String instanceId) {
+      this.name = name;
+      this.id = instanceId;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public String getInstanceId() {
+      return id;
+    }
+  }
+
+  @Before
+  public void before() throws Exception {
+    UnitTestHelper.addSystemConfigToSingleton();
+
+    tupleReceived = 0;
+    groupingInitInfo = new StringBuilder();
+
+    testLooper = new SlaveLooper();
+    slaveLooper = new SlaveLooper();
+    outStreamQueue = new Communicator<>(slaveLooper, testLooper);
+    outStreamQueue.init(Constants.QUEUE_BUFFER_SIZE, Constants.QUEUE_BUFFER_SIZE, 0.5);
+    inStreamQueue = new Communicator<>(testLooper, slaveLooper);
+    inStreamQueue.init(Constants.QUEUE_BUFFER_SIZE, Constants.QUEUE_BUFFER_SIZE, 0.5);
+    inControlQueue = new Communicator<>(testLooper, slaveLooper);
+
+    Communicator<Metrics.MetricPublisherPublishMessage> slaveMetricsOut =
+        new Communicator<>(slaveLooper, testLooper);
+    slaveMetricsOut.init(Constants.QUEUE_BUFFER_SIZE, Constants.QUEUE_BUFFER_SIZE, 0.5);
+
+    slave = new Slave(slaveLooper, inStreamQueue, outStreamQueue, inControlQueue, slaveMetricsOut);
+    threadsPool = Executors.newSingleThreadExecutor();
+
+    threadsPool.execute(slave);
+  }
+
+  @After
+  public void after() throws Exception {
+    UnitTestHelper.clearSingletonRegistry();
+
+    tupleReceived = 0;
+    groupingInitInfo = null;
+
+    if (testLooper != null) {
+      testLooper.exitLoop();
+    }
+    if (slaveLooper != null) {
+      slaveLooper.exitLoop();
+    }
+    if (threadsPool != null) {
+      threadsPool.shutdownNow();
+    }
+    physicalPlan = null;
+    testLooper = null;
+    slaveLooper = null;
+    outStreamQueue = null;
+    inStreamQueue = null;
+
+    slave = null;
+    threadsPool = null;
+  }
+
+  protected String getInitInfoKey(String componentName) {
+    return "routing-init-info+" + componentName;
+  }
+
+  /**
+   * Test round-robin routing: the i-th emitted tuple (0-based) must target exactly one task, id i
+   */
+  @Test
+  public void testRoundRobinRouting() throws Exception {
+    physicalPlan = constructPhysicalPlan();
+
+    PhysicalPlanHelper physicalPlanHelper =
+        new PhysicalPlanHelper(physicalPlan, getComponentToVerify().getInstanceId());
+    InstanceControlMsg instanceControlMsg = InstanceControlMsg.newBuilder()
+        .setNewPhysicalPlanHelper(physicalPlanHelper)
+        .build();
+
+    inControlQueue.offer(instanceControlMsg);
+
+    SingletonRegistry.INSTANCE.registerSingleton(
+        getInitInfoKey(getComponentToVerify().getName()), groupingInitInfo);
+
+    final int expectedTuplesValidated = 10;
+    Runnable task = new Runnable() {
+      @Override
+      public void run() {
+        for (int i = 0; i < Constants.RETRY_TIMES; i++) {
+          if (outStreamQueue.size() != 0) {
+            HeronTuples.HeronTupleSet set = outStreamQueue.poll();
+
+            assertTrue(set.isInitialized());
+            assertFalse(set.hasControl());
+            assertTrue(set.hasData());
+
+            HeronTuples.HeronDataTupleSet dataTupleSet = set.getData();
+            assertEquals("default", dataTupleSet.getStream().getId());
+            assertEquals(getComponentToVerify().getName(),
+                dataTupleSet.getStream().getComponentName());
+
+            for (HeronTuples.HeronDataTuple dataTuple : dataTupleSet.getTuplesList()) {
+              List<Integer> destTaskIds = dataTuple.getDestTaskIdsList();
+              assertEquals(1, destTaskIds.size());
+              assertEquals((Integer) tupleReceived, destTaskIds.get(0));
+              tupleReceived++;
+            }
+          }
+          if (tupleReceived == expectedTuplesValidated) {
+            assertEquals(getExpectedComponentInitInfo(), groupingInitInfo.toString());
+            testLooper.exitLoop();
+            break;
+          }
+          SysUtils.sleep(Constants.RETRY_INTERVAL_MS);
+        }
+      }
+    };
+
+    testLooper.addTasksOnWakeup(task);
+    testLooper.loop();
+    assertEquals(expectedTuplesValidated, tupleReceived);
+  }
+
+  private PhysicalPlans.PhysicalPlan constructPhysicalPlan() {
+    PhysicalPlans.PhysicalPlan.Builder physicalPlanBuilder
+        = PhysicalPlans.PhysicalPlan.newBuilder();
+
+    // Set topology protobuf
+    TopologyBuilder topologyBuilder = new TopologyBuilder();
+    initSpout(topologyBuilder, Component.SPOUT.getName());
+    initBoltA(topologyBuilder, Component.BOLT_A.getName(), Component.SPOUT.getName());
+    initBoltB(topologyBuilder, Component.BOLT_B.getName(), Component.BOLT_A.getName());
+
+    Config conf = new Config();
+    conf.setTeamEmail("some-team@company.com");
+    conf.setTeamName("some-team");
+    conf.setTopologyProjectName("heron-integration-test");
+    conf.setNumStmgrs(1);
+    conf.setMaxSpoutPending(100);
+    conf.setEnableAcking(false);
+
+    TopologyAPI.Topology topology = topologyBuilder.createTopology()
+        .setName("topology-name")
+        .setConfig(conf)
+        .setState(TopologyAPI.TopologyState.RUNNING)
+        .getTopology();
+
+    physicalPlanBuilder.setTopology(topology);
+
+    // Set instances
+    int taskId = 0;
+    for (Component component : Component.values()) {
+      addComponent(physicalPlanBuilder, component, taskId++);
+    }
+
+    // Set stream mgr
+    PhysicalPlans.StMgr.Builder stmgr = PhysicalPlans.StMgr.newBuilder();
+    stmgr.setId("stream-manager-id");
+    stmgr.setHostName("127.0.0.1");
+    stmgr.setDataPort(8888);
+    stmgr.setLocalEndpoint("endpoint");
+    physicalPlanBuilder.addStmgrs(stmgr);
+
+    return physicalPlanBuilder.build();
+  }
+
+  private void addComponent(PhysicalPlans.PhysicalPlan.Builder builder,
+                            Component component, int taskId) {
+    PhysicalPlans.InstanceInfo.Builder instanceInfo = PhysicalPlans.InstanceInfo.newBuilder();
+    instanceInfo.setComponentName(component.getName());
+    instanceInfo.setTaskId(taskId);
+    instanceInfo.setComponentIndex(0);
+
+    PhysicalPlans.Instance.Builder instance = PhysicalPlans.Instance.newBuilder();
+    instance.setInstanceId(component.getInstanceId());
+    instance.setStmgrId("stream-manager-id");
+    instance.setInfo(instanceInfo);
+
+    builder.addInstances(instance);
+  }
+
+  protected void initSpout(TopologyBuilder topologyBuilder, String spoutId) {
+    topologyBuilder.setSpout(spoutId, new TestSpout(), 1);
+  }
+
+  protected void initBoltA(TopologyBuilder topologyBuilder,
+                         String boltId, String upstreamComponentId) {
+    topologyBuilder.setBolt(boltId, new TestBolt(), 1)
+        .shuffleGrouping(upstreamComponentId);
+  }
+
+  protected void initBoltB(TopologyBuilder topologyBuilder,
+                         String boltId, String upstreamComponentId) {
+    topologyBuilder.setBolt(boltId, new TestBolt(), 1)
+        .shuffleGrouping(upstreamComponentId);
+  }
+
+  protected abstract Component getComponentToVerify();
+
+  protected abstract String getExpectedComponentInitInfo();
+}
diff --git a/heron/instance/tests/java/com/twitter/heron/grouping/CustomGroupingTest.java b/heron/instance/tests/java/com/twitter/heron/grouping/CustomGroupingTest.java
index 82099e4..515fee4 100644
--- a/heron/instance/tests/java/com/twitter/heron/grouping/CustomGroupingTest.java
+++ b/heron/instance/tests/java/com/twitter/heron/grouping/CustomGroupingTest.java
@@ -1,257 +1,74 @@
-// Copyright 2016 Twitter. All rights reserved.
+//  Copyright 2017 Twitter. All rights reserved.
 //
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
 //
-//    http://www.apache.org/licenses/LICENSE-2.0
+//  http://www.apache.org/licenses/LICENSE-2.0
 //
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
 package com.twitter.heron.grouping;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.twitter.heron.api.Config;
-import com.twitter.heron.api.generated.TopologyAPI;
 import com.twitter.heron.api.grouping.CustomStreamGrouping;
 import com.twitter.heron.api.topology.TopologyBuilder;
 import com.twitter.heron.api.topology.TopologyContext;
-import com.twitter.heron.common.basics.Communicator;
 import com.twitter.heron.common.basics.SingletonRegistry;
-import com.twitter.heron.common.basics.SlaveLooper;
-import com.twitter.heron.common.basics.SysUtils;
-import com.twitter.heron.common.basics.WakeableLooper;
-import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
-import com.twitter.heron.instance.InstanceControlMsg;
-import com.twitter.heron.instance.Slave;
-import com.twitter.heron.proto.system.HeronTuples;
-import com.twitter.heron.proto.system.Metrics;
-import com.twitter.heron.proto.system.PhysicalPlans;
-import com.twitter.heron.resource.Constants;
 import com.twitter.heron.resource.TestBolt;
-import com.twitter.heron.resource.TestSpout;
-import com.twitter.heron.resource.UnitTestHelper;
 
-public class CustomGroupingTest {
-  private static final String SPOUT_INSTANCE_ID = "spout-id";
-  private static final String CUSTOM_GROUPING_INFO = "custom-grouping-info-in-prepare";
+/**
+ * Tests custom grouping by using round robin grouping from SPOUT to BOLT_A
+ */
+public class CustomGroupingTest extends AbstractTupleRoutingTest {
 
-  private WakeableLooper testLooper;
-  private SlaveLooper slaveLooper;
-  private PhysicalPlans.PhysicalPlan physicalPlan;
-  private Communicator<HeronTuples.HeronTupleSet> outStreamQueue;
-  private Communicator<HeronTuples.HeronTupleSet> inStreamQueue;
-  private Communicator<InstanceControlMsg> inControlQueue;
-  private ExecutorService threadsPool;
-  private Communicator<Metrics.MetricPublisherPublishMessage> slaveMetricsOut;
-  private volatile int tupleReceived;
-  private volatile StringBuilder customGroupingInfoInPrepare;
-  private Slave slave;
+  @Override
+  protected void initBoltA(TopologyBuilder topologyBuilder,
+                           String boltId, String upstreamComponentId) {
+    final CustomStreamGrouping myCustomGrouping =
+        new MyRoundRobinCustomGrouping(getInitInfoKey(upstreamComponentId));
 
-  @Before
-  public void before() throws Exception {
-    UnitTestHelper.addSystemConfigToSingleton();
-
-    tupleReceived = 0;
-    customGroupingInfoInPrepare = new StringBuilder();
-
-    testLooper = new SlaveLooper();
-    slaveLooper = new SlaveLooper();
-    outStreamQueue = new Communicator<HeronTuples.HeronTupleSet>(slaveLooper, testLooper);
-    outStreamQueue.init(Constants.QUEUE_BUFFER_SIZE, Constants.QUEUE_BUFFER_SIZE, 0.5);
-    inStreamQueue = new Communicator<HeronTuples.HeronTupleSet>(testLooper, slaveLooper);
-    inStreamQueue.init(Constants.QUEUE_BUFFER_SIZE, Constants.QUEUE_BUFFER_SIZE, 0.5);
-    inControlQueue = new Communicator<InstanceControlMsg>(testLooper, slaveLooper);
-
-    slaveMetricsOut =
-        new Communicator<Metrics.MetricPublisherPublishMessage>(slaveLooper, testLooper);
-    slaveMetricsOut.init(Constants.QUEUE_BUFFER_SIZE, Constants.QUEUE_BUFFER_SIZE, 0.5);
-
-    slave = new Slave(slaveLooper, inStreamQueue, outStreamQueue, inControlQueue, slaveMetricsOut);
-    threadsPool = Executors.newSingleThreadExecutor();
-
-    threadsPool.execute(slave);
+    topologyBuilder.setBolt(boltId, new TestBolt(), 1)
+        .customGrouping(upstreamComponentId, myCustomGrouping);
   }
 
-  @After
-  public void after() throws Exception {
-    UnitTestHelper.clearSingletonRegistry();
-
-    tupleReceived = 0;
-    customGroupingInfoInPrepare = null;
-
-    if (testLooper != null) {
-      testLooper.exitLoop();
-    }
-    if (slaveLooper != null) {
-      slaveLooper.exitLoop();
-    }
-    if (threadsPool != null) {
-      threadsPool.shutdownNow();
-    }
-    physicalPlan = null;
-    testLooper = null;
-    slaveLooper = null;
-    outStreamQueue = null;
-    inStreamQueue = null;
-
-    slave = null;
-    threadsPool = null;
+  @Override
+  protected Component getComponentToVerify() {
+    return Component.SPOUT; // init info is expected to name the spout as this component
   }
 
-  /**
-   * Test custom grouping
-   */
-  @Test
-  public void testCustomGrouping() throws Exception {
-    final MyCustomGrouping myCustomGrouping = new MyCustomGrouping();
-    final String expectedCustomGroupingStringInPrepare = "test-spout+test-spout+default+[1]";
-
-    physicalPlan = constructPhysicalPlan(myCustomGrouping);
-
-    PhysicalPlanHelper physicalPlanHelper = new PhysicalPlanHelper(physicalPlan, SPOUT_INSTANCE_ID);
-    InstanceControlMsg instanceControlMsg = InstanceControlMsg.newBuilder().
-        setNewPhysicalPlanHelper(physicalPlanHelper).
-        build();
-
-    inControlQueue.offer(instanceControlMsg);
-
-    SingletonRegistry.INSTANCE.registerSingleton(CUSTOM_GROUPING_INFO, customGroupingInfoInPrepare);
-
-    Runnable task = new Runnable() {
-      @Override
-      public void run() {
-        for (int i = 0; i < Constants.RETRY_TIMES; i++) {
-          if (outStreamQueue.size() != 0) {
-            HeronTuples.HeronTupleSet set = outStreamQueue.poll();
-
-            Assert.assertTrue(set.isInitialized());
-            Assert.assertFalse(set.hasControl());
-            Assert.assertTrue(set.hasData());
-
-            HeronTuples.HeronDataTupleSet dataTupleSet = set.getData();
-            Assert.assertEquals(dataTupleSet.getStream().getId(), "default");
-            Assert.assertEquals(dataTupleSet.getStream().getComponentName(), "test-spout");
-
-            for (HeronTuples.HeronDataTuple dataTuple : dataTupleSet.getTuplesList()) {
-              List<Integer> destTaskIds = dataTuple.getDestTaskIdsList();
-              Assert.assertEquals(destTaskIds.size(), 1);
-              Assert.assertEquals(destTaskIds.get(0), (Integer) tupleReceived);
-              tupleReceived++;
-            }
-          }
-          if (tupleReceived == 10) {
-            Assert.assertEquals(expectedCustomGroupingStringInPrepare,
-                customGroupingInfoInPrepare.toString());
-            testLooper.exitLoop();
-            break;
-          }
-          SysUtils.sleep(Constants.RETRY_INTERVAL_MS);
-        }
-      }
-    };
-
-    testLooper.addTasksOnWakeup(task);
-    testLooper.loop();
+  @Override
+  protected String getExpectedComponentInitInfo() {
+    return "test-spout+test-spout+default+[1]"; // thisComponentId+component+streamId+targetTasks
   }
 
-  private PhysicalPlans.PhysicalPlan constructPhysicalPlan(MyCustomGrouping myCustomGrouping) {
-    PhysicalPlans.PhysicalPlan.Builder pPlan = PhysicalPlans.PhysicalPlan.newBuilder();
-
-    // Set topology protobuf
-    TopologyBuilder topologyBuilder = new TopologyBuilder();
-    topologyBuilder.setSpout("test-spout", new TestSpout(), 1);
-    // Here we need case switch to corresponding grouping
-    topologyBuilder.setBolt("test-bolt", new TestBolt(), 1).
-        customGrouping("test-spout", myCustomGrouping);
-
-    Config conf = new Config();
-    conf.setTeamEmail("streaming-compute@twitter.com");
-    conf.setTeamName("stream-computing");
-    conf.setTopologyProjectName("heron-integration-test");
-    conf.setNumStmgrs(1);
-    conf.setMaxSpoutPending(100);
-    conf.setEnableAcking(false);
-
-    TopologyAPI.Topology fTopology =
-        topologyBuilder.createTopology().
-            setName("topology-name").
-            setConfig(conf).
-            setState(TopologyAPI.TopologyState.RUNNING).
-            getTopology();
-
-    pPlan.setTopology(fTopology);
-
-    // Set instances
-    // Construct the spoutInstance
-    PhysicalPlans.InstanceInfo.Builder spoutInstanceInfo = PhysicalPlans.InstanceInfo.newBuilder();
-    spoutInstanceInfo.setComponentName("test-spout");
-    spoutInstanceInfo.setTaskId(0);
-    spoutInstanceInfo.setComponentIndex(0);
-
-    PhysicalPlans.Instance.Builder spoutInstance = PhysicalPlans.Instance.newBuilder();
-    spoutInstance.setInstanceId("spout-id");
-    spoutInstance.setStmgrId("stream-manager-id");
-    spoutInstance.setInfo(spoutInstanceInfo);
-
-    // Construct the boltInstanceInfo
-    PhysicalPlans.InstanceInfo.Builder boltInstanceInfo = PhysicalPlans.InstanceInfo.newBuilder();
-    boltInstanceInfo.setComponentName("test-bolt");
-    boltInstanceInfo.setTaskId(1);
-    boltInstanceInfo.setComponentIndex(0);
-
-    PhysicalPlans.Instance.Builder boltInstance = PhysicalPlans.Instance.newBuilder();
-    boltInstance.setInstanceId("bolt-id");
-    boltInstance.setStmgrId("stream-manager-id");
-    boltInstance.setInfo(boltInstanceInfo);
-
-    pPlan.addInstances(spoutInstance);
-    pPlan.addInstances(boltInstance);
-
-    // Set stream mgr
-    PhysicalPlans.StMgr.Builder stmgr = PhysicalPlans.StMgr.newBuilder();
-    stmgr.setId("stream-manager-id");
-    stmgr.setHostName("127.0.0.1");
-    stmgr.setDataPort(8888);
-    stmgr.setLocalEndpoint("endpoint");
-    pPlan.addStmgrs(stmgr);
-
-    return pPlan.build();
-  }
-
-  private static class MyCustomGrouping implements CustomStreamGrouping {
+  private static final class MyRoundRobinCustomGrouping implements CustomStreamGrouping {
     private static final long serialVersionUID = -4141962710451507976L;
     private volatile int emitted = 0;
+    private final String initInfoKey;
+
+    private MyRoundRobinCustomGrouping(String initInfoKey) {
+      // Registry key of the StringBuilder that prepare() appends its init info to.
+      this.initInfoKey = initInfoKey;
+    }
 
     @Override
-    public void prepare(
-        TopologyContext context,
-        String component,
-        String streamId,
-        List<Integer> targetTasks) {
+    public void prepare(TopologyContext context, String component,
+                        String streamId, List<Integer> targetTasks) {
 
-      StringBuilder customGroupingInfoInPrepare =
-          (StringBuilder) SingletonRegistry.INSTANCE.getSingleton(CUSTOM_GROUPING_INFO);
-      customGroupingInfoInPrepare.append(context.getThisComponentId() + "+" + component
-          + "+" + streamId + "+" + targetTasks.toString());
+      ((StringBuilder) SingletonRegistry.INSTANCE.getSingleton(initInfoKey))
+          .append(String.format("%s+%s+%s+%s",
+              context.getThisComponentId(), component, streamId, targetTasks.toString()));
     }
 
     @Override
     public List<Integer> chooseTasks(List<Object> values) {
-      List<Integer> res = new ArrayList<Integer>();
+      List<Integer> res = new ArrayList<>();
       res.add(emitted);
       emitted++;
       return res;
diff --git a/heron/instance/tests/java/com/twitter/heron/instance/bolt/BoltInstanceTest.java b/heron/instance/tests/java/com/twitter/heron/instance/bolt/BoltInstanceTest.java
index 172dd35..3ce2919 100644
--- a/heron/instance/tests/java/com/twitter/heron/instance/bolt/BoltInstanceTest.java
+++ b/heron/instance/tests/java/com/twitter/heron/instance/bolt/BoltInstanceTest.java
@@ -30,7 +30,7 @@
 
 import com.twitter.heron.api.generated.TopologyAPI;
 import com.twitter.heron.api.serializer.IPluggableSerializer;
-import com.twitter.heron.api.serializer.KryoSerializer;
+import com.twitter.heron.api.serializer.JavaSerializer;
 import com.twitter.heron.common.basics.Communicator;
 import com.twitter.heron.common.basics.SingletonRegistry;
 import com.twitter.heron.common.basics.SlaveLooper;
@@ -81,7 +81,7 @@
 
   @BeforeClass
   public static void beforeClass() throws Exception {
-    serializer = new KryoSerializer();
+    serializer = new JavaSerializer();
     serializer.initialize(null);
   }
 
diff --git a/heron/instance/tests/java/com/twitter/heron/instance/spout/SpoutInstanceTest.java b/heron/instance/tests/java/com/twitter/heron/instance/spout/SpoutInstanceTest.java
index 5d46514..1c0794f 100644
--- a/heron/instance/tests/java/com/twitter/heron/instance/spout/SpoutInstanceTest.java
+++ b/heron/instance/tests/java/com/twitter/heron/instance/spout/SpoutInstanceTest.java
@@ -32,7 +32,7 @@
 import org.junit.Test;
 
 import com.twitter.heron.api.serializer.IPluggableSerializer;
-import com.twitter.heron.api.serializer.KryoSerializer;
+import com.twitter.heron.api.serializer.JavaSerializer;
 import com.twitter.heron.common.basics.Communicator;
 import com.twitter.heron.common.basics.SingletonRegistry;
 import com.twitter.heron.common.basics.SlaveLooper;
@@ -95,7 +95,7 @@
 
   @BeforeClass
   public static void beforeClass() throws Exception {
-    serializer = new KryoSerializer();
+    serializer = new JavaSerializer();
     serializer.initialize(null);
   }
 
diff --git a/heron/metricscachemgr/src/java/BUILD b/heron/metricscachemgr/src/java/BUILD
index 31664db..dd031be 100644
--- a/heron/metricscachemgr/src/java/BUILD
+++ b/heron/metricscachemgr/src/java/BUILD
@@ -13,7 +13,7 @@
            "//heron/api/src/java:api-java",
            "//heron/common/src/java:common-java",
            "//heron/scheduler-core/src/java:scheduler-java",
-           "//heron/statemgrs/src/java:localfs-statemgr-java",
+           "//heron/statemgrs/src/java:statemgrs-java",
            "//heron/metricsmgr/src/thrift:thrift_scribe_java",
            "//heron/metricsmgr/src/java:metricsmgr-java",
            "//third_party/java:cli",
diff --git a/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java b/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java
index 7e3bfd5..f608439 100644
--- a/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java
+++ b/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java
@@ -158,6 +158,12 @@
         any(String.class), eq(TOPOLOGY_NAME),
         eq(mockStateMgr), any(NetworkUtils.TunnelConfig.class));
 
+    // reactivation won't happen since topology state is still running due to mock state manager
+    PowerMockito.doNothing().when(TMasterUtils.class, "transitionTopologyState",
+        eq(TOPOLOGY_NAME), eq(TMasterUtils.TMasterCommand.ACTIVATE), eq(mockStateMgr),
+        eq(TopologyAPI.TopologyState.PAUSED), eq(TopologyAPI.TopologyState.RUNNING),
+        any(NetworkUtils.TunnelConfig.class));
+
     spyUpdateManager.updateTopology(currentProtoPlan, proposedProtoPlan);
 
     verify(spyUpdateManager).deactivateTopology(eq(mockStateMgr), eq(testTopology));
diff --git a/heron/simulator/src/java/com/twitter/heron/simulator/instance/BoltOutputCollectorImpl.java b/heron/simulator/src/java/com/twitter/heron/simulator/instance/BoltOutputCollectorImpl.java
index 9f5b49c..fc4fd57 100644
--- a/heron/simulator/src/java/com/twitter/heron/simulator/instance/BoltOutputCollectorImpl.java
+++ b/heron/simulator/src/java/com/twitter/heron/simulator/instance/BoltOutputCollectorImpl.java
@@ -14,24 +14,11 @@
 
 package com.twitter.heron.simulator.instance;
 
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.protobuf.ByteString;
-
-import com.twitter.heron.api.Config;
 import com.twitter.heron.api.bolt.IOutputCollector;
 import com.twitter.heron.api.serializer.IPluggableSerializer;
-import com.twitter.heron.api.tuple.Tuple;
 import com.twitter.heron.common.basics.Communicator;
 import com.twitter.heron.common.utils.metrics.BoltMetrics;
 import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
-import com.twitter.heron.common.utils.tuple.TupleImpl;
 import com.twitter.heron.proto.system.HeronTuples;
 
 /**
@@ -52,220 +39,14 @@
  * 2. Pack the tuple and submit the OutgoingTupleCollection's addDataTuple
  * 3. Update the metrics
  */
-public class BoltOutputCollectorImpl implements IOutputCollector {
-  private static final Logger LOG = Logger.getLogger(BoltOutputCollectorImpl.class.getName());
-
-  private final IPluggableSerializer serializer;
-  private final OutgoingTupleCollection outputter;
-
-  // Reference to update the bolt metrics
-  private final BoltMetrics boltMetrics;
-  private PhysicalPlanHelper helper;
-
-  private final boolean ackEnabled;
+public class BoltOutputCollectorImpl
+    extends com.twitter.heron.instance.bolt.BoltOutputCollectorImpl
+    implements IOutputCollector {
 
   public BoltOutputCollectorImpl(IPluggableSerializer serializer,
                                  PhysicalPlanHelper helper,
                                  Communicator<HeronTuples.HeronTupleSet> streamOutQueue,
                                  BoltMetrics boltMetrics) {
-
-    if (helper.getMyBolt() == null) {
-      throw new RuntimeException(helper.getMyTaskId() + " is not a bolt ");
-    }
-
-    this.serializer = serializer;
-    this.boltMetrics = boltMetrics;
-    updatePhysicalPlanHelper(helper);
-
-    Map<String, Object> config = helper.getTopologyContext().getTopologyConfig();
-    if (config.containsKey(Config.TOPOLOGY_ENABLE_ACKING)
-        && config.get(Config.TOPOLOGY_ENABLE_ACKING) != null) {
-      this.ackEnabled = Boolean.parseBoolean(config.get(Config.TOPOLOGY_ENABLE_ACKING).toString());
-    } else {
-      this.ackEnabled = false;
-    }
-
-    this.outputter = new OutgoingTupleCollection(helper.getMyComponent(), streamOutQueue);
-  }
-
-  public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
-    this.helper = physicalPlanHelper;
-  }
-
-  /////////////////////////////////////////////////////////
-  // Following public methods are overrides OutputCollector
-  /////////////////////////////////////////////////////////
-
-  @Override
-  public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-    return admitBoltTuple(streamId, anchors, tuple);
-  }
-
-  @Override
-  public void emitDirect(
-      int taskId,
-      String streamId,
-      Collection<Tuple> anchors,
-      List<Object> tuple) {
-    throw new RuntimeException("emitDirect not supported");
-  }
-
-  @Override
-  public void reportError(Throwable error) {
-    LOG.log(Level.SEVERE, "Reporting an error in topology code ", error);
-  }
-
-  @Override
-  public void ack(Tuple input) {
-    admitAckTuple(input);
-  }
-
-  @Override
-  public void fail(Tuple input) {
-    admitFailTuple(input);
-  }
-
-  /////////////////////////////////////////////////////////
-  // Following public methods are used for querying or
-  // interacting internal state of the BoltOutputCollectorImpl
-  /////////////////////////////////////////////////////////
-
-  // Return true we could offer item to outQueue
-  public boolean isOutQueuesAvailable() {
-    return outputter.isOutQueuesAvailable();
-  }
-
-  // Return the total data emitted in bytes
-  public long getTotalDataEmittedInBytes() {
-    return outputter.getTotalDataEmittedInBytes();
-  }
-
-  // Flush the tuples to next stage
-  public void sendOutTuples() {
-    outputter.sendOutTuples();
-  }
-
-  // Clean the internal state of BoltOutputCollectorImpl
-  public void clear() {
-    outputter.clear();
-  }
-
-  /////////////////////////////////////////////////////////
-  // Following private methods are internal implementations
-  /////////////////////////////////////////////////////////
-
-  private List<Integer> admitBoltTuple(
-      String streamId,
-      Collection<Tuple> anchors,
-      List<Object> tuple) {
-    // First check whether this tuple is sane
-    helper.checkOutputSchema(streamId, tuple);
-
-    // customGroupingTargetTaskIds will be null if this stream is not CustomStreamGrouping
-    List<Integer> customGroupingTargetTaskIds =
-        helper.chooseTasksForCustomStreamGrouping(streamId, tuple);
-
-    // Invoke user-defined emit task hook
-    helper.getTopologyContext().invokeHookEmit(tuple, streamId, customGroupingTargetTaskIds);
-
-    // Start construct the data tuple
-    HeronTuples.HeronDataTuple.Builder bldr = HeronTuples.HeronDataTuple.newBuilder();
-
-    // set the key. This is mostly ignored
-    bldr.setKey(0);
-
-    if (customGroupingTargetTaskIds != null) {
-      // It is a CustomStreamGrouping
-      for (Integer taskId : customGroupingTargetTaskIds) {
-        bldr.addDestTaskIds(taskId);
-      }
-    }
-
-    // Set the anchors for a tuple
-    if (anchors != null) {
-      // This message is rooted
-      Set<HeronTuples.RootId> mergedRoots = new HashSet<HeronTuples.RootId>();
-      for (Tuple tpl : anchors) {
-        if (tpl instanceof TupleImpl) {
-          TupleImpl t = (TupleImpl) tpl;
-          mergedRoots.addAll(t.getRoots());
-        }
-      }
-      for (HeronTuples.RootId rt : mergedRoots) {
-        bldr.addRoots(rt);
-      }
-    }
-
-    long tupleSizeInBytes = 0;
-
-    long startTime = System.nanoTime();
-
-    // Serialize it
-    for (Object obj : tuple) {
-      byte[] b = serializer.serialize(obj);
-      ByteString bstr = ByteString.copyFrom(b);
-      bldr.addValues(bstr);
-      tupleSizeInBytes += b.length;
-    }
-
-    long latency = System.nanoTime() - startTime;
-    boltMetrics.serializeDataTuple(streamId, latency);
-
-    // submit to outputter
-    outputter.addDataTuple(streamId, bldr, tupleSizeInBytes);
-
-    // Update metrics
-    boltMetrics.emittedTuple(streamId);
-
-    // TODO:- remove this after changing the api
-    return null;
-  }
-
-  private void admitAckTuple(Tuple tuple) {
-    if (tuple instanceof TupleImpl) {
-      TupleImpl tuplImpl = (TupleImpl) tuple;
-      if (ackEnabled) {
-        HeronTuples.AckTuple.Builder bldr = HeronTuples.AckTuple.newBuilder();
-        bldr.setAckedtuple(tuplImpl.getTupleKey());
-
-        long tupleSizeInBytes = 0;
-
-        for (HeronTuples.RootId rt : tuplImpl.getRoots()) {
-          bldr.addRoots(rt);
-          tupleSizeInBytes += rt.getSerializedSize();
-        }
-        outputter.addAckTuple(bldr, tupleSizeInBytes);
-      }
-      long latency = System.nanoTime() - tuplImpl.getCreationTime();
-
-      // Invoke user-defined boltAck task hook
-      helper.getTopologyContext().invokeHookBoltAck(tuple, latency);
-
-      boltMetrics.ackedTuple(tuple.getSourceStreamId(), tuple.getSourceComponent(), latency);
-    }
-  }
-
-  private void admitFailTuple(Tuple tuple) {
-    if (tuple instanceof TupleImpl) {
-      TupleImpl tuplImpl = (TupleImpl) tuple;
-      if (ackEnabled) {
-        HeronTuples.AckTuple.Builder bldr = HeronTuples.AckTuple.newBuilder();
-        bldr.setAckedtuple(tuplImpl.getTupleKey());
-
-        long tupleSizeInBytes = 0;
-
-        for (HeronTuples.RootId rt : tuplImpl.getRoots()) {
-          bldr.addRoots(rt);
-          tupleSizeInBytes += rt.getSerializedSize();
-        }
-        outputter.addFailTuple(bldr, tupleSizeInBytes);
-      }
-      long latency = System.nanoTime() - tuplImpl.getCreationTime();
-
-      // Invoke user-defined boltFail task hook
-      helper.getTopologyContext().invokeHookBoltFail(tuple, latency);
-
-      boltMetrics.failedTuple(tuple.getSourceStreamId(), tuple.getSourceComponent(), latency);
-    }
+    super(serializer, helper, streamOutQueue, boltMetrics);
   }
 }
diff --git a/heron/simulator/src/java/com/twitter/heron/simulator/instance/OutgoingTupleCollection.java b/heron/simulator/src/java/com/twitter/heron/simulator/instance/OutgoingTupleCollection.java
index f34d9c3..394f29d 100644
--- a/heron/simulator/src/java/com/twitter/heron/simulator/instance/OutgoingTupleCollection.java
+++ b/heron/simulator/src/java/com/twitter/heron/simulator/instance/OutgoingTupleCollection.java
@@ -14,10 +14,7 @@
 
 package com.twitter.heron.simulator.instance;
 
-import com.twitter.heron.api.generated.TopologyAPI;
 import com.twitter.heron.common.basics.Communicator;
-import com.twitter.heron.common.basics.SingletonRegistry;
-import com.twitter.heron.common.config.SystemConfig;
 import com.twitter.heron.proto.system.HeronTuples;
 
 /**
@@ -28,144 +25,10 @@
  * <p>
  * In fact, when talking about to send out tuples, we mean we push them to the out queues.
  */
-public class OutgoingTupleCollection {
-  protected final String componentName;
-  // We have just one outQueue responsible for both control tuples and data tuples
-  private final Communicator<HeronTuples.HeronTupleSet> outQueue;
-  private final SystemConfig systemConfig;
+public class OutgoingTupleCollection extends com.twitter.heron.instance.OutgoingTupleCollection {
 
-  private HeronTuples.HeronDataTupleSet.Builder currentDataTuple;
-  private HeronTuples.HeronControlTupleSet.Builder currentControlTuple;
-
-  // Total data emitted in bytes for the entire life
-  private long totalDataEmittedInBytes;
-
-  // Current size in bytes for data types to pack into the HeronTupleSet
-  private long currentDataTupleSizeInBytes;
-  // Maximum data tuple size in bytes we can put in one HeronTupleSet
-  private long maxDataTupleSizeInBytes;
-
-  private int dataTupleSetCapacity;
-  private int controlTupleSetCapacity;
-
-  public OutgoingTupleCollection(
-      String componentName,
-      Communicator<HeronTuples.HeronTupleSet> outQueue) {
-    this.outQueue = outQueue;
-    this.componentName = componentName;
-    this.systemConfig =
-        (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
-
-    // Initialize the values in constructor
-    this.totalDataEmittedInBytes = 0;
-    this.currentDataTupleSizeInBytes = 0;
-
-    // Read the config values
-    this.dataTupleSetCapacity = systemConfig.getInstanceSetDataTupleCapacity();
-    this.maxDataTupleSizeInBytes = systemConfig.getInstanceSetDataTupleSizeBytes();
-    this.controlTupleSetCapacity = systemConfig.getInstanceSetControlTupleCapacity();
-  }
-
-  public void sendOutTuples() {
-    flushRemaining();
-  }
-
-  public void addDataTuple(
-      String streamId,
-      HeronTuples.HeronDataTuple.Builder newTuple,
-      long tupleSizeInBytes) {
-    if (currentDataTuple == null
-        || !currentDataTuple.getStream().getId().equals(streamId)
-        || currentDataTuple.getTuplesCount() >= dataTupleSetCapacity
-        || currentDataTupleSizeInBytes >= maxDataTupleSizeInBytes) {
-      initNewDataTuple(streamId);
-    }
-    currentDataTuple.addTuples(newTuple);
-
-    currentDataTupleSizeInBytes += tupleSizeInBytes;
-    totalDataEmittedInBytes += tupleSizeInBytes;
-  }
-
-  public void addAckTuple(HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) {
-    if (currentControlTuple == null
-        || currentControlTuple.getFailsCount() > 0
-        || currentControlTuple.getAcksCount() >= controlTupleSetCapacity) {
-      initNewControlTuple();
-    }
-    currentControlTuple.addAcks(newTuple);
-
-    // Add the size of data in bytes ready to send out
-    totalDataEmittedInBytes += tupleSizeInBytes;
-  }
-
-  public void addFailTuple(HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) {
-    if (currentControlTuple == null
-        || currentControlTuple.getAcksCount() > 0
-        || currentControlTuple.getFailsCount() >= controlTupleSetCapacity) {
-      initNewControlTuple();
-    }
-    currentControlTuple.addFails(newTuple);
-
-    // Add the size of data in bytes ready to send out
-    totalDataEmittedInBytes += tupleSizeInBytes;
-  }
-
-  private void initNewDataTuple(String streamId) {
-    flushRemaining();
-
-    // Reset the set for data tuple
-    currentDataTupleSizeInBytes = 0;
-
-    TopologyAPI.StreamId.Builder sbldr = TopologyAPI.StreamId.newBuilder();
-    sbldr.setId(streamId);
-    sbldr.setComponentName(componentName);
-    currentDataTuple = HeronTuples.HeronDataTupleSet.newBuilder();
-    currentDataTuple.setStream(sbldr);
-  }
-
-  private void initNewControlTuple() {
-    flushRemaining();
-    currentControlTuple = HeronTuples.HeronControlTupleSet.newBuilder();
-  }
-
-  private void flushRemaining() {
-    if (currentDataTuple != null) {
-      HeronTuples.HeronTupleSet.Builder bldr = HeronTuples.HeronTupleSet.newBuilder();
-      bldr.setData(currentDataTuple);
-
-      pushTupleToQueue(bldr, outQueue);
-
-      currentDataTuple = null;
-    }
-    if (currentControlTuple != null) {
-      HeronTuples.HeronTupleSet.Builder bldr = HeronTuples.HeronTupleSet.newBuilder();
-      bldr.setControl(currentControlTuple);
-      pushTupleToQueue(bldr, outQueue);
-
-      currentControlTuple = null;
-    }
-  }
-
-  private void pushTupleToQueue(HeronTuples.HeronTupleSet.Builder bldr,
-                                Communicator<HeronTuples.HeronTupleSet> out) {
-    // The Communicator has un-bounded capacity so the offer will always be successful
-    out.offer(bldr.build());
-  }
-
-  // Return true we could offer item to outQueue
-  public boolean isOutQueuesAvailable() {
-    return outQueue.size() < outQueue.getExpectedAvailableCapacity();
-  }
-
-  public long getTotalDataEmittedInBytes() {
-    return totalDataEmittedInBytes;
-  }
-
-  // Clean the internal state of OutgoingTupleCollection
-  public void clear() {
-    currentControlTuple = null;
-    currentDataTuple = null;
-
-    outQueue.clear();
+  public OutgoingTupleCollection(String componentName,
+                                 Communicator<HeronTuples.HeronTupleSet> outQueue) {
+    super(componentName, outQueue);
   }
 }
diff --git a/heron/simulator/src/java/com/twitter/heron/simulator/instance/SpoutOutputCollectorImpl.java b/heron/simulator/src/java/com/twitter/heron/simulator/instance/SpoutOutputCollectorImpl.java
index 9867576..4619ff4 100644
--- a/heron/simulator/src/java/com/twitter/heron/simulator/instance/SpoutOutputCollectorImpl.java
+++ b/heron/simulator/src/java/com/twitter/heron/simulator/instance/SpoutOutputCollectorImpl.java
@@ -14,25 +14,11 @@
 
 package com.twitter.heron.simulator.instance;
 
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.protobuf.ByteString;
-
-import com.twitter.heron.api.Config;
 import com.twitter.heron.api.serializer.IPluggableSerializer;
 import com.twitter.heron.api.spout.ISpoutOutputCollector;
 import com.twitter.heron.common.basics.Communicator;
-import com.twitter.heron.common.utils.metrics.SpoutMetrics;
+import com.twitter.heron.common.utils.metrics.ComponentMetrics;
 import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
-import com.twitter.heron.common.utils.misc.TupleKeyGenerator;
 import com.twitter.heron.proto.system.HeronTuples;
 
 /**
@@ -48,219 +34,14 @@
  * 3. Maintain some statistics, for instance, total tuples emitted.
  * <p>
  */
-public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
-  private static final Logger LOG = Logger.getLogger(SpoutOutputCollectorImpl.class.getName());
-
-  // Map from tuple key to composite object with insertion-order, i.e. ordered by time
-  private final LinkedHashMap<Long, RootTupleInfo> inFlightTuples;
-
-  private final TupleKeyGenerator keyGenerator;
-
-  private final SpoutMetrics spoutMetrics;
-  private PhysicalPlanHelper helper;
-
-  private final boolean ackingEnabled;
-  // When acking is not enabled, if the spout does an emit with a anchor
-  // we need to ack it immediately. This keeps the list of those
-  private final Queue<RootTupleInfo> immediateAcks;
-
-  private final IPluggableSerializer serializer;
-  private final OutgoingTupleCollection outputter;
-
-  private long totalTuplesEmitted;
+public class SpoutOutputCollectorImpl
+    extends com.twitter.heron.instance.spout.SpoutOutputCollectorImpl
+    implements ISpoutOutputCollector {
 
   public SpoutOutputCollectorImpl(IPluggableSerializer serializer,
                                   PhysicalPlanHelper helper,
                                   Communicator<HeronTuples.HeronTupleSet> streamOutQueue,
-                                  SpoutMetrics spoutMetrics) {
-    if (helper.getMySpout() == null) {
-      throw new RuntimeException(helper.getMyTaskId() + " is not a spout ");
-    }
-
-    this.serializer = serializer;
-    this.helper = helper;
-    this.spoutMetrics = spoutMetrics;
-    this.keyGenerator = new TupleKeyGenerator();
-    updatePhysicalPlanHelper(helper);
-
-    // with default capacity, load factor and insertion order
-    inFlightTuples = new LinkedHashMap<Long, RootTupleInfo>();
-
-    Map<String, Object> config = helper.getTopologyContext().getTopologyConfig();
-    if (config.containsKey(Config.TOPOLOGY_ENABLE_ACKING)
-        && config.get(Config.TOPOLOGY_ENABLE_ACKING) != null) {
-      this.ackingEnabled =
-          Boolean.parseBoolean(config.get(Config.TOPOLOGY_ENABLE_ACKING).toString());
-    } else {
-      this.ackingEnabled = false;
-    }
-
-    if (!ackingEnabled) {
-      immediateAcks = new ArrayDeque<RootTupleInfo>();
-    } else {
-      immediateAcks = null;
-    }
-
-    this.outputter = new OutgoingTupleCollection(helper.getMyComponent(), streamOutQueue);
-  }
-
-  public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
-    this.helper = physicalPlanHelper;
-  }
-
-  /////////////////////////////////////////////////////////
-  // Following public methods are overrides OutputCollector
-  /////////////////////////////////////////////////////////
-
-  @Override
-  public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
-    return admitSpoutTuple(streamId, tuple, messageId);
-  }
-
-  @Override
-  public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
-    throw new RuntimeException("emitDirect Not implemented");
-  }
-
-  // Log the report error and also send the stack trace to metrics manager.
-  @Override
-  public void reportError(Throwable error) {
-    LOG.log(Level.SEVERE, "Reporting an error in topology code ", error);
-  }
-
-
-  /////////////////////////////////////////////////////////
-  // Following public methods are used for querying or
-  // interacting internal state of the BoltOutputCollectorImpl
-  /////////////////////////////////////////////////////////
-
-  // Return true we could offer item to outQueue
-  public boolean isOutQueuesAvailable() {
-    return outputter.isOutQueuesAvailable();
-  }
-
-  // Return the total data emitted in bytes
-  public long getTotalDataEmittedInBytes() {
-    return outputter.getTotalDataEmittedInBytes();
-  }
-
-  // Flush the tuples to next stage
-  public void sendOutTuples() {
-    outputter.sendOutTuples();
-  }
-
-  public long getTotalTuplesEmitted() {
-    return totalTuplesEmitted;
-  }
-
-  public int numInFlight() {
-    return inFlightTuples.size();
-  }
-
-  public Queue<RootTupleInfo> getImmediateAcks() {
-    return immediateAcks;
-  }
-
-  public RootTupleInfo retireInFlight(long rootId) {
-    return inFlightTuples.remove(rootId);
-  }
-
-  public List<RootTupleInfo> retireExpired(long timeout) {
-    List<RootTupleInfo> retval = new ArrayList<RootTupleInfo>();
-    long curTime = System.nanoTime();
-
-    // The LinkedHashMap is ordered by insertion order, i.e. ordered by time
-    // So we want need to iterate from the start and remove all items until
-    // we meet the RootTupleInfo no need to expire
-    Iterator<RootTupleInfo> iterator = inFlightTuples.values().iterator();
-    while (iterator.hasNext()) {
-      RootTupleInfo rootTupleInfo = iterator.next();
-      if (rootTupleInfo.isExpired(curTime, timeout)) {
-        retval.add(rootTupleInfo);
-        iterator.remove();
-      } else {
-        break;
-      }
-    }
-
-    return retval;
-  }
-
-  // Clean the internal state of BoltOutputCollectorImpl
-  public void clear() {
-    outputter.clear();
-  }
-
-  /////////////////////////////////////////////////////////
-  // Following private methods are internal implementations
-  /////////////////////////////////////////////////////////
-
-  private List<Integer> admitSpoutTuple(String streamId, List<Object> tuple, Object messageId) {
-    // First check whether this tuple is sane
-    helper.checkOutputSchema(streamId, tuple);
-
-    // customGroupingTargetTaskIds will be null if this stream is not CustomStreamGrouping
-    List<Integer> customGroupingTargetTaskIds =
-        helper.chooseTasksForCustomStreamGrouping(streamId, tuple);
-
-    // Invoke user-defined emit task hook
-    helper.getTopologyContext().invokeHookEmit(tuple, streamId, customGroupingTargetTaskIds);
-
-    // Start construct the data tuple
-    HeronTuples.HeronDataTuple.Builder bldr = HeronTuples.HeronDataTuple.newBuilder();
-
-    // set the key. This is mostly ignored
-    bldr.setKey(0);
-
-    if (customGroupingTargetTaskIds != null) {
-      // It is a CustomStreamGrouping
-      for (Integer taskId : customGroupingTargetTaskIds) {
-        bldr.addDestTaskIds(taskId);
-      }
-    }
-
-    if (messageId != null) {
-      RootTupleInfo tupleInfo = new RootTupleInfo(streamId, messageId);
-      if (ackingEnabled) {
-        // This message is rooted
-        HeronTuples.RootId.Builder rtbldr = EstablishRootId(tupleInfo);
-        bldr.addRoots(rtbldr);
-      } else {
-        immediateAcks.offer(tupleInfo);
-      }
-    }
-
-    long tupleSizeInBytes = 0;
-
-    long startTime = System.nanoTime();
-
-    // Serialize it
-    for (Object obj : tuple) {
-      byte[] b = serializer.serialize(obj);
-      ByteString bstr = ByteString.copyFrom(b);
-      bldr.addValues(bstr);
-      tupleSizeInBytes += b.length;
-    }
-
-    long latency = System.nanoTime() - startTime;
-    spoutMetrics.serializeDataTuple(streamId, latency);
-
-    // submit to outputter
-    outputter.addDataTuple(streamId, bldr, tupleSizeInBytes);
-    totalTuplesEmitted++;
-    spoutMetrics.emittedTuple(streamId);
-
-    // TODO:- remove this after changing the api
-    return null;
-  }
-
-  private HeronTuples.RootId.Builder EstablishRootId(RootTupleInfo tupleInfo) {
-    // This message is rooted
-    long rootId = keyGenerator.next();
-    HeronTuples.RootId.Builder rtbldr = HeronTuples.RootId.newBuilder();
-    rtbldr.setTaskid(helper.getMyTaskId());
-    rtbldr.setKey(rootId);
-    inFlightTuples.put(rootId, tupleInfo);
-    return rtbldr;
+                                  ComponentMetrics spoutMetrics) {
+    super(serializer, helper, streamOutQueue, spoutMetrics);
   }
 }
diff --git a/heron/spi/src/java/com/twitter/heron/spi/utils/ShellUtils.java b/heron/spi/src/java/com/twitter/heron/spi/utils/ShellUtils.java
index 160e990..c81b93c 100644
--- a/heron/spi/src/java/com/twitter/heron/spi/utils/ShellUtils.java
+++ b/heron/spi/src/java/com/twitter/heron/spi/utils/ShellUtils.java
@@ -130,7 +130,7 @@
     final StringBuilder builder = outputBuilder == null ? new StringBuilder() : outputBuilder;
 
     // Log the command for debugging
-    LOG.log(Level.INFO, "Running synced process: ``{0}''''", String.join(" ", cmdline));
+    LOG.log(Level.INFO, "Running synced process: ``{0}''''", joinString(cmdline));
     ProcessBuilder pb = getProcessBuilder(isInheritIO, cmdline, workingDirectory, envs);
     /* combine input stream and error stream into stderr because
        1. this preserves order of process's stdout/stderr message
@@ -195,7 +195,7 @@
 
   private static Process runASyncProcess(String[] command, File workingDirectory,
       Map<String, String> envs, String logFileUuid, boolean logStderr) {
-    LOG.log(Level.INFO, "Running async process: ``{0}''''", String.join(" ", command));
+    LOG.log(Level.INFO, "Running async process: ``{0}''''", joinString(command));
 
     // the log file can help people to find out what happened between pb.start()
     // and the async process started
@@ -315,4 +315,15 @@
     return ret == 0;
   }
 
+  // Java 7 stand-in for String.join(" ", array); separates by index so empty values keep spaces
+  private static String joinString(String[] array) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < array.length; i++) {
+      if (i > 0) {
+        sb.append(" ");
+      }
+      sb.append(array[i]);
+    }
+    return sb.toString();
+  }
 }
diff --git a/heron/spi/src/java/com/twitter/heron/spi/utils/TMasterUtils.java b/heron/spi/src/java/com/twitter/heron/spi/utils/TMasterUtils.java
index 3102153..8de6623 100644
--- a/heron/spi/src/java/com/twitter/heron/spi/utils/TMasterUtils.java
+++ b/heron/spi/src/java/com/twitter/heron/spi/utils/TMasterUtils.java
@@ -136,9 +136,10 @@
     }
 
     if (state == expectedState) {
-      throw new TMasterException(String.format(
-          "Topology %s command received topology '%s' but already in state %s",
+      LOG.warning(String.format(
+          "Topology %s command received but topology '%s' already in state %s",
           topologyStateControlCommand, topologyName, state));
+      return;
     }
 
     if (state != startState) {
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
index fcba33d..1ff249f 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
@@ -238,11 +238,27 @@
     sp_string instance_id = instance_info_[task_id]->instance_->instance_id();
     LOG(INFO) << "Instance " << instance_id << " closed connection";
 
-    instance_info_[task_id]->set_connection(NULL);
+    // Remove the connection from active instances
     active_instances_.erase(_conn);
 
+    // Remove from instance info
+    instance_info_[task_id]->set_connection(NULL);
+    delete instance_info_[task_id];
+    instance_info_.erase(task_id);
+
+    // Clean the instance_metric_map
+    auto immiter = instance_metric_map_.find(instance_id);
+    if (immiter != instance_metric_map_.end()) {
+      metrics_manager_client_->unregister_metric(MakeBackPressureCompIdMetricName(instance_id));
+      delete instance_metric_map_[instance_id];
+      instance_metric_map_.erase(instance_id);
+    }
+
+    // Clean the connection_buffer_metric_map_
     auto qmmiter = connection_buffer_metric_map_.find(instance_id);
     if (qmmiter != connection_buffer_metric_map_.end()) {
+      metrics_manager_client_->unregister_metric(MakeQueueSizeCompIdMetricName(instance_id));
+      delete connection_buffer_metric_map_[instance_id];
       connection_buffer_metric_map_.erase(instance_id);
     }
   }
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.h b/heron/stmgr/src/cpp/manager/stmgr-server.h
index 6a30dee..ee6a48a 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.h
@@ -139,7 +139,6 @@
   typedef std::unordered_map<Connection*, sp_int32> ConnectionTaskIdMap;
   ConnectionTaskIdMap active_instances_;
   // map of task id to InstanceData
-  // Once populated, will not change
   typedef std::unordered_map<sp_int32, InstanceData*> TaskIdInstanceDataMap;
   TaskIdInstanceDataMap instance_info_;
 
diff --git a/heron/storm/src/java/BUILD b/heron/storm/src/java/BUILD
index 93f6526..c9ed8e2 100644
--- a/heron/storm/src/java/BUILD
+++ b/heron/storm/src/java/BUILD
@@ -1,28 +1,31 @@
 package(default_visibility = ["//visibility:public"])
 
+load("/tools/rules/jarjar_rules", "jarjar_binary")
+
 storm_deps_files = [
     "//heron/api/src/java:api-java",
     "//heron/common/src/java:basics-java",
     "//heron/simulator/src/java:simulator-java",
     "@com_googlecode_json_simple_json_simple//jar",
-    "//third_party/java:kryo",
+    "//third_party/java:kryo-neverlink",
 ]
 
+# Kryo is bundled here for the integration tests
 java_library(
     name='storm-compatibility-java',
     srcs = glob(["**/*.java"]),
-    deps = storm_deps_files, 
+    deps = storm_deps_files + ["//third_party/java:kryo"],
 )
 
 java_binary(
-    name = "storm-compatibility-unshaded",
+    name="storm-compatibility-unshaded",
     srcs = glob(["**/*.java"]),
     deps = storm_deps_files,
 )
 
-genrule(
+jarjar_binary(
     name = "heron-storm",
-    srcs = [":storm-compatibility-unshaded_deploy.jar"],
-    outs = ["heron-storm.jar"],
-    cmd  = "cp $< $@",
+    src = ":storm-compatibility-unshaded_deploy.jar",
+    shade = "shade.conf",
+    deps = ["@org_sonatype_plugins_jarjar_maven_plugin//jar"]
 )
diff --git a/heron/storm/src/java/shade.conf b/heron/storm/src/java/shade.conf
new file mode 100644
index 0000000..930aa16
--- /dev/null
+++ b/heron/storm/src/java/shade.conf
@@ -0,0 +1,3 @@
+rule com.google.** org.apache.storm.__shaded__.@0
+rule org.yaml.snakeyaml** org.apache.storm.__shaded__.@0
+rule org.json.simple** org.apache.storm.__shaded__.@0
diff --git a/heron/tools/cli/src/python/result.py b/heron/tools/cli/src/python/result.py
index e1643dc..6c6e669 100644
--- a/heron/tools/cli/src/python/result.py
+++ b/heron/tools/cli/src/python/result.py
@@ -64,16 +64,20 @@
         msg = msg[:-1]
       log_f(msg)
 
+  @staticmethod
+  def _do_print(f, msg):
+    """Write msg to f via print, first dropping one trailing newline; empty msg is skipped"""
+    if not msg:
+      return
+    print >> f, msg[:-1] if msg[-1] == '\n' else msg
+
   def _log_context(self):
     # render context only after process exits
     assert self.status is not None
-    if self.status == Status.Ok or self.status == Status.DryRun:
+    if self.status in [Status.Ok, Status.DryRun]:
       self._do_log(Log.info, self.succ_context)
-    elif self.status == Status.HeronError:
+    elif self.status in [Status.HeronError, Status.InvocationError]:
       self._do_log(Log.error, self.err_context)
-    elif self.status == Status.InvocationError:
-      # invocation error has no context
-      pass
     else:
       raise RuntimeError(
           "Unknown status type of value %d. Expected value: %s", self.status.value, list(Status))
@@ -98,8 +102,8 @@
 class SimpleResult(Result):
   """Simple result: result that already and only
      contains status of the result"""
-  def __init__(self, status):
-    super(SimpleResult, self).__init__(status)
+  def __init__(self, *args):
+    super(SimpleResult, self).__init__(*args)
 
   def render(self):
     self._log_context()
@@ -127,7 +131,7 @@
     if retcode is not None and status_type(retcode) == Status.InvocationError:
       self._do_log(Log.error, stderr_line)
     else:
-      print >> sys.stderr, stderr_line,
+      self._do_print(sys.stderr, stderr_line)
 
   def renderProcessStdOut(self, stdout):
     """ render stdout of shelled-out process
@@ -147,10 +151,13 @@
       self._do_log(Log.error, stdout)
     # No need to prefix [INFO] here. We want to display dry-run response in a clean way
     elif self.status == Status.DryRun:
-      print >> sys.stdout, stdout,
+      self._do_print(sys.stdout, stdout)
+    elif self.status == Status.InvocationError:
+      self._do_print(sys.stdout, stdout)
     else:
       raise RuntimeError(
-          "Unknown status type of value %d. Expected value: %s", self.status.value, list(Status))
+          "Unknown status type of value %d. Expected value: %s" % \
+          (self.status.value, list(Status)))
 
   def render(self):
     while True:
diff --git a/heron/tools/cli/src/python/submit.py b/heron/tools/cli/src/python/submit.py
index 9d0ba76..9711109 100644
--- a/heron/tools/cli/src/python/submit.py
+++ b/heron/tools/cli/src/python/submit.py
@@ -193,8 +193,8 @@
   result.render(res)
 
   if not res.is_successful():
-    err_context = "Failed to create topology definition \
-      file when executing class '%s' of file '%s'" % (main_class, topology_file)
+    err_context = ("Failed to create topology definition " \
+      "file when executing class '%s' of file '%s'") % (main_class, topology_file)
     res.add_context(err_context)
     return res
 
@@ -234,11 +234,11 @@
       tmp_dir,
       java_defines)
 
-  res.render()
+  result.render(res)
 
   if not res.is_successful():
-    err_context = "Failed to create topology definition \
-      file when executing class '%s' of file '%s'" % (main_class, topology_file)
+    err_context = ("Failed to create topology definition " \
+      "file when executing class '%s' of file '%s'") % (main_class, topology_file)
     res.add_context(err_context)
     return res
 
@@ -256,11 +256,10 @@
   res = execute.heron_pex(
       topology_file, topology_class_name, tuple(unknown_args))
 
-  res.render()
-
+  result.render(res)
   if not res.is_successful():
-    err_context = "Failed to create topology definition \
-      file when executing class '%s' of file '%s'" % (topology_class_name, topology_file)
+    err_context = ("Failed to create topology definition " \
+      "file when executing class '%s' of file '%s'") % (topology_class_name, topology_file)
     res.add_context(err_context)
     return res
 
@@ -287,7 +286,7 @@
 
   # check to see if the topology file exists
   if not os.path.isfile(topology_file):
-    err_context = "Topology jar|tar|pex file '%s' does not exist" % topology_file
+    err_context = "Topology file '%s' does not exist" % topology_file
     return SimpleResult(Status.InvocationError, err_context)
 
   # check if it is a valid file type
diff --git a/heron/tools/common/src/python/access/heron_api.py b/heron/tools/common/src/python/access/heron_api.py
index 3b0ddeb..3654682 100644
--- a/heron/tools/common/src/python/access/heron_api.py
+++ b/heron/tools/common/src/python/access/heron_api.py
@@ -746,6 +746,7 @@
     '''
     components = [component] if component != "*" else (yield get_comps(cluster, environ, topology))
 
+    result = {}
     futures = []
     for comp in components:
       query = self.get_query(metric, comp, instance)
diff --git a/integration-test/src/java/com/twitter/heron/integration_test/core/IntegrationTestBolt.java b/integration-test/src/java/com/twitter/heron/integration_test/core/IntegrationTestBolt.java
index f3a8320..dfaa034 100644
--- a/integration-test/src/java/com/twitter/heron/integration_test/core/IntegrationTestBolt.java
+++ b/integration-test/src/java/com/twitter/heron/integration_test/core/IntegrationTestBolt.java
@@ -81,7 +81,6 @@
 
   @Override
   public void execute(Tuple tuple) {
-    tuplesReceived++;
     String streamID = tuple.getSourceStreamId();
 
     LOG.info("Received a tuple: " + tuple + " ; from: " + streamID);
@@ -105,6 +104,7 @@
             "Received a terminal, need to receive %s more", terminalsToReceive));
       }
     } else {
+      tuplesReceived++;
       currentTupleProcessing = tuple;
       delegateBolt.execute(tuple);
       // We ack only the tuples in user's logic
diff --git a/integration-test/src/python/test_runner/resources/test.json b/integration-test/src/python/test_runner/resources/test.json
index 7d7d273..021d91f 100644
--- a/integration-test/src/python/test_runner/resources/test.json
+++ b/integration-test/src/python/test_runner/resources/test.json
@@ -58,6 +58,11 @@
       "expectedResultRelativePath" : "bolt_double_emit_tuples/resources/BoltDoubleEmitTuples.json"
     },
     {
+      "topologyName" : "IntegrationTest_MultiSpoutsMultiTasks",
+      "classPath"    : "multi_spouts_multi_tasks.MultiSpoutsMultiTasks",
+      "expectedResultRelativePath" : "multi_spouts_multi_tasks/resources/MultiSpoutsMultiTasks.json"
+    },
+    {
       "topologyName" : "IntegrationTest_OneBoltMultiTasks",
       "classPath"    : "one_bolt_multi_tasks.OneBoltMultiTasks",
       "expectedResultRelativePath" : "one_bolt_multi_tasks/resources/OneBoltMultiTasks.json"
diff --git a/release/maven/heron-api.pom.template b/release/maven/heron-no-kryo.template.pom
similarity index 91%
rename from release/maven/heron-api.pom.template
rename to release/maven/heron-no-kryo.template.pom
index 08ad86c..31c0014 100644
--- a/release/maven/heron-api.pom.template
+++ b/release/maven/heron-no-kryo.template.pom
@@ -4,11 +4,11 @@
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.twitter.heron</groupId>
-  <artifactId>heron-api</artifactId>
+  <artifactId>ARTIFACT_ID</artifactId>
   <version>VERSION</version>  
   <packaging>jar</packaging>
-  <name>heron-api</name>
-  <description>Heron API</description>
+  <name>NAME</name>
+  <description>DESCRIPTION</description>
   <url>http://www.heronstreaming.io</url>
   <licenses>
     <license>
diff --git a/release/maven/heron-spi.pom.template b/release/maven/heron-spi.pom.template
deleted file mode 100644
index e747db6..0000000
--- a/release/maven/heron-spi.pom.template
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
-xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
-xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <groupId>com.twitter.heron</groupId>
-  <artifactId>heron-spi</artifactId>
-  <version>VERSION</version>  
-  <packaging>jar</packaging>
-  <name>heron-spi</name>
-  <description>Heron SPI</description>
-  <url>http://www.heronstreaming.io</url>
-  <licenses>
-    <license>
-      <name>The Apache License, Version 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-    </license>
-  </licenses>
-  <developers>
-    <developer>
-      <name>Heron Users</name>
-      <email>heron-users@googlegroups.com</email>
-      <organization>Twitter</organization>
-      <organizationUrl>http://www.twitter.com</organizationUrl>
-    </developer>
-  </developers>
-  <scm>
-    <connection>scm:git:git@github.com:twitter/heron.git</connection>
-    <developerConnection>scm:git:git@github.com:twitter/heron.git</developerConnection>
-    <url>git@github.com:twitter/heron.git</url>
-  </scm>
-</project>
\ No newline at end of file
diff --git a/release/maven/heron-storm.pom.template b/release/maven/heron-with-kryo.template.pom
similarity index 77%
rename from release/maven/heron-storm.pom.template
rename to release/maven/heron-with-kryo.template.pom
index 9370907..8ca2a9c 100644
--- a/release/maven/heron-storm.pom.template
+++ b/release/maven/heron-with-kryo.template.pom
@@ -4,11 +4,11 @@
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.twitter.heron</groupId>
-  <artifactId>heron-storm</artifactId>
+  <artifactId>ARTIFACT_ID</artifactId>
   <version>VERSION</version>  
   <packaging>jar</packaging>
-  <name>heron-storm</name>
-  <description>Heron Storm</description>
+  <name>NAME</name>
+  <description>DESCRIPTION</description>
   <url>http://www.heronstreaming.io</url>
   <licenses>
     <license>
@@ -16,6 +16,14 @@
       <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
     </license>
   </licenses>
+  <dependencies>
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo</artifactId>
+      <version>3.0.3</version>
+      <scope>compile</scope>
+    </dependency>
+  </dependencies>
   <developers>
     <developer>
       <name>Heron Users</name>
diff --git a/release/maven/maven-pom-version.sh b/release/maven/maven-pom-version.sh
index 0ff139e..5dec908 100755
--- a/release/maven/maven-pom-version.sh
+++ b/release/maven/maven-pom-version.sh
@@ -25,6 +25,24 @@
     exit 1
 fi
 
-cat ./maven/heron-api.pom.template | sed "s/VERSION/$1/g" >> ./heron-api-$1.pom
-cat ./maven/heron-storm.pom.template | sed "s/VERSION/$1/g" >> ./heron-storm-$1.pom
-cat ./maven/heron-spi.pom.template | sed "s/VERSION/$1/g" >> ./heron-spi-$1.pom
+# Render each pom from its template by substituting the VERSION, ARTIFACT_ID,
+# NAME and DESCRIPTION placeholders. Output is written with ">" so that
+# re-running for the same version replaces the pom instead of appending to it.
+sed -e "s/VERSION/$1/g" \
+    -e "s/ARTIFACT_ID/heron-api/g" \
+    -e "s/NAME/heron-api/g" \
+    -e "s/DESCRIPTION/Heron API/g" \
+    ./maven/heron-no-kryo.template.pom > "./heron-api-$1.pom"
+
+sed -e "s/VERSION/$1/g" \
+    -e "s/ARTIFACT_ID/heron-spi/g" \
+    -e "s/NAME/heron-spi/g" \
+    -e "s/DESCRIPTION/Heron SPI/g" \
+    ./maven/heron-no-kryo.template.pom > "./heron-spi-$1.pom"
+
+sed -e "s/VERSION/$1/g" \
+    -e "s/ARTIFACT_ID/heron-storm/g" \
+    -e "s/NAME/heron-storm/g" \
+    -e "s/DESCRIPTION/Heron Storm/g" \
+    ./maven/heron-with-kryo.template.pom > "./heron-storm-$1.pom"
+
diff --git a/scripts/resources/idea/copyright/heron.xml b/scripts/resources/idea/copyright/heron.xml
index 617bc67..413af1c 100644
--- a/scripts/resources/idea/copyright/heron.xml
+++ b/scripts/resources/idea/copyright/heron.xml
@@ -1,6 +1,6 @@
 <component name="CopyrightManager">
   <copyright>
     <option name="myName" value="heron" />
-    <option name="notice" value="// Copyright &amp;#36;today.year Twitter. All rights reserved.&#10;// &#10;// Licensed under the Apache License, Version 2.0 (the &quot;License&quot;);&#10;// you may not use this file except in compliance with the License.&#10;// You may obtain a copy of the License at&#10;//&#10;// http://www.apache.org/licenses/LICENSE-2.0&#10;//&#10;// Unless required by applicable law or agreed to in writing, software&#10;// distributed under the License is distributed on an &quot;AS IS&quot; BASIS,&#10;// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.&#10;// See the License for the specific language governing permissions and&#10;// limitations under the License" />
+    <option name="notice" value="// Copyright &amp;#36;today.year Twitter. All rights reserved.&#10;// &#10;// Licensed under the Apache License, Version 2.0 (the &quot;License&quot;);&#10;// you may not use this file except in compliance with the License.&#10;// You may obtain a copy of the License at&#10;//&#10;// http://www.apache.org/licenses/LICENSE-2.0&#10;//&#10;// Unless required by applicable law or agreed to in writing, software&#10;// distributed under the License is distributed on an &quot;AS IS&quot; BASIS,&#10;// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.&#10;// See the License for the specific language governing permissions and&#10;// limitations under the License." />
   </copyright>
 </component>
\ No newline at end of file
diff --git a/third_party/java/BUILD b/third_party/java/BUILD
index 3864fa0..22caa0d 100644
--- a/third_party/java/BUILD
+++ b/third_party/java/BUILD
@@ -99,6 +99,19 @@
     ],
 )
 
+# This version is needed for dependents that don't want
+# kryo to be included in the generated binary
+# (e.g. //heron/storm/src/java:heron-storm)
+java_library(
+    name = "kryo-neverlink",
+    srcs = [ "Empty.java" ],
+    exports = [ "@com_esotericsoftware_kryo//jar" ],
+    deps = [
+        "@com_esotericsoftware_kryo//jar",
+    ],
+    neverlink = 1,
+)
+
 java_library(
     name = "yarn",
     srcs = [ "Empty.java" ],
diff --git a/website/config.yaml b/website/config.yaml
index 56ba56d..ac3aa68 100755
--- a/website/config.yaml
+++ b/website/config.yaml
@@ -20,7 +20,7 @@
   author: Twitter, Inc.
   description: A realtime, distributed, fault-tolerant stream processing engine from Twitter
   versions:
-    heron: 0.14.5
+    heron: 0.14.7
     bazel: 0.3.1
   assets:
     favicon: