Merge branch 'objmagic/limit-msg-size' into 0.15.1.1-rc
diff --git a/heron/api/src/java/BUILD b/heron/api/src/java/BUILD
index 443bd31..e5a0e89 100644
--- a/heron/api/src/java/BUILD
+++ b/heron/api/src/java/BUILD
@@ -2,7 +2,7 @@
 
 package(default_visibility = ["//visibility:public"])
 
-load("/tools/rules/heron_deps", "heron_java_proto_files")
+load("/tools/rules/heron_deps", "heron_java_api_proto_files")
 load("/tools/rules/jarjar_rules", "jarjar_binary")
 load("/tools/rules/build_defs", "DOCLINT_HTML_AND_SYNTAX")
 load("/tools/rules/javadoc", "java_doc")
@@ -15,7 +15,7 @@
 )
 
 api_deps_files =  \
-    heron_java_proto_files() + [
+    heron_java_api_proto_files() + [
         ":classification",
         "//heron/common/src/java:basics-java",
     ]
diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.cpp b/heron/common/src/cpp/config/heron-internals-config-reader.cpp
index d784048..b06395b 100644
--- a/heron/common/src/cpp/config/heron-internals-config-reader.cpp
+++ b/heron/common/src/cpp/config/heron-internals-config-reader.cpp
@@ -205,6 +205,11 @@
   return config_[HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER].as<int>();
 }
 
+sp_int32 HeronInternalsConfigReader::GetHeronStreammgrHeronTupleSetMessageMaxBytes() {
+  return config_[HeronInternalsConfigVars::HERON_STREAMMGR_HERONTUPLESET_MESSAGE_MAX_BYTES]
+      .as<int>();
+}
+
 sp_int32 HeronInternalsConfigReader::GetHeronStreammgrXormgrRotatingmapNbuckets() {
   return config_[HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS].as<int>();
 }
diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.h b/heron/common/src/cpp/config/heron-internals-config-reader.h
index 37bed01..93c3ef5 100644
--- a/heron/common/src/cpp/config/heron-internals-config-reader.h
+++ b/heron/common/src/cpp/config/heron-internals-config-reader.h
@@ -156,6 +156,9 @@
   // The max number of messages in the memory pool for each message type
   sp_int32 GetHeronStreammgrMempoolMaxMessageNumber();
 
+  // The max byte size of HeronTupleSet message in stream manager
+  sp_int32 GetHeronStreammgrHeronTupleSetMessageMaxBytes();
+
   // Get the Nbucket value, for efficient acknowledgement
   sp_int32 GetHeronStreammgrXormgrRotatingmapNbuckets();
 
diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.cpp b/heron/common/src/cpp/config/heron-internals-config-vars.cpp
index addcadf..069a400 100644
--- a/heron/common/src/cpp/config/heron-internals-config-vars.cpp
+++ b/heron/common/src/cpp/config/heron-internals-config-vars.cpp
@@ -88,6 +88,8 @@
     "heron.streammgr.cache.drain.size.mb";
 const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER =
     "heron.streammgr.mempool.max.message.number";
+const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_HERONTUPLESET_MESSAGE_MAX_BYTES =
+    "heron.streammgr.herontupleset.message.max.bytes";
 const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS =
     "heron.streammgr.xormgr.rotatingmap.nbuckets";
 const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CLIENT_RECONNECT_MAX_ATTEMPTS =
diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.h b/heron/common/src/cpp/config/heron-internals-config-vars.h
index f711a9b..49422de 100644
--- a/heron/common/src/cpp/config/heron-internals-config-vars.h
+++ b/heron/common/src/cpp/config/heron-internals-config-vars.h
@@ -140,6 +140,9 @@
   // The max number of messages in the memory pool for each message type
   static const sp_string HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER;
 
+  // The max byte size of HeronTupleSet message in stream manager
+  static const sp_string HERON_STREAMMGR_HERONTUPLESET_MESSAGE_MAX_BYTES;
+
   // For efficient acknowledgement
   static const sp_string HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS;
 
diff --git a/heron/common/src/python/basics/__init__.py b/heron/common/src/python/basics/__init__.py
deleted file mode 100644
index c2b5d47..0000000
--- a/heron/common/src/python/basics/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-'''basic module'''
-__all__ = ['event_looper', 'gateway_looper']
-
-from .event_looper import EventLooper
-from .gateway_looper import GatewayLooper
diff --git a/heron/common/src/python/config/__init__.py b/heron/common/src/python/config/__init__.py
deleted file mode 100644
index 48daa21..0000000
--- a/heron/common/src/python/config/__init__.py
+++ /dev/null
@@ -1,2 +0,0 @@
-''' config module: various singleton configuration modules for PyHeron '''
-__all__ = ['system_config']
diff --git a/heron/common/tests/python/basics/BUILD b/heron/common/tests/python/basics/BUILD
deleted file mode 100644
index a6e17b8..0000000
--- a/heron/common/tests/python/basics/BUILD
+++ /dev/null
@@ -1,30 +0,0 @@
-package(default_visibility = ["//visibility:public"])
-load("/tools/rules/pex_rules", "pex_library", "pex_test")
-
-pex_test(
-    name = "gateway_looper_unittest",
-    srcs = ["gateway_looper_unittest.py"],
-    deps = [
-        "//heron/common/src/python:common-py"
-    ],
-    reqs = [
-        "py==1.4.27",
-        "pytest==2.6.4",
-        "unittest2==0.5.1",
-    ],
-    size = "small",
-)
-
-pex_test(
-    name = "event_looper_unittest",
-    srcs = ["event_looper_unittest.py"],
-    deps = [
-        "//heron/common/src/python:common-py"
-    ],
-    reqs = [
-        "py==1.4.27",
-        "pytest==2.6.4",
-        "unittest2==0.5.1",
-    ],
-    size = "small",
-)
diff --git a/heron/common/tests/python/basics/__init__.py b/heron/common/tests/python/basics/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/heron/common/tests/python/basics/__init__.py
+++ /dev/null
diff --git a/heron/config/src/yaml/conf/aurora/heron_internals.yaml b/heron/config/src/yaml/conf/aurora/heron_internals.yaml
index a1b77b1..b56780d 100644
--- a/heron/config/src/yaml/conf/aurora/heron_internals.yaml
+++ b/heron/config/src/yaml/conf/aurora/heron_internals.yaml
@@ -65,6 +65,9 @@
 # The max number of messages in the memory pool for each message type
 heron.streammgr.mempool.max.message.number: 512
 
+# The max byte size of a HeronTupleSet message allowed in memory pool
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
 # The max reconnect attempts to other stream managers for stream manager client
 heron.streammgr.client.reconnect.max.attempts: 300
 
diff --git a/heron/config/src/yaml/conf/examples/heron_internals.yaml b/heron/config/src/yaml/conf/examples/heron_internals.yaml
index a1b77b1..30f355d 100644
--- a/heron/config/src/yaml/conf/examples/heron_internals.yaml
+++ b/heron/config/src/yaml/conf/examples/heron_internals.yaml
@@ -65,6 +65,9 @@
 # The max number of messages in the memory pool for each message type
 heron.streammgr.mempool.max.message.number: 512
 
+# The max byte size of a HeronTupleSet message
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
 # The max reconnect attempts to other stream managers for stream manager client
 heron.streammgr.client.reconnect.max.attempts: 300
 
diff --git a/heron/config/src/yaml/conf/kubernetes/heron_internals.yaml b/heron/config/src/yaml/conf/kubernetes/heron_internals.yaml
index b592fbc..4bb9636 100644
--- a/heron/config/src/yaml/conf/kubernetes/heron_internals.yaml
+++ b/heron/config/src/yaml/conf/kubernetes/heron_internals.yaml
@@ -65,6 +65,9 @@
 # The max number of messages in the memory pool for each message type
 heron.streammgr.mempool.max.message.number: 512
 
+# The max byte size of a HeronTupleSet message allowed in memory pool
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
 # The reconnect interval to other stream managers in secs for stream manager client
 heron.streammgr.client.reconnect.interval.sec: 1
 
diff --git a/heron/config/src/yaml/conf/local/heron_internals.yaml b/heron/config/src/yaml/conf/local/heron_internals.yaml
index 56fb659..b96d535 100644
--- a/heron/config/src/yaml/conf/local/heron_internals.yaml
+++ b/heron/config/src/yaml/conf/local/heron_internals.yaml
@@ -65,6 +65,9 @@
 # The max number of messages in the memory pool for each message type
 heron.streammgr.mempool.max.message.number: 512
 
+# The max byte size of a HeronTupleSet message allowed in memory pool
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
 # The max reconnect attempts to other stream managers for stream manager client
 heron.streammgr.client.reconnect.max.attempts: 300
 
diff --git a/heron/config/src/yaml/conf/localzk/heron_internals.yaml b/heron/config/src/yaml/conf/localzk/heron_internals.yaml
index a1b77b1..b56780d 100644
--- a/heron/config/src/yaml/conf/localzk/heron_internals.yaml
+++ b/heron/config/src/yaml/conf/localzk/heron_internals.yaml
@@ -65,6 +65,9 @@
 # The max number of messages in the memory pool for each message type
 heron.streammgr.mempool.max.message.number: 512
 
+# The max byte size of a HeronTupleSet message allowed in memory pool
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
 # The max reconnect attempts to other stream managers for stream manager client
 heron.streammgr.client.reconnect.max.attempts: 300
 
diff --git a/heron/config/src/yaml/conf/marathon/heron_internals.yaml b/heron/config/src/yaml/conf/marathon/heron_internals.yaml
index a1b77b1..b56780d 100644
--- a/heron/config/src/yaml/conf/marathon/heron_internals.yaml
+++ b/heron/config/src/yaml/conf/marathon/heron_internals.yaml
@@ -65,6 +65,9 @@
 # The max number of messages in the memory pool for each message type
 heron.streammgr.mempool.max.message.number: 512
 
+# The max byte size of a HeronTupleSet message allowed in memory pool
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
 # The max reconnect attempts to other stream managers for stream manager client
 heron.streammgr.client.reconnect.max.attempts: 300
 
diff --git a/heron/config/src/yaml/conf/mesos/heron_internals.yaml b/heron/config/src/yaml/conf/mesos/heron_internals.yaml
index a1b77b1..b56780d 100644
--- a/heron/config/src/yaml/conf/mesos/heron_internals.yaml
+++ b/heron/config/src/yaml/conf/mesos/heron_internals.yaml
@@ -65,6 +65,9 @@
 # The max number of messages in the memory pool for each message type
 heron.streammgr.mempool.max.message.number: 512
 
+# The max byte size of a HeronTupleSet message allowed in memory pool
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
 # The max reconnect attempts to other stream managers for stream manager client
 heron.streammgr.client.reconnect.max.attempts: 300
 
diff --git a/heron/config/src/yaml/conf/slurm/heron_internals.yaml b/heron/config/src/yaml/conf/slurm/heron_internals.yaml
index 1c79d34..62ceae3 100644
--- a/heron/config/src/yaml/conf/slurm/heron_internals.yaml
+++ b/heron/config/src/yaml/conf/slurm/heron_internals.yaml
@@ -65,6 +65,9 @@
 # The max number of messages in the memory pool for each message type
 heron.streammgr.mempool.max.message.number: 512
 
+# The max byte size of a HeronTupleSet message allowed in memory pool
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
 # The max reconnect attempts to other stream managers for stream manager client
 heron.streammgr.client.reconnect.max.attempts: 300
 
diff --git a/heron/config/src/yaml/conf/test/test_heron_internals.yaml b/heron/config/src/yaml/conf/test/test_heron_internals.yaml
index 7537a96..08241fb 100644
--- a/heron/config/src/yaml/conf/test/test_heron_internals.yaml
+++ b/heron/config/src/yaml/conf/test/test_heron_internals.yaml
@@ -50,6 +50,9 @@
 # The max number of messages in the memory pool for each message type
 heron.streammgr.mempool.max.message.number: 512
 
+# The max byte size of a HeronTupleSet message allowed in memory pool
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
 # For efficient acknowledgement
 heron.streammgr.xormgr.rotatingmap.nbuckets: 3
 
diff --git a/heron/config/src/yaml/conf/yarn/heron_internals.yaml b/heron/config/src/yaml/conf/yarn/heron_internals.yaml
index a1b77b1..b56780d 100644
--- a/heron/config/src/yaml/conf/yarn/heron_internals.yaml
+++ b/heron/config/src/yaml/conf/yarn/heron_internals.yaml
@@ -65,6 +65,9 @@
 # The max number of messages in the memory pool for each message type
 heron.streammgr.mempool.max.message.number: 512
 
+# The max byte size of a HeronTupleSet message allowed in memory pool
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
 # The max reconnect attempts to other stream managers for stream manager client
 heron.streammgr.client.reconnect.max.attempts: 300
 
diff --git a/heron/instance/src/python/basics/base_instance.py b/heron/instance/src/python/basics/base_instance.py
index 9bd9eca..a8b42f0 100644
--- a/heron/instance/src/python/basics/base_instance.py
+++ b/heron/instance/src/python/basics/base_instance.py
@@ -21,15 +21,15 @@
 import heronpy.api.api_constants as api_constants
 from heronpy.api.state.stateful_component import StatefulComponent
 
-from heron.common.src.python.config import system_config
 from heron.common.src.python.utils.log import Log
 
 from heron.proto import tuple_pb2
 
 from heron.instance.src.python.utils.misc import SerializerHelper
 from heron.instance.src.python.utils.misc import OutgoingTupleHelper
+from heron.instance.src.python.utils import system_config
 
-import heron.common.src.python.system_constants as system_constants
+import heron.instance.src.python.utils.system_constants as system_constants
 import heron.common.src.python.pex_loader as pex_loader
 
 # pylint: disable=too-many-instance-attributes
diff --git a/heron/instance/src/python/basics/bolt_instance.py b/heron/instance/src/python/basics/bolt_instance.py
index 94a60a5..a92b095 100644
--- a/heron/instance/src/python/basics/bolt_instance.py
+++ b/heron/instance/src/python/basics/bolt_instance.py
@@ -27,7 +27,7 @@
 from heron.instance.src.python.utils.metrics import BoltMetrics
 from heron.instance.src.python.utils.tuple import TupleHelper, HeronTuple
 
-import heron.common.src.python.system_constants as system_constants
+import heron.instance.src.python.utils.system_constants as system_constants
 
 from .base_instance import BaseInstance
 
diff --git a/heron/instance/src/python/basics/spout_instance.py b/heron/instance/src/python/basics/spout_instance.py
index 5d4490b..fc5335b 100644
--- a/heron/instance/src/python/basics/spout_instance.py
+++ b/heron/instance/src/python/basics/spout_instance.py
@@ -28,7 +28,7 @@
 
 from heron.proto import topology_pb2, tuple_pb2, ckptmgr_pb2
 
-import heron.common.src.python.system_constants as system_constants
+import heron.instance.src.python.utils.system_constants as system_constants
 
 from .base_instance import BaseInstance
 
diff --git a/heron/instance/src/python/instance/st_heron_instance.py b/heron/instance/src/python/instance/st_heron_instance.py
index 1115a79..17d3db6 100644
--- a/heron/instance/src/python/instance/st_heron_instance.py
+++ b/heron/instance/src/python/instance/st_heron_instance.py
@@ -23,8 +23,6 @@
 import heronpy.api.api_constants as api_constants
 from heronpy.api.state.state import HashMapState
 
-from heron.common.src.python.basics import GatewayLooper
-from heron.common.src.python.config import system_config
 from heron.common.src.python.utils import log
 
 from heron.proto import physical_plan_pb2, tuple_pb2, ckptmgr_pb2, common_pb2
@@ -35,9 +33,10 @@
 from heron.instance.src.python.utils.metrics import GatewayMetrics, PyMetrics, MetricsCollector
 from heron.instance.src.python.network import MetricsManagerClient, SingleThreadStmgrClient
 from heron.instance.src.python.network import create_socket_options
+from heron.instance.src.python.network import GatewayLooper
 from heron.instance.src.python.basics import SpoutInstance, BoltInstance
-
-import heron.common.src.python.system_constants as constants
+import heron.instance.src.python.utils.system_constants as constants
+from heron.instance.src.python.utils import system_config
 
 Log = log.Log
 AssignedInstance = collections.namedtuple('AssignedInstance', 'is_spout, protobuf, py_class')
diff --git a/heron/instance/src/python/network/__init__.py b/heron/instance/src/python/network/__init__.py
index 1d4ae80..50a59e6 100644
--- a/heron/instance/src/python/network/__init__.py
+++ b/heron/instance/src/python/network/__init__.py
@@ -1,6 +1,9 @@
 '''module for network component for python instance'''
-__all__ = ['metricsmgr_client', 'heron_client', 'st_stmgr_client', 'protocol', 'socket_options']
+__all__ = ['event_looper', 'gateway_looper', 'metricsmgr_client',
+           'heron_client', 'st_stmgr_client', 'protocol', 'socket_options']
 
+from .event_looper import EventLooper
+from .gateway_looper import GatewayLooper
 from .protocol import HeronProtocol, OutgoingPacket, IncomingPacket, REQID, StatusCode
 from .socket_options import SocketOptions, create_socket_options
 from .metricsmgr_client import MetricsManagerClient
diff --git a/heron/common/src/python/basics/event_looper.py b/heron/instance/src/python/network/event_looper.py
similarity index 100%
rename from heron/common/src/python/basics/event_looper.py
rename to heron/instance/src/python/network/event_looper.py
diff --git a/heron/common/src/python/basics/gateway_looper.py b/heron/instance/src/python/network/gateway_looper.py
similarity index 100%
rename from heron/common/src/python/basics/gateway_looper.py
rename to heron/instance/src/python/network/gateway_looper.py
diff --git a/heron/instance/src/python/network/heron_client.py b/heron/instance/src/python/network/heron_client.py
index 87ce3aa..2567eb1 100644
--- a/heron/instance/src/python/network/heron_client.py
+++ b/heron/instance/src/python/network/heron_client.py
@@ -20,7 +20,7 @@
 
 import time
 from heron.common.src.python.utils.log import Log
-import heron.common.src.python.system_constants as constants
+import heron.instance.src.python.utils.system_constants as constants
 from heron.instance.src.python.network import HeronProtocol, REQID, StatusCode, OutgoingPacket
 
 # pylint: disable=too-many-instance-attributes
@@ -30,7 +30,7 @@
   def __init__(self, looper, hostname, port, socket_map, socket_options):
     """Initializes HeronClient
 
-    :type looper: ``GatewayLooper`` (heron.common.src.python.basics)
+    :type looper: ``GatewayLooper`` (heron.instance.src.python.network)
     :param looper: looper object
     :type hostname: str
     :param hostname: endpoint hostname
diff --git a/heron/instance/src/python/network/metricsmgr_client.py b/heron/instance/src/python/network/metricsmgr_client.py
index 225b249..214209e 100644
--- a/heron/instance/src/python/network/metricsmgr_client.py
+++ b/heron/instance/src/python/network/metricsmgr_client.py
@@ -14,15 +14,15 @@
 '''metrics manager client'''
 import socket
 
-from heron.common.src.python.config import system_config
 from heron.common.src.python.utils.log import Log
 
 from heron.instance.src.python.network.heron_client import HeronClient
 from heron.instance.src.python.network import StatusCode
+from heron.instance.src.python.utils import system_config
 
 from heron.proto import metrics_pb2, common_pb2
 
-import heron.common.src.python.system_constants as constants
+import heron.instance.src.python.utils.system_constants as constants
 
 class MetricsManagerClient(HeronClient):
   """MetricsManagerClient, responsible for communicating with Metrics Manager"""
diff --git a/heron/instance/src/python/network/socket_options.py b/heron/instance/src/python/network/socket_options.py
index 18f1460..00bd09f 100644
--- a/heron/instance/src/python/network/socket_options.py
+++ b/heron/instance/src/python/network/socket_options.py
@@ -15,8 +15,8 @@
 
 from collections import namedtuple
 from heron.common.src.python.utils.log import Log
-import heron.common.src.python.system_constants as const
-from heron.common.src.python.config import system_config
+import heron.instance.src.python.utils.system_constants as const
+from heron.instance.src.python.utils import system_config
 
 SocketOptions = namedtuple('Options', 'nw_write_batch_size_bytes, nw_write_batch_time_ms, '
                                       'nw_read_batch_size_bytes, nw_read_batch_time_ms, '
diff --git a/heron/instance/src/python/network/st_stmgr_client.py b/heron/instance/src/python/network/st_stmgr_client.py
index 48935c5..d2a833a 100644
--- a/heron/instance/src/python/network/st_stmgr_client.py
+++ b/heron/instance/src/python/network/st_stmgr_client.py
@@ -15,15 +15,15 @@
 import sys
 import traceback
 
-from heron.common.src.python.config import system_config
 from heron.common.src.python.utils.log import Log
 
 from heron.instance.src.python.network.heron_client import HeronClient
 from heron.instance.src.python.network import StatusCode
+from heron.instance.src.python.utils import system_config
 
 from heron.proto import common_pb2, stmgr_pb2, tuple_pb2, ckptmgr_pb2
 
-import heron.common.src.python.system_constants as constants
+import heron.instance.src.python.utils.system_constants as constants
 
 # pylint: disable=too-many-arguments
 # pylint: disable=too-many-instance-attributes
diff --git a/heron/instance/src/python/utils/__init__.py b/heron/instance/src/python/utils/__init__.py
index 4654d2c..2cb4e0e 100644
--- a/heron/instance/src/python/utils/__init__.py
+++ b/heron/instance/src/python/utils/__init__.py
@@ -1,3 +1,3 @@
 '''common utility modules'''
 __all__ = ['metrics', 'misc', 'topology', 'config', 'tracker_access',
-           'tuple', 'proc', 'log']
+           'system_constants', 'system_config', 'tuple', 'proc', 'log']
diff --git a/heron/instance/src/python/utils/metrics/metrics_helper.py b/heron/instance/src/python/utils/metrics/metrics_helper.py
index fe256f4..1bbb44e 100644
--- a/heron/instance/src/python/utils/metrics/metrics_helper.py
+++ b/heron/instance/src/python/utils/metrics/metrics_helper.py
@@ -14,8 +14,8 @@
 '''metrics_helper: helper classes for managing common metrics'''
 
 from heron.common.src.python.utils.log import Log
-from heron.common.src.python.config import system_config
-import heron.common.src.python.system_constants as constants
+import heron.instance.src.python.utils.system_constants as constants
+from heron.instance.src.python.utils import system_config
 
 from heron.proto import metrics_pb2
 
diff --git a/heron/instance/src/python/utils/metrics/py_metrics.py b/heron/instance/src/python/utils/metrics/py_metrics.py
index c6a520c..3d52bb0 100644
--- a/heron/instance/src/python/utils/metrics/py_metrics.py
+++ b/heron/instance/src/python/utils/metrics/py_metrics.py
@@ -17,8 +17,8 @@
 import traceback
 from heronpy.api.metrics import AssignableMetrics, MultiAssignableMetrics
 from .metrics_helper import BaseMetricsHelper
-import heron.common.src.python.system_constants as constants
-from heron.common.src.python.config import system_config
+import heron.instance.src.python.utils.system_constants as constants
+from heron.instance.src.python.utils import system_config
 from heron.common.src.python.utils.log import Log
 
 # pylint: disable=too-many-instance-attributes
diff --git a/heron/instance/src/python/utils/misc/outgoing_tuple_helper.py b/heron/instance/src/python/utils/misc/outgoing_tuple_helper.py
index cc0912c..80c2776 100644
--- a/heron/instance/src/python/utils/misc/outgoing_tuple_helper.py
+++ b/heron/instance/src/python/utils/misc/outgoing_tuple_helper.py
@@ -14,11 +14,11 @@
 '''outgoing_tuple_helper.py: module to provide a helper class for preparing and pushing tuples'''
 import sys
 
-from heron.common.src.python.config import system_config
 from heron.common.src.python.utils.log import Log
 from heron.proto import tuple_pb2, topology_pb2, ckptmgr_pb2
 
-import heron.common.src.python.system_constants as constants
+import heron.instance.src.python.utils.system_constants as constants
+from heron.instance.src.python.utils import system_config
 
 # pylint: disable=too-many-instance-attributes
 # pylint: disable=no-value-for-parameter
diff --git a/heron/common/src/python/config/system_config.py b/heron/instance/src/python/utils/system_config.py
similarity index 100%
rename from heron/common/src/python/config/system_config.py
rename to heron/instance/src/python/utils/system_config.py
diff --git a/heron/common/src/python/system_constants.py b/heron/instance/src/python/utils/system_constants.py
similarity index 100%
rename from heron/common/src/python/system_constants.py
rename to heron/instance/src/python/utils/system_constants.py
diff --git a/heron/instance/src/python/utils/topology/topology_context_impl.py b/heron/instance/src/python/utils/topology/topology_context_impl.py
index 4fad3e5..29d1406 100644
--- a/heron/instance/src/python/utils/topology/topology_context_impl.py
+++ b/heron/instance/src/python/utils/topology/topology_context_impl.py
@@ -23,7 +23,7 @@
 import heronpy.api.api_constants as api_constants
 from heron.instance.src.python.utils.metrics import MetricsCollector
 
-import heron.common.src.python.system_constants as system_constants
+import heron.instance.src.python.utils.system_constants as system_constants
 import heron.common.src.python.pex_loader as pex_loader
 
 class TopologyContextImpl(TopologyContext):
diff --git a/heron/instance/tests/python/network/BUILD b/heron/instance/tests/python/network/BUILD
index cde4b06..dc685b6 100644
--- a/heron/instance/tests/python/network/BUILD
+++ b/heron/instance/tests/python/network/BUILD
@@ -101,3 +101,31 @@
     ],
     size = "small",
 )
+
+pex_test(
+    name = "gateway_looper_unittest",
+    srcs = ["gateway_looper_unittest.py"],
+    deps = [
+        "//heron/instance/src/python:instance-py",
+    ],
+    reqs = [
+        "py==1.4.27",
+        "pytest==2.6.4",
+        "unittest2==0.5.1",
+    ],
+    size = "small",
+)
+
+pex_test(
+    name = "event_looper_unittest",
+    srcs = ["event_looper_unittest.py"],
+    deps = [
+        "//heron/instance/src/python:instance-py",
+    ],
+    reqs = [
+        "py==1.4.27",
+        "pytest==2.6.4",
+        "unittest2==0.5.1",
+    ],
+    size = "small",
+)
diff --git a/heron/common/tests/python/basics/event_looper_unittest.py b/heron/instance/tests/python/network/event_looper_unittest.py
similarity index 97%
rename from heron/common/tests/python/basics/event_looper_unittest.py
rename to heron/instance/tests/python/network/event_looper_unittest.py
index acb9fca..da31f9c 100644
--- a/heron/common/tests/python/basics/event_looper_unittest.py
+++ b/heron/instance/tests/python/network/event_looper_unittest.py
@@ -15,7 +15,7 @@
 import time
 import unittest2 as unittest
 
-from heron.common.src.python.basics import EventLooper
+from heron.instance.src.python.network import EventLooper
 
 # pylint: disable=missing-docstring
 # pylint: disable=W0212
diff --git a/heron/common/tests/python/basics/gateway_looper_unittest.py b/heron/instance/tests/python/network/gateway_looper_unittest.py
similarity index 95%
rename from heron/common/tests/python/basics/gateway_looper_unittest.py
rename to heron/instance/tests/python/network/gateway_looper_unittest.py
index ae1ba0d..79f4e5b 100644
--- a/heron/common/tests/python/basics/gateway_looper_unittest.py
+++ b/heron/instance/tests/python/network/gateway_looper_unittest.py
@@ -16,7 +16,7 @@
 import time
 import unittest2 as unittest
 
-from heron.common.src.python.basics.gateway_looper import GatewayLooper
+from heron.instance.src.python.network.gateway_looper import GatewayLooper
 
 # pylint: disable=missing-docstring
 class GatewayLooperTest(unittest.TestCase):
diff --git a/heron/instance/tests/python/network/mock_generator.py b/heron/instance/tests/python/network/mock_generator.py
index b49411d..8d74965 100644
--- a/heron/instance/tests/python/network/mock_generator.py
+++ b/heron/instance/tests/python/network/mock_generator.py
@@ -13,8 +13,8 @@
 # limitations under the License.
 '''mock_generator for instance/network'''
 # pylint : disable=missing-docstring
-from heron.common.src.python.basics import EventLooper
-import heron.common.src.python.system_constants as constants
+from heron.instance.src.python.network import EventLooper
+import heron.instance.src.python.utils.system_constants as constants
 from heron.instance.src.python.utils.misc import HeronCommunicator
 from heron.instance.src.python.network import SingleThreadStmgrClient, MetricsManagerClient
 from heron.instance.src.python.network import SocketOptions
@@ -29,7 +29,7 @@
 
   def __init__(self):
     socket_options = SocketOptions(32768, 16, 32768, 16, 1024000, 1024000)
-    with patch("heron.common.src.python.config.system_config.get_sys_config",
+    with patch("heron.instance.src.python.utils.system_config.get_sys_config",
                side_effect=lambda: {constants.INSTANCE_RECONNECT_STREAMMGR_INTERVAL_SEC: 10}):
       SingleThreadStmgrClient.__init__(self, EventLooper(), None, self.HOST, self.PORT,
                                        "topology_name", "topology_id",
@@ -50,7 +50,7 @@
   PORT = 9000
   def __init__(self):
     socket_options = SocketOptions(32768, 16, 32768, 16, 1024000, 1024000)
-    with patch("heron.common.src.python.config.system_config.get_sys_config",
+    with patch("heron.instance.src.python.utils.system_config.get_sys_config",
                side_effect=lambda: {constants.INSTANCE_RECONNECT_METRICSMGR_INTERVAL_SEC: 10,
                                     constants.INSTANCE_METRICS_SYSTEM_SAMPLE_INTERVAL_SEC: 10}):
       stream = HeronCommunicator(producer_cb=None, consumer_cb=None)
diff --git a/heron/instance/tests/python/utils/mock_generator.py b/heron/instance/tests/python/utils/mock_generator.py
index 46bd5ec..8aac3cf 100644
--- a/heron/instance/tests/python/utils/mock_generator.py
+++ b/heron/instance/tests/python/utils/mock_generator.py
@@ -27,7 +27,7 @@
                                                 HeronCommunicator)
 from heron.proto import tuple_pb2
 
-import heron.common.src.python.system_constants as constants
+import heron.instance.src.python.utils.system_constants as constants
 import heron.instance.tests.python.mock_protobuf as mock_protobuf
 
 prim_list = [1000, -234, 0.00023, "string",
@@ -114,7 +114,7 @@
 
     if mode == MockOutgoingTupleHelper.SAMPLE_SUCCESS:
       pplan_helper, out_stream = self._prepare_sample_success()
-      with patch("heron.common.src.python.config.system_config.get_sys_config",
+      with patch("heron.instance.src.python.utils.system_config.get_sys_config",
                  side_effect=lambda: sample_sys_config):
         super(MockOutgoingTupleHelper, self).__init__(pplan_helper, out_stream)
 
diff --git a/heron/instance/tests/python/utils/py_metrics_unittest.py b/heron/instance/tests/python/utils/py_metrics_unittest.py
index 77d691b..e1bd6d3 100644
--- a/heron/instance/tests/python/utils/py_metrics_unittest.py
+++ b/heron/instance/tests/python/utils/py_metrics_unittest.py
@@ -19,7 +19,7 @@
 import threading
 
 from heron.instance.src.python.utils.metrics.py_metrics import PyMetrics
-import heron.common.src.python.system_constants as constants
+import heron.instance.src.python.utils.system_constants as constants
 import heron.instance.tests.python.utils.mock_generator as mock_generator
 
 Mem = namedtuple('Mem', ['rss', 'vms'])
@@ -29,7 +29,7 @@
 class PyMetricsTest(unittest.TestCase):
   def setUp(self):
     metrics_collector = mock_generator.MockMetricsCollector()
-    with patch("heron.common.src.python.config.system_config.get_sys_config",
+    with patch("heron.instance.src.python.utils.system_config.get_sys_config",
                side_effect=lambda: {constants.HERON_METRICS_EXPORT_INTERVAL_SEC: 10}):
           self.py_metrics = PyMetrics(metrics_collector)
     self.py_metrics.process = Mock()
diff --git a/heron/proto/BUILD b/heron/proto/BUILD
index d814fe9..21b38fe 100644
--- a/heron/proto/BUILD
+++ b/heron/proto/BUILD
@@ -177,40 +177,6 @@
     ],
 )
 
-# This is for binaries that don't want protobuf 
-# dependency to be included in the generated jar
-java_library(
-    name = "proto-java-nl",
-    srcs = [
-        "Empty.java",
-    ],
-    deps = [
-        ":proto_common_java",
-        ":proto_execution_state_java",
-        ":proto_stats_java",
-        ":proto_topology_java",
-        ":proto_scheduler_java",
-        ":proto_packing_plan_java",
-        ":proto_physical_plan_java",
-        ":proto_metrics_java",
-        ":proto_tmaster_java",
-        ":proto_tuple_java",
-        ":proto_stmgr_java",
-        ":proto_ckptmgr_java",
-        ":proto_networktests_java",
-        "//third_party/java:protobuf-java-neverlink",
-    ],
-)
-
-java_binary(
-    name = "proto-bin-java",
-    srcs = ["Empty.java"],
-    create_executable = 0,
-    deps = [
-        ":proto-java-nl",
-    ],
-)
-
 pex_library(
     name = "proto-py",
     deps = [
diff --git a/heron/proto/pplan.proto.oldx b/heron/proto/pplan.proto.oldx
deleted file mode 100644
index 3a270bf..0000000
--- a/heron/proto/pplan.proto.oldx
+++ /dev/null
@@ -1,91 +0,0 @@
-package heron.proto.system;
-
-option java_package = "com.twitter.heron.proto.system";
-option java_outer_classname = "PhysicalPlans";
-
-enum CompType {
-  SPOUT = 1;
-  BOLT = 2;
-}
-
-enum GroupType {
-  RANDOM = 1;
-  SHUFFLE = 2;
-  FIELDS = 3;
-  REPLICATE = 4; 
-  NONE = 5;
-  SELF = 6;
-}
-
-message Schema {
-  repeated string keys = 1;
-}
-
-message Manager {
-  required string id = 1;
-  required string name = 2;
-  required int32  dport = 3;
-  required int32  cport = 4;
-  required string epoint = 5;
-}
-
-message Group {
-  required string id = 1;
-  required string host_id = 2;
-}
-
-message Stream {
-  required string       id = 1;
-  required GroupType    gtype = 2; 
-}
-
-message Grouping {
-  required Stream       stream = 1;
-  repeated Group        groups = 2;  
-}
-
-message Input {
-  required Stream       stream = 1;
-  required string       group_id = 2;
-}
-
-message Output {
-  required string       stream_id = 1;
-  required Schema       schema = 2;
-}
-
-message Worker {
-  required string id = 1;
-  required string host_id = 2;
-  required CompType comp_type = 3;
-  required string instance_id = 4;
-}
-  
-message Component {
-  required string id = 1;
-  required string class_name = 2;
-}
-
-message SpoutInstance {
-  required string id = 1;
-  required Component comp_id = 2;
-  repeated Output ostreams = 3;
-  repeated string params = 4;
-}
-
-message BoltInstance {
-  required string id = 1;
-  required Component comp_id = 2;
-  repeated Input istreams = 3;
-  repeated Output ostreams = 4;
-  repeated string params = 5;
-}
-
-message PhysicalPlan {
-  repeated Manager mgrs = 1;
-  repeated Stream streams = 2;
-  repeated Grouping groupings = 3;
-  repeated Worker workers = 4;
-  repeated SpoutInstance spouts = 5;
-  repeated BoltInstance bolts = 6;
-}
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
index 027fee3..1923e6b 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
@@ -120,7 +120,9 @@
   metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_INIT,
                                            back_pressure_metric_initiated_);
   spouts_under_back_pressure_ = false;
-
+  max_herontupleset_size_in_bytes =
+    config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrHeronTupleSetMessageMaxBytes();
+  space_check_counter = 0;
   // Update queue related metrics every 10 seconds
   CHECK_GT(eventLoop_->registerTimer([this](EventLoop::Status status) {
     this->UpdateQueueMetrics(status);
@@ -431,7 +433,20 @@
         ->incr_by(_message->control().fails_size());
   }
   stmgr_->HandleInstanceData(iter->second, instance_info_[iter->second]->local_spout_, _message);
-  __global_protobuf_pool_release__(_message);
+  space_check_counter = (space_check_counter + 1) % 1024;
+  if (space_check_counter == 0) {
+    auto message_size = _message->SpaceUsed();
+    if (message_size >= max_herontupleset_size_in_bytes) {
+      LOG(WARNING) << "HeronTupleSet message has size " << message_size <<
+        " bytes, exceeding limit " << max_herontupleset_size_in_bytes << " bytes." <<
+        " Release to memory allocator rather than memory pool.";
+      delete _message;
+    } else {
+      __global_protobuf_pool_release__(_message);
+    }
+  } else {
+    __global_protobuf_pool_release__(_message);
+  }
 }
 
 void StMgrServer::SendToInstance2(proto::stmgr::TupleStreamMessage* _message) {
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.h b/heron/stmgr/src/cpp/manager/stmgr-server.h
index 0faa08f..0ad4707 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.h
@@ -201,6 +201,8 @@
   heron::common::TimeSpentMetric* back_pressure_metric_initiated_;
 
   bool spouts_under_back_pressure_;
+  sp_uint32 max_herontupleset_size_in_bytes;
+  sp_uint32 space_check_counter;
 
   // Stateful processing related member variables
   NeighbourCalculator* neighbour_calculator_;
diff --git a/heron/tools/tracker/src/python/topology_helpers.py b/heron/tools/tracker/src/python/topology_helpers.py
deleted file mode 100644
index d30c5d4..0000000
--- a/heron/tools/tracker/src/python/topology_helpers.py
+++ /dev/null
@@ -1,330 +0,0 @@
-# Copyright 2016 Twitter. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-This module includes a bunch of library functions
-relevant for dealing with the topology structure
-"""
-
-import sets
-
-from heronpy.api import api_constants
-from heron.common.src.python import system_constants
-from heron.proto import topology_pb2
-
-def get_topology_config(topology, key):
-  """
-  Helper function to get the value of key in topology config.
-  Return None if the key is not present.
-  """
-  for kv in topology.topology_config.kvs:
-    if kv.key == key:
-      return kv.value
-  return None
-
-def get_component_parallelism(topology):
-  """
-  The argument is the proto object for topology.
-  Returns a map of components to their parallelism.
-  """
-  cmap = {}
-  components = list(topology.spouts) + list(topology.bolts)
-  for component in components:
-    component_name = component.comp.name
-    for kv in component.comp.config.kvs:
-      if kv.key == api_constants.TOPOLOGY_COMPONENT_PARALLELISM:
-        cmap[component_name] = int(kv.value)
-  return cmap
-
-def get_nstmgrs(topology):
-  """
-  The argument is the proto object for topology.
-  Returns the number of stream managers for the topology.
-  This is equal to the number of containers.
-  If not present, return 1 as default.
-  """
-  return int(get_topology_config(topology, api_constants.TOPOLOGY_STMGRS) or 1)
-
-def get_instance_opts(topology):
-  """
-  The argument is the proto object for topology.
-  Returns the topology worker child options.
-  If not present, return empty string.
-  """
-  return str(get_topology_config(topology, api_constants.TOPOLOGY_WORKER_CHILDOPTS) or "")
-
-def get_total_instances(topology):
-  """
-  The argument is the proto object for topology.
-  Returns the total number of instances based on all parallelisms.
-  """
-  cmap = get_component_parallelism(topology)
-  return sum(cmap.values())
-
-def get_additional_classpath(topology):
-  """
-  The argument is the proto object for topology.
-  Returns an empty string if no additional classpath specified
-  """
-  additional_classpath = str(get_topology_config(
-      topology, api_constants.TOPOLOGY_ADDITIONAL_CLASSPATH) or "")
-  return additional_classpath
-
-def get_cpus_per_container(topology):
-  """
-  The argument is the proto object for topology.
-  Calculate and return the CPUs per container.
-  It can be calculated in two ways:
-      1. It is passed in config.
-      2. Allocate 1 CPU per instance in a container, and 1 extra for stmrs.
-  """
-  cpus = get_topology_config(topology, api_constants.TOPOLOGY_CONTAINER_CPU_REQUESTED)
-  if not cpus:
-    ninstances = get_total_instances(topology)
-    nstmgrs = get_nstmgrs(topology)
-    cpus = float(ninstances) / nstmgrs + 1 # plus one for stmgr
-  return float(cpus)
-
-def get_disk_per_container(topology):
-  """
-  The argument is the proto object for topology.
-  Calculate and return the disk per container.
-  It can be calculated in two ways:
-      1. It is passed in config.
-      2. Allocate 1 GB per instance in a container, and 12 GB extra for the rest.
-  """
-  disk = get_topology_config(topology, api_constants.TOPOLOGY_CONTAINER_DISK_REQUESTED)
-  if not disk:
-    ninstances = get_total_instances(topology)
-    nstmgrs = get_nstmgrs(topology)
-    # Round to the ceiling
-    maxInstanceInOneContainer = (ninstances + nstmgrs - 1) / nstmgrs
-
-    disk = maxInstanceInOneContainer * system_constants.GB + \
-           system_constants.DEFAULT_DISK_PADDING_PER_CONTAINER
-  return disk
-
-def get_ram_per_container(topology):
-  """
-  The argument is the proto object for topology.
-  Calculate and return RAM per container for the topology based
-  container rammap. Since rammap takes into account all the
-  configs and cases, we just add some ram for stmgr and return it.
-  """
-  component_distribution = get_component_distribution(topology)
-  rammap = get_component_rammap(topology)
-  maxsize = 0
-  for (_, container_items) in component_distribution.items():
-    ramsize = 0
-    for (component_name, _, _) in container_items:
-      ramsize += int(rammap[component_name])
-    if ramsize > maxsize:
-      maxsize = ramsize
-  return maxsize + system_constants.RAM_FOR_STMGR
-
-def get_component_rammap(topology):
-  """
-  The argument is the proto object for topology.
-  It is calculated based on the following priority:
-      1. Form the user specified component rammap. If the rammap
-         is complete, return it.
-      2. If only some component rams are specified, assign them
-         the default RAM, and return it.
-      3. If no component RAM is specified, take the container ram requested
-         and divide it equally among the max possible instances.
-      4. If container RAM was not requested, assign the default RAM
-         to all components.
-  Returns a map from component name to ram in bytes
-  """
-  component_distribution = get_component_distribution(topology)
-  components = list(topology.spouts) + list(topology.bolts)
-
-  # first check if user has specified the rammap
-  rammap = get_user_rammap(topology) or {}
-
-  # Some or all of them are there, so assign default to those components
-  # that are not specified.
-  if len(rammap.keys()) > 0:
-    for component in components:
-      component_name = component.comp.name
-      if component_name not in rammap:
-        rammap[component_name] = system_constants.DEFAULT_RAM_FOR_INSTANCE
-    return rammap
-
-  max_instances_in_a_container = max(map(lambda x: len(x[1]), component_distribution.items()))
-
-  # User has not specified any kind of rammap
-  # We just find the ram needed for a container and divide
-  # memory equally
-  requested_ram_per_container = get_container_ram_requested(topology)
-  if requested_ram_per_container != None:
-    ram_for_jvms = requested_ram_per_container - system_constants.RAM_FOR_STMGR
-    ram_per_instance = int(ram_for_jvms / max_instances_in_a_container)
-    rammap = {}
-    for component in components:
-      component_name = component.comp.name
-      rammap[component_name] = ram_per_instance
-    return rammap
-
-  # Nothing was specified.
-  # The default is to allocate one 1gb per instance of all components
-  ram_per_instance = int(system_constants.DEFAULT_RAM_FOR_INSTANCE)
-  rammap = {}
-  for component in components:
-    component_name = component.comp.name
-    rammap[component_name] = ram_per_instance
-  return rammap
-
-def strip_java_objects(topology):
-  """
-  The argument is the proto object for topology.
-  The java objects are huge objects and topology
-  is stripped off these objects so that it can be stored
-  in zookeeper.
-  """
-  stripped_topology = topology_pb2.Topology()
-  stripped_topology.CopyFrom(topology)
-  components = list(stripped_topology.spouts) + list(stripped_topology.bolts)
-  for component in components:
-    if component.comp.HasField("serialized_object"):
-      component.comp.ClearField("serialized_object")
-
-  return stripped_topology
-
-def get_component_distribution(topology):
-  """
-  The argument is the proto object for topology.
-  Distribute components and their instances evenly across containers in a round robin fashion
-  Return value is a map from container id to a list.
-  Each element of a list is a tuple (component_name, global_task_id, component_index)
-  This is essentially the physical plan of the topology.
-  """
-  containers = {}
-  nstmgrs = get_nstmgrs(topology)
-  for i in range(1, nstmgrs + 1):
-    containers[i] = []
-  index = 1
-  global_task_id = 1
-  for (component_name, ninstances) in get_component_parallelism(topology).items():
-    component_index = 0
-    for i in range(ninstances):
-      containers[index].append((str(component_name), str(global_task_id), str(component_index)))
-      component_index = component_index + 1
-      global_task_id = global_task_id + 1
-      index = index + 1
-      if index == nstmgrs + 1:
-        index = 1
-  return containers
-
-def get_user_rammap(topology):
-  """
-  The argument is the proto object for topology.
-  Returns a dict of component to ram as specified by user.
-  Expected user input is "comp1:42,comp2:24,..."
-  where 42 and 24 represents the ram requested in bytes.
-  Returns None if nothing is specified.
-  """
-  rammap = get_topology_config(topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP)
-  if rammap:
-    rmap = {}
-    for component_ram in rammap.split(","):
-      component, ram = component_ram.split(":")
-      rmap[component] = int(ram)
-    return rmap
-  return None
-
-def get_container_ram_requested(topology):
-  """
-  The argument is the proto object for topology.
-  Returns the container ram as requested by the user.
-  Returns None if not specified.
-  """
-  ram = get_topology_config(topology, api_constants.TOPOLOGY_CONTAINER_RAM_REQUESTED)
-  return int(ram) if ram else None
-
-def get_component_jvmopts(topology):
-  """
-  The argument is the proto object for topology.
-  Returns the jvm options if specified by user.
-  """
-  return str(get_topology_config(topology, api_constants.TOPOLOGY_COMPONENT_JVMOPTS) or "")
-
-def get_topology_state_string(topology):
-  """
-  The argument is the proto object for topology.
-  Returns the topology state as one of:
-      1. RUNNING
-      2. PAUSED
-      3. KILLED
-  """
-  return topology_pb2.TopologyState.Name(topology.state)
-
-# pylint: disable=too-many-return-statements, too-many-branches
-def sane(topology):
-  """ First check if topology name is ok """
-  if topology.name == "":
-    print "Topology name cannot be empty"
-    return False
-  if '.' in topology.name or '/' in topology.name:
-    print "Topology name cannot contain . or /"
-    return False
-
-  component_names = set()
-  for spout in topology.spouts:
-    component_name = spout.comp.name
-    if component_name in component_names:
-      print component_name + " is duplicated twice"
-      return False
-    component_names.add(component_name)
-
-  for bolt in topology.bolts:
-    component_name = bolt.comp.name
-    if component_name in component_names:
-      print component_name + " is duplicated twice"
-      return False
-    component_names.add(component_name)
-
-  ninstances = get_total_instances(topology)
-  nstmgrs = get_nstmgrs(topology)
-  if nstmgrs > ninstances:
-    print "Number of containers are greater than number of instances"
-    return False
-
-  for kv in topology.topology_config.kvs:
-    if kv.key == api_constants.TOPOLOGY_COMPONENT_RAMMAP:
-      rammap = str(kv.value).split(',')
-      for component_ram in rammap:
-        component_and_ram = component_ram.split(':')
-        if len(component_and_ram) != 2:
-          print "TOPOLOGY_COMPONENT_RAMMAP is set incorrectly"
-          return False
-        if not component_and_ram[0] in component_names:
-          print "TOPOLOGY_COMPONENT_RAMMAP is set incorrectly"
-          return False
-
-  # Now check if bolts are reading streams that some component emits
-  all_outputs = sets.Set()
-  for spout in topology.spouts:
-    for outputstream in spout.outputs:
-      all_outputs.add((outputstream.stream.id, outputstream.stream.component_name))
-  for bolt in topology.bolts:
-    for outputstream in bolt.outputs:
-      all_outputs.add((outputstream.stream.id, outputstream.stream.component_name))
-  for bolt in topology.bolts:
-    for inputstream in bolt.inputs:
-      if (inputstream.stream.id, inputstream.stream.component_name) not in all_outputs:
-        print "Bolt " + bolt.comp.name + " has an input stream that no one emits"
-        return False
-
-  return True
diff --git a/heron/tools/tracker/tests/python/BUILD b/heron/tools/tracker/tests/python/BUILD
index 82e3b19..0637663 100644
--- a/heron/tools/tracker/tests/python/BUILD
+++ b/heron/tools/tracker/tests/python/BUILD
@@ -11,22 +11,6 @@
 )
 
 pex_test(
-    name = "topology_helpers_unittest",
-    srcs = ["topology_helpers_unittest.py"],
-    deps = [
-        "//heron/tools/tracker/src/python:tracker-py",
-        "//heronpy/api:heron-python-py",
-    ],
-    reqs = [
-        "mock==1.0.1",
-        "py==1.4.27",
-        "pytest==2.6.4",
-        "unittest2==0.5.1",
-    ],
-    size = "small",
-)
-
-pex_test(
     name = "topology_unittest",
     srcs = ["topology_unittest.py"],
     deps = [
diff --git a/heron/tools/tracker/tests/python/topology_helpers_unittest.py b/heron/tools/tracker/tests/python/topology_helpers_unittest.py
deleted file mode 100644
index 314f493..0000000
--- a/heron/tools/tracker/tests/python/topology_helpers_unittest.py
+++ /dev/null
@@ -1,233 +0,0 @@
-''' topology_helpers_unittest.py '''
-# pylint: disable=missing-docstring
-import unittest2 as unittest
-
-from heronpy.api import api_constants
-from heron.common.src.python import system_constants
-from heron.tools.tracker.src.python import topology_helpers
-from mock_proto import MockProto
-
-class TopologyHelpersTest(unittest.TestCase):
-  def setUp(self):
-    self.mock_proto = MockProto()
-
-  def test_get_component_parallelism(self):
-    topology = self.mock_proto.create_mock_medium_topology(1, 2, 3, 4)
-
-    cmap = topology_helpers.get_component_parallelism(topology)
-    self.assertEqual(1, cmap["mock_spout1"])
-    self.assertEqual(2, cmap["mock_bolt1"])
-    self.assertEqual(3, cmap["mock_bolt2"])
-    self.assertEqual(4, cmap["mock_bolt3"])
-
-  def test_get_disk_per_container(self):
-    # We would have 9 instances
-    topology = self.mock_proto.create_mock_medium_topology(1, 2, 3, 4)
-
-    # First try with 1 container, so the disk request should be:
-    # 10 * GB + Padding_Disk (12GB) = 22GB
-    default_disk = topology_helpers.get_disk_per_container(topology)
-    self.assertEqual(22 * system_constants.GB, default_disk)
-
-    # Then try with 4 container, so the disk request should be:
-    # 10/4 = 2.5 -> 3 (round to ceiling) + 12 = 15GB
-    self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_STMGRS, 4)
-    self.assertEqual(15 * system_constants.GB, topology_helpers.get_disk_per_container(topology))
-
-    # Then let's set the disk_per_container explicitly
-    self.mock_proto.add_topology_config(
-        topology, api_constants.TOPOLOGY_CONTAINER_DISK_REQUESTED, 950109)
-    # The add_topology_config will convert config into string
-    self.assertEqual(str(950109), topology_helpers.get_disk_per_container(topology))
-
-  def test_get_total_instances(self):
-    topology = self.mock_proto.create_mock_medium_topology(3, 4, 5, 6)
-
-    num_instances = topology_helpers.get_total_instances(topology)
-    self.assertEqual(18, num_instances)
-
-  def test_sane(self):
-    # Make wrong topology names
-    topology = self.mock_proto.create_mock_simple_topology()
-    topology.name = ""
-    self.assertFalse(topology_helpers.sane(topology))
-
-    topology = self.mock_proto.create_mock_simple_topology()
-    topology.name = "test.with.a.dot"
-    self.assertFalse(topology_helpers.sane(topology))
-
-    topology = self.mock_proto.create_mock_simple_topology()
-    topology.name = "test/with/a/slash"
-    self.assertFalse(topology_helpers.sane(topology))
-
-    # Add another spout with the same name
-    topology = self.mock_proto.create_mock_simple_topology()
-    topology.spouts.extend([self.mock_proto.create_mock_spout("mock_spout", [], 1)])
-    self.assertFalse(topology_helpers.sane(topology))
-
-    # Add another bolt with the same name
-    topology = self.mock_proto.create_mock_simple_topology()
-    topology.bolts.extend([self.mock_proto.create_mock_bolt("mock_bolt", [], [], 1)])
-    self.assertFalse(topology_helpers.sane(topology))
-
-    # If num containers are greater than num instances
-    topology = self.mock_proto.create_mock_simple_topology(1, 1)
-    self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_STMGRS, 4)
-    self.assertFalse(topology_helpers.sane(topology))
-
-    # If rammap is partial with less componenets
-    topology = self.mock_proto.create_mock_simple_topology()
-    self.mock_proto.add_topology_config(
-        topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP, "mock_spout:1")
-    self.assertTrue(topology_helpers.sane(topology))
-
-    # If rammap is not well formatted
-    topology = self.mock_proto.create_mock_simple_topology()
-    self.mock_proto.add_topology_config(
-        topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP, "mock_spout:1:2,mock_bolt:2:3")
-    self.assertFalse(topology_helpers.sane(topology))
-
-    # If rammap has wrong component name
-    topology = self.mock_proto.create_mock_simple_topology()
-    self.mock_proto.add_topology_config(
-        topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP, "wrong_mock_spout:1,mock_bolt:2")
-    self.assertFalse(topology_helpers.sane(topology))
-
-    # If everything is right
-    topology = self.mock_proto.create_mock_simple_topology()
-    self.mock_proto.add_topology_config(
-        topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP, "mock_spout:1,mock_bolt:2")
-    self.assertTrue(topology_helpers.sane(topology))
-
-  def test_num_cpus_per_container(self):
-    topology = self.mock_proto.create_mock_simple_topology(2, 2)
-    self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_STMGRS, 4)
-    self.assertEqual(2, topology_helpers.get_cpus_per_container(topology))
-
-    topology = self.mock_proto.create_mock_simple_topology(2, 2)
-    self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_CONTAINER_CPU_REQUESTED, 42)
-    self.assertEqual(42, topology_helpers.get_cpus_per_container(topology))
-
-  def test_get_user_rammap(self):
-    topology = self.mock_proto.create_mock_simple_topology()
-    self.mock_proto.add_topology_config(
-        topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP, "mock_spout:2,mock_bolt:3")
-    self.assertEqual({"mock_spout":2, "mock_bolt":3}, topology_helpers.get_user_rammap(topology))
-
-  def test_get_component_distribution(self):
-    topology = self.mock_proto.create_mock_simple_topology(4, 8)
-    self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_STMGRS, 4)
-    component_distribution = topology_helpers.get_component_distribution(topology)
-
-    expected_component_distribution = {
-        1: [
-            ("mock_bolt", "1", "0"),
-            ("mock_bolt", "5", "4"),
-            ("mock_spout", "9", "0")
-        ],
-        2: [
-            ("mock_bolt", "2", "1"),
-            ("mock_bolt", "6", "5"),
-            ("mock_spout", "10", "1")
-        ],
-        3: [
-            ("mock_bolt", "3", "2"),
-            ("mock_bolt", "7", "6"),
-            ("mock_spout", "11", "2")
-        ],
-        4: [
-            ("mock_bolt", "4", "3"),
-            ("mock_bolt", "8", "7"),
-            ("mock_spout", "12", "3")
-        ]
-    }
-    self.assertEqual(expected_component_distribution, component_distribution)
-
-  def test_get_component_rammap(self):
-    # Mock a few methods
-    # This is not a good way since this shows the internals
-    # of the method. These methods need to be changed.
-    # For example, ram_per_contaner could be taken as an argument.
-    original_ram_for_stmgr = system_constants.RAM_FOR_STMGR
-    system_constants.RAM_FOR_STMGR = 2
-
-    # When rammap is specified, it should be used.
-    topology = self.mock_proto.create_mock_simple_topology(4, 8)
-    self.mock_proto.add_topology_config(
-        topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP, "mock_spout:2,mock_bolt:3")
-    self.assertEqual(
-        {"mock_spout":2, "mock_bolt":3}, topology_helpers.get_component_rammap(topology))
-
-    # When partial rammap is specified, rest of the components should get default
-    topology = self.mock_proto.create_mock_simple_topology(4, 8)
-    self.mock_proto.add_topology_config(
-        topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP, "mock_spout:2")
-    expected_component_rammap = {
-        "mock_spout": 2,
-        "mock_bolt": system_constants.DEFAULT_RAM_FOR_INSTANCE
-    }
-    self.assertEqual(expected_component_rammap, topology_helpers.get_component_rammap(topology))
-
-    # When container ram is specified.
-    topology = self.mock_proto.create_mock_simple_topology(4, 8)
-    self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_STMGRS, 4)
-    self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_CONTAINER_RAM_REQUESTED, 8)
-
-    expected_component_rammap = {
-        "mock_spout": 2,
-        "mock_bolt": 2
-    }
-    component_rammap = topology_helpers.get_component_rammap(topology)
-    self.assertEqual(expected_component_rammap, component_rammap)
-
-    # When nothing is specified.
-    topology = self.mock_proto.create_mock_simple_topology(4, 8)
-    component_rammap = topology_helpers.get_component_rammap(topology)
-    expected_component_rammap = {
-        "mock_spout": system_constants.DEFAULT_RAM_FOR_INSTANCE,
-        "mock_bolt": system_constants.DEFAULT_RAM_FOR_INSTANCE
-    }
-    self.assertEqual(expected_component_rammap, component_rammap)
-
-    # Unmock the things that we mocked.
-    system_constants.RAM_FOR_STMGR = original_ram_for_stmgr
-
-  def test_get_ram_per_container(self):
-    # Mock a few things
-    original_ram_for_stmgr = system_constants.RAM_FOR_STMGR
-    system_constants.RAM_FOR_STMGR = 2
-    original_default_ram_for_instance = system_constants.DEFAULT_RAM_FOR_INSTANCE
-    system_constants.DEFAULT_RAM_FOR_INSTANCE = 1
-
-    # When rammap is specified
-    topology = self.mock_proto.create_mock_simple_topology(4, 8)
-    self.mock_proto.add_topology_config(
-        topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP, "mock_spout:2,mock_bolt:3")
-    self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_STMGRS, 4)
-    self.assertEqual(10, topology_helpers.get_ram_per_container(topology))
-
-    # When partial rammap is specified, rest of the components should get default
-    topology = self.mock_proto.create_mock_simple_topology(4, 8)
-    self.mock_proto.add_topology_config(
-        topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP, "mock_spout:2")
-    self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_STMGRS, 4)
-    expected_ram_per_container = 6
-    self.assertEqual(expected_ram_per_container, topology_helpers.get_ram_per_container(topology))
-
-    # If container ram is specified
-    topology = self.mock_proto.create_mock_simple_topology(4, 8)
-    requested_ram = 15000
-    self.mock_proto.add_topology_config(
-        topology, api_constants.TOPOLOGY_CONTAINER_RAM_REQUESTED, str(requested_ram))
-    # Difference should be less than the total instances
-    self.assertLess(abs(topology_helpers.get_ram_per_container(topology) - requested_ram), 12)
-
-    # When nothing is specified
-    topology = self.mock_proto.create_mock_simple_topology(4, 8)
-    self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_STMGRS, 4)
-    expected_ram_per_container = 5
-    self.assertEqual(expected_ram_per_container, topology_helpers.get_ram_per_container(topology))
-
-    # Unmock the things that we mocked.
-    system_constants.RAM_FOR_STMGR = original_ram_for_stmgr
-    system_constants.DEFAULT_RAM_FOR_INSTANCE = original_default_ram_for_instance
diff --git a/third_party/zookeeper/BUILD b/third_party/zookeeper/BUILD
index b0da55d..6d4c74c 100644
--- a/third_party/zookeeper/BUILD
+++ b/third_party/zookeeper/BUILD
@@ -3,7 +3,7 @@
 package(default_visibility = ["//visibility:public"])
 
 package_name = "zookeeper"
-package_version = "3.4.5-macfix"
+package_version = "3.4.10"
 
 package_file = package_name + "-" + package_version + ".tar.gz"
 package_dir = package_name + "-" + package_version
diff --git a/third_party/zookeeper/zookeeper-3.4.10.tar.gz b/third_party/zookeeper/zookeeper-3.4.10.tar.gz
new file mode 100644
index 0000000..1c2dcdb
--- /dev/null
+++ b/third_party/zookeeper/zookeeper-3.4.10.tar.gz
Binary files differ
diff --git a/third_party/zookeeper/zookeeper-3.4.5-macfix.tar.gz b/third_party/zookeeper/zookeeper-3.4.5-macfix.tar.gz
deleted file mode 100644
index 3842f5c..0000000
--- a/third_party/zookeeper/zookeeper-3.4.5-macfix.tar.gz
+++ /dev/null
Binary files differ
diff --git a/tools/rules/heron_deps.bzl b/tools/rules/heron_deps.bzl
index d02dcb5..4b1fbeb 100644
--- a/tools/rules/heron_deps.bzl
+++ b/tools/rules/heron_deps.bzl
@@ -16,3 +16,9 @@
         "//heron/proto:proto_stmgr_java",
         "@com_google_protobuf_protobuf_java//jar",
     ]
+
+def heron_java_api_proto_files():
+    return [
+        "//heron/proto:proto_topology_java",
+        "@com_google_protobuf_protobuf_java//jar",
+    ]