Merge branch 'objmagic/limit-msg-size' into 0.15.1.1-rc
diff --git a/heron/api/src/java/BUILD b/heron/api/src/java/BUILD
index 443bd31..e5a0e89 100644
--- a/heron/api/src/java/BUILD
+++ b/heron/api/src/java/BUILD
@@ -2,7 +2,7 @@
package(default_visibility = ["//visibility:public"])
-load("/tools/rules/heron_deps", "heron_java_proto_files")
+load("/tools/rules/heron_deps", "heron_java_api_proto_files")
load("/tools/rules/jarjar_rules", "jarjar_binary")
load("/tools/rules/build_defs", "DOCLINT_HTML_AND_SYNTAX")
load("/tools/rules/javadoc", "java_doc")
@@ -15,7 +15,7 @@
)
api_deps_files = \
- heron_java_proto_files() + [
+ heron_java_api_proto_files() + [
":classification",
"//heron/common/src/java:basics-java",
]
diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.cpp b/heron/common/src/cpp/config/heron-internals-config-reader.cpp
index d784048..b06395b 100644
--- a/heron/common/src/cpp/config/heron-internals-config-reader.cpp
+++ b/heron/common/src/cpp/config/heron-internals-config-reader.cpp
@@ -205,6 +205,11 @@
return config_[HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER].as<int>();
}
+sp_int32 HeronInternalsConfigReader::GetHeronStreammgrHeronTupleSetMessageMaxBytes() {
+ return config_[HeronInternalsConfigVars::HERON_STREAMMGR_HERONTUPLESET_MESSAGE_MAX_BYTES]
+ .as<int>();
+}
+
sp_int32 HeronInternalsConfigReader::GetHeronStreammgrXormgrRotatingmapNbuckets() {
return config_[HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS].as<int>();
}
diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.h b/heron/common/src/cpp/config/heron-internals-config-reader.h
index 37bed01..93c3ef5 100644
--- a/heron/common/src/cpp/config/heron-internals-config-reader.h
+++ b/heron/common/src/cpp/config/heron-internals-config-reader.h
@@ -156,6 +156,9 @@
// The max number of messages in the memory pool for each message type
sp_int32 GetHeronStreammgrMempoolMaxMessageNumber();
+ // The max size, in bytes, of a HeronTupleSet message in the stream manager
+ sp_int32 GetHeronStreammgrHeronTupleSetMessageMaxBytes();
+
// Get the Nbucket value, for efficient acknowledgement
sp_int32 GetHeronStreammgrXormgrRotatingmapNbuckets();
diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.cpp b/heron/common/src/cpp/config/heron-internals-config-vars.cpp
index addcadf..069a400 100644
--- a/heron/common/src/cpp/config/heron-internals-config-vars.cpp
+++ b/heron/common/src/cpp/config/heron-internals-config-vars.cpp
@@ -88,6 +88,8 @@
"heron.streammgr.cache.drain.size.mb";
const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER =
"heron.streammgr.mempool.max.message.number";
+const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_HERONTUPLESET_MESSAGE_MAX_BYTES =
+ "heron.streammgr.herontupleset.message.max.bytes";
const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS =
"heron.streammgr.xormgr.rotatingmap.nbuckets";
const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CLIENT_RECONNECT_MAX_ATTEMPTS =
diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.h b/heron/common/src/cpp/config/heron-internals-config-vars.h
index f711a9b..49422de 100644
--- a/heron/common/src/cpp/config/heron-internals-config-vars.h
+++ b/heron/common/src/cpp/config/heron-internals-config-vars.h
@@ -140,6 +140,9 @@
// The max number of messages in the memory pool for each message type
static const sp_string HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER;
+ // The max size, in bytes, of a HeronTupleSet message in the stream manager
+ static const sp_string HERON_STREAMMGR_HERONTUPLESET_MESSAGE_MAX_BYTES;
+
// For efficient acknowledgement
static const sp_string HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS;
diff --git a/heron/common/src/python/basics/__init__.py b/heron/common/src/python/basics/__init__.py
deleted file mode 100644
index c2b5d47..0000000
--- a/heron/common/src/python/basics/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-'''basic module'''
-__all__ = ['event_looper', 'gateway_looper']
-
-from .event_looper import EventLooper
-from .gateway_looper import GatewayLooper
diff --git a/heron/common/src/python/config/__init__.py b/heron/common/src/python/config/__init__.py
deleted file mode 100644
index 48daa21..0000000
--- a/heron/common/src/python/config/__init__.py
+++ /dev/null
@@ -1,2 +0,0 @@
-''' config module: various singleton configuration modules for PyHeron '''
-__all__ = ['system_config']
diff --git a/heron/common/tests/python/basics/BUILD b/heron/common/tests/python/basics/BUILD
deleted file mode 100644
index a6e17b8..0000000
--- a/heron/common/tests/python/basics/BUILD
+++ /dev/null
@@ -1,30 +0,0 @@
-package(default_visibility = ["//visibility:public"])
-load("/tools/rules/pex_rules", "pex_library", "pex_test")
-
-pex_test(
- name = "gateway_looper_unittest",
- srcs = ["gateway_looper_unittest.py"],
- deps = [
- "//heron/common/src/python:common-py"
- ],
- reqs = [
- "py==1.4.27",
- "pytest==2.6.4",
- "unittest2==0.5.1",
- ],
- size = "small",
-)
-
-pex_test(
- name = "event_looper_unittest",
- srcs = ["event_looper_unittest.py"],
- deps = [
- "//heron/common/src/python:common-py"
- ],
- reqs = [
- "py==1.4.27",
- "pytest==2.6.4",
- "unittest2==0.5.1",
- ],
- size = "small",
-)
diff --git a/heron/common/tests/python/basics/__init__.py b/heron/common/tests/python/basics/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/heron/common/tests/python/basics/__init__.py
+++ /dev/null
diff --git a/heron/config/src/yaml/conf/aurora/heron_internals.yaml b/heron/config/src/yaml/conf/aurora/heron_internals.yaml
index a1b77b1..b56780d 100644
--- a/heron/config/src/yaml/conf/aurora/heron_internals.yaml
+++ b/heron/config/src/yaml/conf/aurora/heron_internals.yaml
@@ -65,6 +65,9 @@
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512
+# The max byte size of a HeronTupleSet message allowed in memory pool
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300
diff --git a/heron/config/src/yaml/conf/examples/heron_internals.yaml b/heron/config/src/yaml/conf/examples/heron_internals.yaml
index a1b77b1..30f355d 100644
--- a/heron/config/src/yaml/conf/examples/heron_internals.yaml
+++ b/heron/config/src/yaml/conf/examples/heron_internals.yaml
@@ -65,6 +65,9 @@
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512
+# The max byte size of a HeronTupleSet message
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300
diff --git a/heron/config/src/yaml/conf/kubernetes/heron_internals.yaml b/heron/config/src/yaml/conf/kubernetes/heron_internals.yaml
index b592fbc..4bb9636 100644
--- a/heron/config/src/yaml/conf/kubernetes/heron_internals.yaml
+++ b/heron/config/src/yaml/conf/kubernetes/heron_internals.yaml
@@ -65,6 +65,9 @@
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512
+# The max byte size of a HeronTupleSet message allowed in memory pool
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
# The reconnect interval to other stream managers in secs for stream manager client
heron.streammgr.client.reconnect.interval.sec: 1
diff --git a/heron/config/src/yaml/conf/local/heron_internals.yaml b/heron/config/src/yaml/conf/local/heron_internals.yaml
index 56fb659..b96d535 100644
--- a/heron/config/src/yaml/conf/local/heron_internals.yaml
+++ b/heron/config/src/yaml/conf/local/heron_internals.yaml
@@ -65,6 +65,9 @@
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512
+# The max byte size of a HeronTupleSet message allowed in memory pool
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300
diff --git a/heron/config/src/yaml/conf/localzk/heron_internals.yaml b/heron/config/src/yaml/conf/localzk/heron_internals.yaml
index a1b77b1..b56780d 100644
--- a/heron/config/src/yaml/conf/localzk/heron_internals.yaml
+++ b/heron/config/src/yaml/conf/localzk/heron_internals.yaml
@@ -65,6 +65,9 @@
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512
+# The max byte size of a HeronTupleSet message allowed in memory pool
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300
diff --git a/heron/config/src/yaml/conf/marathon/heron_internals.yaml b/heron/config/src/yaml/conf/marathon/heron_internals.yaml
index a1b77b1..b56780d 100644
--- a/heron/config/src/yaml/conf/marathon/heron_internals.yaml
+++ b/heron/config/src/yaml/conf/marathon/heron_internals.yaml
@@ -65,6 +65,9 @@
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512
+# The max byte size of a HeronTupleSet message allowed in memory pool
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300
diff --git a/heron/config/src/yaml/conf/mesos/heron_internals.yaml b/heron/config/src/yaml/conf/mesos/heron_internals.yaml
index a1b77b1..b56780d 100644
--- a/heron/config/src/yaml/conf/mesos/heron_internals.yaml
+++ b/heron/config/src/yaml/conf/mesos/heron_internals.yaml
@@ -65,6 +65,9 @@
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512
+# The max byte size of a HeronTupleSet message allowed in memory pool
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300
diff --git a/heron/config/src/yaml/conf/slurm/heron_internals.yaml b/heron/config/src/yaml/conf/slurm/heron_internals.yaml
index 1c79d34..62ceae3 100644
--- a/heron/config/src/yaml/conf/slurm/heron_internals.yaml
+++ b/heron/config/src/yaml/conf/slurm/heron_internals.yaml
@@ -65,6 +65,9 @@
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512
+# The max byte size of a HeronTupleSet message allowed in memory pool
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300
diff --git a/heron/config/src/yaml/conf/test/test_heron_internals.yaml b/heron/config/src/yaml/conf/test/test_heron_internals.yaml
index 7537a96..08241fb 100644
--- a/heron/config/src/yaml/conf/test/test_heron_internals.yaml
+++ b/heron/config/src/yaml/conf/test/test_heron_internals.yaml
@@ -50,6 +50,9 @@
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512
+# The max byte size of a HeronTupleSet message allowed in memory pool
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
# For efficient acknowledgement
heron.streammgr.xormgr.rotatingmap.nbuckets: 3
diff --git a/heron/config/src/yaml/conf/yarn/heron_internals.yaml b/heron/config/src/yaml/conf/yarn/heron_internals.yaml
index a1b77b1..b56780d 100644
--- a/heron/config/src/yaml/conf/yarn/heron_internals.yaml
+++ b/heron/config/src/yaml/conf/yarn/heron_internals.yaml
@@ -65,6 +65,9 @@
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512
+# The max byte size of a HeronTupleSet message allowed in memory pool
+heron.streammgr.herontupleset.message.max.bytes: 83886080
+
# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300
diff --git a/heron/instance/src/python/basics/base_instance.py b/heron/instance/src/python/basics/base_instance.py
index 9bd9eca..a8b42f0 100644
--- a/heron/instance/src/python/basics/base_instance.py
+++ b/heron/instance/src/python/basics/base_instance.py
@@ -21,15 +21,15 @@
import heronpy.api.api_constants as api_constants
from heronpy.api.state.stateful_component import StatefulComponent
-from heron.common.src.python.config import system_config
from heron.common.src.python.utils.log import Log
from heron.proto import tuple_pb2
from heron.instance.src.python.utils.misc import SerializerHelper
from heron.instance.src.python.utils.misc import OutgoingTupleHelper
+from heron.instance.src.python.utils import system_config
-import heron.common.src.python.system_constants as system_constants
+import heron.instance.src.python.utils.system_constants as system_constants
import heron.common.src.python.pex_loader as pex_loader
# pylint: disable=too-many-instance-attributes
diff --git a/heron/instance/src/python/basics/bolt_instance.py b/heron/instance/src/python/basics/bolt_instance.py
index 94a60a5..a92b095 100644
--- a/heron/instance/src/python/basics/bolt_instance.py
+++ b/heron/instance/src/python/basics/bolt_instance.py
@@ -27,7 +27,7 @@
from heron.instance.src.python.utils.metrics import BoltMetrics
from heron.instance.src.python.utils.tuple import TupleHelper, HeronTuple
-import heron.common.src.python.system_constants as system_constants
+import heron.instance.src.python.utils.system_constants as system_constants
from .base_instance import BaseInstance
diff --git a/heron/instance/src/python/basics/spout_instance.py b/heron/instance/src/python/basics/spout_instance.py
index 5d4490b..fc5335b 100644
--- a/heron/instance/src/python/basics/spout_instance.py
+++ b/heron/instance/src/python/basics/spout_instance.py
@@ -28,7 +28,7 @@
from heron.proto import topology_pb2, tuple_pb2, ckptmgr_pb2
-import heron.common.src.python.system_constants as system_constants
+import heron.instance.src.python.utils.system_constants as system_constants
from .base_instance import BaseInstance
diff --git a/heron/instance/src/python/instance/st_heron_instance.py b/heron/instance/src/python/instance/st_heron_instance.py
index 1115a79..17d3db6 100644
--- a/heron/instance/src/python/instance/st_heron_instance.py
+++ b/heron/instance/src/python/instance/st_heron_instance.py
@@ -23,8 +23,6 @@
import heronpy.api.api_constants as api_constants
from heronpy.api.state.state import HashMapState
-from heron.common.src.python.basics import GatewayLooper
-from heron.common.src.python.config import system_config
from heron.common.src.python.utils import log
from heron.proto import physical_plan_pb2, tuple_pb2, ckptmgr_pb2, common_pb2
@@ -35,9 +33,10 @@
from heron.instance.src.python.utils.metrics import GatewayMetrics, PyMetrics, MetricsCollector
from heron.instance.src.python.network import MetricsManagerClient, SingleThreadStmgrClient
from heron.instance.src.python.network import create_socket_options
+from heron.instance.src.python.network import GatewayLooper
from heron.instance.src.python.basics import SpoutInstance, BoltInstance
-
-import heron.common.src.python.system_constants as constants
+import heron.instance.src.python.utils.system_constants as constants
+from heron.instance.src.python.utils import system_config
Log = log.Log
AssignedInstance = collections.namedtuple('AssignedInstance', 'is_spout, protobuf, py_class')
diff --git a/heron/instance/src/python/network/__init__.py b/heron/instance/src/python/network/__init__.py
index 1d4ae80..50a59e6 100644
--- a/heron/instance/src/python/network/__init__.py
+++ b/heron/instance/src/python/network/__init__.py
@@ -1,6 +1,9 @@
'''module for network component for python instance'''
-__all__ = ['metricsmgr_client', 'heron_client', 'st_stmgr_client', 'protocol', 'socket_options']
+__all__ = ['event_looper', 'gateway_looper', 'metricsmgr_client',
+ 'heron_client', 'st_stmgr_client', 'protocol', 'socket_options']
+from .event_looper import EventLooper
+from .gateway_looper import GatewayLooper
from .protocol import HeronProtocol, OutgoingPacket, IncomingPacket, REQID, StatusCode
from .socket_options import SocketOptions, create_socket_options
from .metricsmgr_client import MetricsManagerClient
diff --git a/heron/common/src/python/basics/event_looper.py b/heron/instance/src/python/network/event_looper.py
similarity index 100%
rename from heron/common/src/python/basics/event_looper.py
rename to heron/instance/src/python/network/event_looper.py
diff --git a/heron/common/src/python/basics/gateway_looper.py b/heron/instance/src/python/network/gateway_looper.py
similarity index 100%
rename from heron/common/src/python/basics/gateway_looper.py
rename to heron/instance/src/python/network/gateway_looper.py
diff --git a/heron/instance/src/python/network/heron_client.py b/heron/instance/src/python/network/heron_client.py
index 87ce3aa..2567eb1 100644
--- a/heron/instance/src/python/network/heron_client.py
+++ b/heron/instance/src/python/network/heron_client.py
@@ -20,7 +20,7 @@
import time
from heron.common.src.python.utils.log import Log
-import heron.common.src.python.system_constants as constants
+import heron.instance.src.python.utils.system_constants as constants
from heron.instance.src.python.network import HeronProtocol, REQID, StatusCode, OutgoingPacket
# pylint: disable=too-many-instance-attributes
@@ -30,7 +30,7 @@
def __init__(self, looper, hostname, port, socket_map, socket_options):
"""Initializes HeronClient
- :type looper: ``GatewayLooper`` (heron.common.src.python.basics)
+ :type looper: ``GatewayLooper`` (heron.instance.src.python.network)
:param looper: looper object
:type hostname: str
:param hostname: endpoint hostname
diff --git a/heron/instance/src/python/network/metricsmgr_client.py b/heron/instance/src/python/network/metricsmgr_client.py
index 225b249..214209e 100644
--- a/heron/instance/src/python/network/metricsmgr_client.py
+++ b/heron/instance/src/python/network/metricsmgr_client.py
@@ -14,15 +14,15 @@
'''metrics manager client'''
import socket
-from heron.common.src.python.config import system_config
from heron.common.src.python.utils.log import Log
from heron.instance.src.python.network.heron_client import HeronClient
from heron.instance.src.python.network import StatusCode
+from heron.instance.src.python.utils import system_config
from heron.proto import metrics_pb2, common_pb2
-import heron.common.src.python.system_constants as constants
+import heron.instance.src.python.utils.system_constants as constants
class MetricsManagerClient(HeronClient):
"""MetricsManagerClient, responsible for communicating with Metrics Manager"""
diff --git a/heron/instance/src/python/network/socket_options.py b/heron/instance/src/python/network/socket_options.py
index 18f1460..00bd09f 100644
--- a/heron/instance/src/python/network/socket_options.py
+++ b/heron/instance/src/python/network/socket_options.py
@@ -15,8 +15,8 @@
from collections import namedtuple
from heron.common.src.python.utils.log import Log
-import heron.common.src.python.system_constants as const
-from heron.common.src.python.config import system_config
+import heron.instance.src.python.utils.system_constants as const
+from heron.instance.src.python.utils import system_config
SocketOptions = namedtuple('Options', 'nw_write_batch_size_bytes, nw_write_batch_time_ms, '
'nw_read_batch_size_bytes, nw_read_batch_time_ms, '
diff --git a/heron/instance/src/python/network/st_stmgr_client.py b/heron/instance/src/python/network/st_stmgr_client.py
index 48935c5..d2a833a 100644
--- a/heron/instance/src/python/network/st_stmgr_client.py
+++ b/heron/instance/src/python/network/st_stmgr_client.py
@@ -15,15 +15,15 @@
import sys
import traceback
-from heron.common.src.python.config import system_config
from heron.common.src.python.utils.log import Log
from heron.instance.src.python.network.heron_client import HeronClient
from heron.instance.src.python.network import StatusCode
+from heron.instance.src.python.utils import system_config
from heron.proto import common_pb2, stmgr_pb2, tuple_pb2, ckptmgr_pb2
-import heron.common.src.python.system_constants as constants
+import heron.instance.src.python.utils.system_constants as constants
# pylint: disable=too-many-arguments
# pylint: disable=too-many-instance-attributes
diff --git a/heron/instance/src/python/utils/__init__.py b/heron/instance/src/python/utils/__init__.py
index 4654d2c..2cb4e0e 100644
--- a/heron/instance/src/python/utils/__init__.py
+++ b/heron/instance/src/python/utils/__init__.py
@@ -1,3 +1,3 @@
'''common utility modules'''
__all__ = ['metrics', 'misc', 'topology', 'config', 'tracker_access',
- 'tuple', 'proc', 'log']
+ 'system_constants', 'system_config', 'tuple', 'proc', 'log']
diff --git a/heron/instance/src/python/utils/metrics/metrics_helper.py b/heron/instance/src/python/utils/metrics/metrics_helper.py
index fe256f4..1bbb44e 100644
--- a/heron/instance/src/python/utils/metrics/metrics_helper.py
+++ b/heron/instance/src/python/utils/metrics/metrics_helper.py
@@ -14,8 +14,8 @@
'''metrics_helper: helper classes for managing common metrics'''
from heron.common.src.python.utils.log import Log
-from heron.common.src.python.config import system_config
-import heron.common.src.python.system_constants as constants
+import heron.instance.src.python.utils.system_constants as constants
+from heron.instance.src.python.utils import system_config
from heron.proto import metrics_pb2
diff --git a/heron/instance/src/python/utils/metrics/py_metrics.py b/heron/instance/src/python/utils/metrics/py_metrics.py
index c6a520c..3d52bb0 100644
--- a/heron/instance/src/python/utils/metrics/py_metrics.py
+++ b/heron/instance/src/python/utils/metrics/py_metrics.py
@@ -17,8 +17,8 @@
import traceback
from heronpy.api.metrics import AssignableMetrics, MultiAssignableMetrics
from .metrics_helper import BaseMetricsHelper
-import heron.common.src.python.system_constants as constants
-from heron.common.src.python.config import system_config
+import heron.instance.src.python.utils.system_constants as constants
+from heron.instance.src.python.utils import system_config
from heron.common.src.python.utils.log import Log
# pylint: disable=too-many-instance-attributes
diff --git a/heron/instance/src/python/utils/misc/outgoing_tuple_helper.py b/heron/instance/src/python/utils/misc/outgoing_tuple_helper.py
index cc0912c..80c2776 100644
--- a/heron/instance/src/python/utils/misc/outgoing_tuple_helper.py
+++ b/heron/instance/src/python/utils/misc/outgoing_tuple_helper.py
@@ -14,11 +14,11 @@
'''outgoing_tuple_helper.py: module to provide a helper class for preparing and pushing tuples'''
import sys
-from heron.common.src.python.config import system_config
from heron.common.src.python.utils.log import Log
from heron.proto import tuple_pb2, topology_pb2, ckptmgr_pb2
-import heron.common.src.python.system_constants as constants
+import heron.instance.src.python.utils.system_constants as constants
+from heron.instance.src.python.utils import system_config
# pylint: disable=too-many-instance-attributes
# pylint: disable=no-value-for-parameter
diff --git a/heron/common/src/python/config/system_config.py b/heron/instance/src/python/utils/system_config.py
similarity index 100%
rename from heron/common/src/python/config/system_config.py
rename to heron/instance/src/python/utils/system_config.py
diff --git a/heron/common/src/python/system_constants.py b/heron/instance/src/python/utils/system_constants.py
similarity index 100%
rename from heron/common/src/python/system_constants.py
rename to heron/instance/src/python/utils/system_constants.py
diff --git a/heron/instance/src/python/utils/topology/topology_context_impl.py b/heron/instance/src/python/utils/topology/topology_context_impl.py
index 4fad3e5..29d1406 100644
--- a/heron/instance/src/python/utils/topology/topology_context_impl.py
+++ b/heron/instance/src/python/utils/topology/topology_context_impl.py
@@ -23,7 +23,7 @@
import heronpy.api.api_constants as api_constants
from heron.instance.src.python.utils.metrics import MetricsCollector
-import heron.common.src.python.system_constants as system_constants
+import heron.instance.src.python.utils.system_constants as system_constants
import heron.common.src.python.pex_loader as pex_loader
class TopologyContextImpl(TopologyContext):
diff --git a/heron/instance/tests/python/network/BUILD b/heron/instance/tests/python/network/BUILD
index cde4b06..dc685b6 100644
--- a/heron/instance/tests/python/network/BUILD
+++ b/heron/instance/tests/python/network/BUILD
@@ -101,3 +101,31 @@
],
size = "small",
)
+
+pex_test(
+ name = "gateway_looper_unittest",
+ srcs = ["gateway_looper_unittest.py"],
+ deps = [
+ "//heron/instance/src/python:instance-py",
+ ],
+ reqs = [
+ "py==1.4.27",
+ "pytest==2.6.4",
+ "unittest2==0.5.1",
+ ],
+ size = "small",
+)
+
+pex_test(
+ name = "event_looper_unittest",
+ srcs = ["event_looper_unittest.py"],
+ deps = [
+ "//heron/instance/src/python:instance-py",
+ ],
+ reqs = [
+ "py==1.4.27",
+ "pytest==2.6.4",
+ "unittest2==0.5.1",
+ ],
+ size = "small",
+)
diff --git a/heron/common/tests/python/basics/event_looper_unittest.py b/heron/instance/tests/python/network/event_looper_unittest.py
similarity index 97%
rename from heron/common/tests/python/basics/event_looper_unittest.py
rename to heron/instance/tests/python/network/event_looper_unittest.py
index acb9fca..da31f9c 100644
--- a/heron/common/tests/python/basics/event_looper_unittest.py
+++ b/heron/instance/tests/python/network/event_looper_unittest.py
@@ -15,7 +15,7 @@
import time
import unittest2 as unittest
-from heron.common.src.python.basics import EventLooper
+from heron.instance.src.python.network import EventLooper
# pylint: disable=missing-docstring
# pylint: disable=W0212
diff --git a/heron/common/tests/python/basics/gateway_looper_unittest.py b/heron/instance/tests/python/network/gateway_looper_unittest.py
similarity index 95%
rename from heron/common/tests/python/basics/gateway_looper_unittest.py
rename to heron/instance/tests/python/network/gateway_looper_unittest.py
index ae1ba0d..79f4e5b 100644
--- a/heron/common/tests/python/basics/gateway_looper_unittest.py
+++ b/heron/instance/tests/python/network/gateway_looper_unittest.py
@@ -16,7 +16,7 @@
import time
import unittest2 as unittest
-from heron.common.src.python.basics.gateway_looper import GatewayLooper
+from heron.instance.src.python.network.gateway_looper import GatewayLooper
# pylint: disable=missing-docstring
class GatewayLooperTest(unittest.TestCase):
diff --git a/heron/instance/tests/python/network/mock_generator.py b/heron/instance/tests/python/network/mock_generator.py
index b49411d..8d74965 100644
--- a/heron/instance/tests/python/network/mock_generator.py
+++ b/heron/instance/tests/python/network/mock_generator.py
@@ -13,8 +13,8 @@
# limitations under the License.
'''mock_generator for instance/network'''
# pylint : disable=missing-docstring
-from heron.common.src.python.basics import EventLooper
-import heron.common.src.python.system_constants as constants
+from heron.instance.src.python.network import EventLooper
+import heron.instance.src.python.utils.system_constants as constants
from heron.instance.src.python.utils.misc import HeronCommunicator
from heron.instance.src.python.network import SingleThreadStmgrClient, MetricsManagerClient
from heron.instance.src.python.network import SocketOptions
@@ -29,7 +29,7 @@
def __init__(self):
socket_options = SocketOptions(32768, 16, 32768, 16, 1024000, 1024000)
- with patch("heron.common.src.python.config.system_config.get_sys_config",
+ with patch("heron.instance.src.python.utils.system_config.get_sys_config",
side_effect=lambda: {constants.INSTANCE_RECONNECT_STREAMMGR_INTERVAL_SEC: 10}):
SingleThreadStmgrClient.__init__(self, EventLooper(), None, self.HOST, self.PORT,
"topology_name", "topology_id",
@@ -50,7 +50,7 @@
PORT = 9000
def __init__(self):
socket_options = SocketOptions(32768, 16, 32768, 16, 1024000, 1024000)
- with patch("heron.common.src.python.config.system_config.get_sys_config",
+ with patch("heron.instance.src.python.utils.system_config.get_sys_config",
side_effect=lambda: {constants.INSTANCE_RECONNECT_METRICSMGR_INTERVAL_SEC: 10,
constants.INSTANCE_METRICS_SYSTEM_SAMPLE_INTERVAL_SEC: 10}):
stream = HeronCommunicator(producer_cb=None, consumer_cb=None)
diff --git a/heron/instance/tests/python/utils/mock_generator.py b/heron/instance/tests/python/utils/mock_generator.py
index 46bd5ec..8aac3cf 100644
--- a/heron/instance/tests/python/utils/mock_generator.py
+++ b/heron/instance/tests/python/utils/mock_generator.py
@@ -27,7 +27,7 @@
HeronCommunicator)
from heron.proto import tuple_pb2
-import heron.common.src.python.system_constants as constants
+import heron.instance.src.python.utils.system_constants as constants
import heron.instance.tests.python.mock_protobuf as mock_protobuf
prim_list = [1000, -234, 0.00023, "string",
@@ -114,7 +114,7 @@
if mode == MockOutgoingTupleHelper.SAMPLE_SUCCESS:
pplan_helper, out_stream = self._prepare_sample_success()
- with patch("heron.common.src.python.config.system_config.get_sys_config",
+ with patch("heron.instance.src.python.utils.system_config.get_sys_config",
side_effect=lambda: sample_sys_config):
super(MockOutgoingTupleHelper, self).__init__(pplan_helper, out_stream)
diff --git a/heron/instance/tests/python/utils/py_metrics_unittest.py b/heron/instance/tests/python/utils/py_metrics_unittest.py
index 77d691b..e1bd6d3 100644
--- a/heron/instance/tests/python/utils/py_metrics_unittest.py
+++ b/heron/instance/tests/python/utils/py_metrics_unittest.py
@@ -19,7 +19,7 @@
import threading
from heron.instance.src.python.utils.metrics.py_metrics import PyMetrics
-import heron.common.src.python.system_constants as constants
+import heron.instance.src.python.utils.system_constants as constants
import heron.instance.tests.python.utils.mock_generator as mock_generator
Mem = namedtuple('Mem', ['rss', 'vms'])
@@ -29,7 +29,7 @@
class PyMetricsTest(unittest.TestCase):
def setUp(self):
metrics_collector = mock_generator.MockMetricsCollector()
- with patch("heron.common.src.python.config.system_config.get_sys_config",
+ with patch("heron.instance.src.python.utils.system_config.get_sys_config",
side_effect=lambda: {constants.HERON_METRICS_EXPORT_INTERVAL_SEC: 10}):
self.py_metrics = PyMetrics(metrics_collector)
self.py_metrics.process = Mock()
diff --git a/heron/proto/BUILD b/heron/proto/BUILD
index d814fe9..21b38fe 100644
--- a/heron/proto/BUILD
+++ b/heron/proto/BUILD
@@ -177,40 +177,6 @@
],
)
-# This is for binaries that don't want protobuf
-# dependency to be included in the generated jar
-java_library(
- name = "proto-java-nl",
- srcs = [
- "Empty.java",
- ],
- deps = [
- ":proto_common_java",
- ":proto_execution_state_java",
- ":proto_stats_java",
- ":proto_topology_java",
- ":proto_scheduler_java",
- ":proto_packing_plan_java",
- ":proto_physical_plan_java",
- ":proto_metrics_java",
- ":proto_tmaster_java",
- ":proto_tuple_java",
- ":proto_stmgr_java",
- ":proto_ckptmgr_java",
- ":proto_networktests_java",
- "//third_party/java:protobuf-java-neverlink",
- ],
-)
-
-java_binary(
- name = "proto-bin-java",
- srcs = ["Empty.java"],
- create_executable = 0,
- deps = [
- ":proto-java-nl",
- ],
-)
-
pex_library(
name = "proto-py",
deps = [
diff --git a/heron/proto/pplan.proto.oldx b/heron/proto/pplan.proto.oldx
deleted file mode 100644
index 3a270bf..0000000
--- a/heron/proto/pplan.proto.oldx
+++ /dev/null
@@ -1,91 +0,0 @@
-package heron.proto.system;
-
-option java_package = "com.twitter.heron.proto.system";
-option java_outer_classname = "PhysicalPlans";
-
-enum CompType {
- SPOUT = 1;
- BOLT = 2;
-}
-
-enum GroupType {
- RANDOM = 1;
- SHUFFLE = 2;
- FIELDS = 3;
- REPLICATE = 4;
- NONE = 5;
- SELF = 6;
-}
-
-message Schema {
- repeated string keys = 1;
-}
-
-message Manager {
- required string id = 1;
- required string name = 2;
- required int32 dport = 3;
- required int32 cport = 4;
- required string epoint = 5;
-}
-
-message Group {
- required string id = 1;
- required string host_id = 2;
-}
-
-message Stream {
- required string id = 1;
- required GroupType gtype = 2;
-}
-
-message Grouping {
- required Stream stream = 1;
- repeated Group groups = 2;
-}
-
-message Input {
- required Stream stream = 1;
- required string group_id = 2;
-}
-
-message Output {
- required string stream_id = 1;
- required Schema schema = 2;
-}
-
-message Worker {
- required string id = 1;
- required string host_id = 2;
- required CompType comp_type = 3;
- required string instance_id = 4;
-}
-
-message Component {
- required string id = 1;
- required string class_name = 2;
-}
-
-message SpoutInstance {
- required string id = 1;
- required Component comp_id = 2;
- repeated Output ostreams = 3;
- repeated string params = 4;
-}
-
-message BoltInstance {
- required string id = 1;
- required Component comp_id = 2;
- repeated Input istreams = 3;
- repeated Output ostreams = 4;
- repeated string params = 5;
-}
-
-message PhysicalPlan {
- repeated Manager mgrs = 1;
- repeated Stream streams = 2;
- repeated Grouping groupings = 3;
- repeated Worker workers = 4;
- repeated SpoutInstance spouts = 5;
- repeated BoltInstance bolts = 6;
-}
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
index 027fee3..1923e6b 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
@@ -120,7 +120,9 @@
metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_INIT,
back_pressure_metric_initiated_);
spouts_under_back_pressure_ = false;
-
+ max_herontupleset_size_in_bytes =
+ config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrHeronTupleSetMessageMaxBytes();
+ space_check_counter = 0;  // message counter for the periodic size check; starts at zero
// Update queue related metrics every 10 seconds
CHECK_GT(eventLoop_->registerTimer([this](EventLoop::Status status) {
this->UpdateQueueMetrics(status);
@@ -431,7 +433,20 @@
->incr_by(_message->control().fails_size());
}
stmgr_->HandleInstanceData(iter->second, instance_info_[iter->second]->local_spout_, _message);
- __global_protobuf_pool_release__(_message);
+ space_check_counter = (space_check_counter + 1) % 1024;  // wraps to 0 on every 1024th message
+ if (space_check_counter == 0) {  // size is inspected only for these sampled messages
+ auto message_size = _message->SpaceUsed();  // presumably sampled to limit SpaceUsed() cost - confirm
+ if (message_size >= max_herontupleset_size_in_bytes) {  // NOTE(review): limit is sp_uint32; confirm SpaceUsed() signedness
+ LOG(WARNING) << "HeronTupleSet message has size " << message_size <<
+ " bytes, exceeding limit " << max_herontupleset_size_in_bytes << " bytes." <<
+ " Release to memory allocator rather than memory pool.";
+ delete _message;  // at or over the limit: freed directly instead of going back to the pool
+ } else {
+ __global_protobuf_pool_release__(_message);  // sampled and under the limit: back to the pool
+ }
+ } else {
+ __global_protobuf_pool_release__(_message);  // not sampled: pooled without any size check
+ }
}
void StMgrServer::SendToInstance2(proto::stmgr::TupleStreamMessage* _message) {
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.h b/heron/stmgr/src/cpp/manager/stmgr-server.h
index 0faa08f..0ad4707 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.h
@@ -201,6 +201,8 @@
heron::common::TimeSpentMetric* back_pressure_metric_initiated_;
bool spouts_under_back_pressure_;
+ sp_uint32 max_herontupleset_size_in_bytes;  // byte limit compared against a HeronTupleSet's SpaceUsed()
+ sp_uint32 space_check_counter;  // advanced modulo 1024 per message; the size check runs when it is 0
// Stateful processing related member variables
NeighbourCalculator* neighbour_calculator_;
diff --git a/heron/tools/tracker/src/python/topology_helpers.py b/heron/tools/tracker/src/python/topology_helpers.py
deleted file mode 100644
index d30c5d4..0000000
--- a/heron/tools/tracker/src/python/topology_helpers.py
+++ /dev/null
@@ -1,330 +0,0 @@
-# Copyright 2016 Twitter. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-This module includes a bunch of library functions
-relevant for dealing with the topology structure
-"""
-
-import sets
-
-from heronpy.api import api_constants
-from heron.common.src.python import system_constants
-from heron.proto import topology_pb2
-
-def get_topology_config(topology, key):
- """
- Helper function to get the value of key in topology config.
- Return None if the key is not present.
- """
- for kv in topology.topology_config.kvs:
- if kv.key == key:
- return kv.value
- return None
-
-def get_component_parallelism(topology):
- """
- The argument is the proto object for topology.
- Returns a map of components to their parallelism.
- """
- cmap = {}
- components = list(topology.spouts) + list(topology.bolts)
- for component in components:
- component_name = component.comp.name
- for kv in component.comp.config.kvs:
- if kv.key == api_constants.TOPOLOGY_COMPONENT_PARALLELISM:
- cmap[component_name] = int(kv.value)
- return cmap
-
-def get_nstmgrs(topology):
- """
- The argument is the proto object for topology.
- Returns the number of stream managers for the topology.
- This is equal to the number of containers.
- If not present, return 1 as default.
- """
- return int(get_topology_config(topology, api_constants.TOPOLOGY_STMGRS) or 1)
-
-def get_instance_opts(topology):
- """
- The argument is the proto object for topology.
- Returns the topology worker child options.
- If not present, return empty string.
- """
- return str(get_topology_config(topology, api_constants.TOPOLOGY_WORKER_CHILDOPTS) or "")
-
-def get_total_instances(topology):
- """
- The argument is the proto object for topology.
- Returns the total number of instances based on all parallelisms.
- """
- cmap = get_component_parallelism(topology)
- return sum(cmap.values())
-
-def get_additional_classpath(topology):
- """
- The argument is the proto object for topology.
- Returns an empty string if no additional classpath specified
- """
- additional_classpath = str(get_topology_config(
- topology, api_constants.TOPOLOGY_ADDITIONAL_CLASSPATH) or "")
- return additional_classpath
-
-def get_cpus_per_container(topology):
- """
- The argument is the proto object for topology.
- Calculate and return the CPUs per container.
- It can be calculated in two ways:
- 1. It is passed in config.
- 2. Allocate 1 CPU per instance in a container, and 1 extra for stmrs.
- """
- cpus = get_topology_config(topology, api_constants.TOPOLOGY_CONTAINER_CPU_REQUESTED)
- if not cpus:
- ninstances = get_total_instances(topology)
- nstmgrs = get_nstmgrs(topology)
- cpus = float(ninstances) / nstmgrs + 1 # plus one for stmgr
- return float(cpus)
-
-def get_disk_per_container(topology):
- """
- The argument is the proto object for topology.
- Calculate and return the disk per container.
- It can be calculated in two ways:
- 1. It is passed in config.
- 2. Allocate 1 GB per instance in a container, and 12 GB extra for the rest.
- """
- disk = get_topology_config(topology, api_constants.TOPOLOGY_CONTAINER_DISK_REQUESTED)
- if not disk:
- ninstances = get_total_instances(topology)
- nstmgrs = get_nstmgrs(topology)
- # Round to the ceiling
- maxInstanceInOneContainer = (ninstances + nstmgrs - 1) / nstmgrs
-
- disk = maxInstanceInOneContainer * system_constants.GB + \
- system_constants.DEFAULT_DISK_PADDING_PER_CONTAINER
- return disk
-
-def get_ram_per_container(topology):
- """
- The argument is the proto object for topology.
- Calculate and return RAM per container for the topology based
- container rammap. Since rammap takes into account all the
- configs and cases, we just add some ram for stmgr and return it.
- """
- component_distribution = get_component_distribution(topology)
- rammap = get_component_rammap(topology)
- maxsize = 0
- for (_, container_items) in component_distribution.items():
- ramsize = 0
- for (component_name, _, _) in container_items:
- ramsize += int(rammap[component_name])
- if ramsize > maxsize:
- maxsize = ramsize
- return maxsize + system_constants.RAM_FOR_STMGR
-
-def get_component_rammap(topology):
- """
- The argument is the proto object for topology.
- It is calculated based on the following priority:
- 1. Form the user specified component rammap. If the rammap
- is complete, return it.
- 2. If only some component rams are specified, assign them
- the default RAM, and return it.
- 3. If no component RAM is specified, take the container ram requested
- and divide it equally among the max possible instances.
- 4. If container RAM was not requested, assign the default RAM
- to all components.
- Returns a map from component name to ram in bytes
- """
- component_distribution = get_component_distribution(topology)
- components = list(topology.spouts) + list(topology.bolts)
-
- # first check if user has specified the rammap
- rammap = get_user_rammap(topology) or {}
-
- # Some or all of them are there, so assign default to those components
- # that are not specified.
- if len(rammap.keys()) > 0:
- for component in components:
- component_name = component.comp.name
- if component_name not in rammap:
- rammap[component_name] = system_constants.DEFAULT_RAM_FOR_INSTANCE
- return rammap
-
- max_instances_in_a_container = max(map(lambda x: len(x[1]), component_distribution.items()))
-
- # User has not specified any kind of rammap
- # We just find the ram needed for a container and divide
- # memory equally
- requested_ram_per_container = get_container_ram_requested(topology)
- if requested_ram_per_container != None:
- ram_for_jvms = requested_ram_per_container - system_constants.RAM_FOR_STMGR
- ram_per_instance = int(ram_for_jvms / max_instances_in_a_container)
- rammap = {}
- for component in components:
- component_name = component.comp.name
- rammap[component_name] = ram_per_instance
- return rammap
-
- # Nothing was specified.
- # The default is to allocate one 1gb per instance of all components
- ram_per_instance = int(system_constants.DEFAULT_RAM_FOR_INSTANCE)
- rammap = {}
- for component in components:
- component_name = component.comp.name
- rammap[component_name] = ram_per_instance
- return rammap
-
-def strip_java_objects(topology):
- """
- The argument is the proto object for topology.
- The java objects are huge objects and topology
- is stripped off these objects so that it can be stored
- in zookeeper.
- """
- stripped_topology = topology_pb2.Topology()
- stripped_topology.CopyFrom(topology)
- components = list(stripped_topology.spouts) + list(stripped_topology.bolts)
- for component in components:
- if component.comp.HasField("serialized_object"):
- component.comp.ClearField("serialized_object")
-
- return stripped_topology
-
-def get_component_distribution(topology):
- """
- The argument is the proto object for topology.
- Distribute components and their instances evenly across containers in a round robin fashion
- Return value is a map from container id to a list.
- Each element of a list is a tuple (component_name, global_task_id, component_index)
- This is essentially the physical plan of the topology.
- """
- containers = {}
- nstmgrs = get_nstmgrs(topology)
- for i in range(1, nstmgrs + 1):
- containers[i] = []
- index = 1
- global_task_id = 1
- for (component_name, ninstances) in get_component_parallelism(topology).items():
- component_index = 0
- for i in range(ninstances):
- containers[index].append((str(component_name), str(global_task_id), str(component_index)))
- component_index = component_index + 1
- global_task_id = global_task_id + 1
- index = index + 1
- if index == nstmgrs + 1:
- index = 1
- return containers
-
-def get_user_rammap(topology):
- """
- The argument is the proto object for topology.
- Returns a dict of component to ram as specified by user.
- Expected user input is "comp1:42,comp2:24,..."
- where 42 and 24 represents the ram requested in bytes.
- Returns None if nothing is specified.
- """
- rammap = get_topology_config(topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP)
- if rammap:
- rmap = {}
- for component_ram in rammap.split(","):
- component, ram = component_ram.split(":")
- rmap[component] = int(ram)
- return rmap
- return None
-
-def get_container_ram_requested(topology):
- """
- The argument is the proto object for topology.
- Returns the container ram as requested by the user.
- Returns None if not specified.
- """
- ram = get_topology_config(topology, api_constants.TOPOLOGY_CONTAINER_RAM_REQUESTED)
- return int(ram) if ram else None
-
-def get_component_jvmopts(topology):
- """
- The argument is the proto object for topology.
- Returns the jvm options if specified by user.
- """
- return str(get_topology_config(topology, api_constants.TOPOLOGY_COMPONENT_JVMOPTS) or "")
-
-def get_topology_state_string(topology):
- """
- The argument is the proto object for topology.
- Returns the topology state as one of:
- 1. RUNNING
- 2. PAUSED
- 3. KILLED
- """
- return topology_pb2.TopologyState.Name(topology.state)
-
-# pylint: disable=too-many-return-statements, too-many-branches
-def sane(topology):
- """ First check if topology name is ok """
- if topology.name == "":
- print "Topology name cannot be empty"
- return False
- if '.' in topology.name or '/' in topology.name:
- print "Topology name cannot contain . or /"
- return False
-
- component_names = set()
- for spout in topology.spouts:
- component_name = spout.comp.name
- if component_name in component_names:
- print component_name + " is duplicated twice"
- return False
- component_names.add(component_name)
-
- for bolt in topology.bolts:
- component_name = bolt.comp.name
- if component_name in component_names:
- print component_name + " is duplicated twice"
- return False
- component_names.add(component_name)
-
- ninstances = get_total_instances(topology)
- nstmgrs = get_nstmgrs(topology)
- if nstmgrs > ninstances:
- print "Number of containers are greater than number of instances"
- return False
-
- for kv in topology.topology_config.kvs:
- if kv.key == api_constants.TOPOLOGY_COMPONENT_RAMMAP:
- rammap = str(kv.value).split(',')
- for component_ram in rammap:
- component_and_ram = component_ram.split(':')
- if len(component_and_ram) != 2:
- print "TOPOLOGY_COMPONENT_RAMMAP is set incorrectly"
- return False
- if not component_and_ram[0] in component_names:
- print "TOPOLOGY_COMPONENT_RAMMAP is set incorrectly"
- return False
-
- # Now check if bolts are reading streams that some component emits
- all_outputs = sets.Set()
- for spout in topology.spouts:
- for outputstream in spout.outputs:
- all_outputs.add((outputstream.stream.id, outputstream.stream.component_name))
- for bolt in topology.bolts:
- for outputstream in bolt.outputs:
- all_outputs.add((outputstream.stream.id, outputstream.stream.component_name))
- for bolt in topology.bolts:
- for inputstream in bolt.inputs:
- if (inputstream.stream.id, inputstream.stream.component_name) not in all_outputs:
- print "Bolt " + bolt.comp.name + " has an input stream that no one emits"
- return False
-
- return True
diff --git a/heron/tools/tracker/tests/python/BUILD b/heron/tools/tracker/tests/python/BUILD
index 82e3b19..0637663 100644
--- a/heron/tools/tracker/tests/python/BUILD
+++ b/heron/tools/tracker/tests/python/BUILD
@@ -11,22 +11,6 @@
)
pex_test(
- name = "topology_helpers_unittest",
- srcs = ["topology_helpers_unittest.py"],
- deps = [
- "//heron/tools/tracker/src/python:tracker-py",
- "//heronpy/api:heron-python-py",
- ],
- reqs = [
- "mock==1.0.1",
- "py==1.4.27",
- "pytest==2.6.4",
- "unittest2==0.5.1",
- ],
- size = "small",
-)
-
-pex_test(
name = "topology_unittest",
srcs = ["topology_unittest.py"],
deps = [
diff --git a/heron/tools/tracker/tests/python/topology_helpers_unittest.py b/heron/tools/tracker/tests/python/topology_helpers_unittest.py
deleted file mode 100644
index 314f493..0000000
--- a/heron/tools/tracker/tests/python/topology_helpers_unittest.py
+++ /dev/null
@@ -1,233 +0,0 @@
-''' topology_helpers_unittest.py '''
-# pylint: disable=missing-docstring
-import unittest2 as unittest
-
-from heronpy.api import api_constants
-from heron.common.src.python import system_constants
-from heron.tools.tracker.src.python import topology_helpers
-from mock_proto import MockProto
-
-class TopologyHelpersTest(unittest.TestCase):
- def setUp(self):
- self.mock_proto = MockProto()
-
- def test_get_component_parallelism(self):
- topology = self.mock_proto.create_mock_medium_topology(1, 2, 3, 4)
-
- cmap = topology_helpers.get_component_parallelism(topology)
- self.assertEqual(1, cmap["mock_spout1"])
- self.assertEqual(2, cmap["mock_bolt1"])
- self.assertEqual(3, cmap["mock_bolt2"])
- self.assertEqual(4, cmap["mock_bolt3"])
-
- def test_get_disk_per_container(self):
- # We would have 9 instances
- topology = self.mock_proto.create_mock_medium_topology(1, 2, 3, 4)
-
- # First try with 1 container, so the disk request should be:
- # 10 * GB + Padding_Disk (12GB) = 22GB
- default_disk = topology_helpers.get_disk_per_container(topology)
- self.assertEqual(22 * system_constants.GB, default_disk)
-
- # Then try with 4 container, so the disk request should be:
- # 10/4 = 2.5 -> 3 (round to ceiling) + 12 = 15GB
- self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_STMGRS, 4)
- self.assertEqual(15 * system_constants.GB, topology_helpers.get_disk_per_container(topology))
-
- # Then let's set the disk_per_container explicitly
- self.mock_proto.add_topology_config(
- topology, api_constants.TOPOLOGY_CONTAINER_DISK_REQUESTED, 950109)
- # The add_topology_config will convert config into string
- self.assertEqual(str(950109), topology_helpers.get_disk_per_container(topology))
-
- def test_get_total_instances(self):
- topology = self.mock_proto.create_mock_medium_topology(3, 4, 5, 6)
-
- num_instances = topology_helpers.get_total_instances(topology)
- self.assertEqual(18, num_instances)
-
- def test_sane(self):
- # Make wrong topology names
- topology = self.mock_proto.create_mock_simple_topology()
- topology.name = ""
- self.assertFalse(topology_helpers.sane(topology))
-
- topology = self.mock_proto.create_mock_simple_topology()
- topology.name = "test.with.a.dot"
- self.assertFalse(topology_helpers.sane(topology))
-
- topology = self.mock_proto.create_mock_simple_topology()
- topology.name = "test/with/a/slash"
- self.assertFalse(topology_helpers.sane(topology))
-
- # Add another spout with the same name
- topology = self.mock_proto.create_mock_simple_topology()
- topology.spouts.extend([self.mock_proto.create_mock_spout("mock_spout", [], 1)])
- self.assertFalse(topology_helpers.sane(topology))
-
- # Add another bolt with the same name
- topology = self.mock_proto.create_mock_simple_topology()
- topology.bolts.extend([self.mock_proto.create_mock_bolt("mock_bolt", [], [], 1)])
- self.assertFalse(topology_helpers.sane(topology))
-
- # If num containers are greater than num instances
- topology = self.mock_proto.create_mock_simple_topology(1, 1)
- self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_STMGRS, 4)
- self.assertFalse(topology_helpers.sane(topology))
-
- # If rammap is partial with less componenets
- topology = self.mock_proto.create_mock_simple_topology()
- self.mock_proto.add_topology_config(
- topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP, "mock_spout:1")
- self.assertTrue(topology_helpers.sane(topology))
-
- # If rammap is not well formatted
- topology = self.mock_proto.create_mock_simple_topology()
- self.mock_proto.add_topology_config(
- topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP, "mock_spout:1:2,mock_bolt:2:3")
- self.assertFalse(topology_helpers.sane(topology))
-
- # If rammap has wrong component name
- topology = self.mock_proto.create_mock_simple_topology()
- self.mock_proto.add_topology_config(
- topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP, "wrong_mock_spout:1,mock_bolt:2")
- self.assertFalse(topology_helpers.sane(topology))
-
- # If everything is right
- topology = self.mock_proto.create_mock_simple_topology()
- self.mock_proto.add_topology_config(
- topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP, "mock_spout:1,mock_bolt:2")
- self.assertTrue(topology_helpers.sane(topology))
-
- def test_num_cpus_per_container(self):
- topology = self.mock_proto.create_mock_simple_topology(2, 2)
- self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_STMGRS, 4)
- self.assertEqual(2, topology_helpers.get_cpus_per_container(topology))
-
- topology = self.mock_proto.create_mock_simple_topology(2, 2)
- self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_CONTAINER_CPU_REQUESTED, 42)
- self.assertEqual(42, topology_helpers.get_cpus_per_container(topology))
-
- def test_get_user_rammap(self):
- topology = self.mock_proto.create_mock_simple_topology()
- self.mock_proto.add_topology_config(
- topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP, "mock_spout:2,mock_bolt:3")
- self.assertEqual({"mock_spout":2, "mock_bolt":3}, topology_helpers.get_user_rammap(topology))
-
- def test_get_component_distribution(self):
- topology = self.mock_proto.create_mock_simple_topology(4, 8)
- self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_STMGRS, 4)
- component_distribution = topology_helpers.get_component_distribution(topology)
-
- expected_component_distribution = {
- 1: [
- ("mock_bolt", "1", "0"),
- ("mock_bolt", "5", "4"),
- ("mock_spout", "9", "0")
- ],
- 2: [
- ("mock_bolt", "2", "1"),
- ("mock_bolt", "6", "5"),
- ("mock_spout", "10", "1")
- ],
- 3: [
- ("mock_bolt", "3", "2"),
- ("mock_bolt", "7", "6"),
- ("mock_spout", "11", "2")
- ],
- 4: [
- ("mock_bolt", "4", "3"),
- ("mock_bolt", "8", "7"),
- ("mock_spout", "12", "3")
- ]
- }
- self.assertEqual(expected_component_distribution, component_distribution)
-
- def test_get_component_rammap(self):
- # Mock a few methods
- # This is not a good way since this shows the internals
- # of the method. These methods need to be changed.
- # For example, ram_per_contaner could be taken as an argument.
- original_ram_for_stmgr = system_constants.RAM_FOR_STMGR
- system_constants.RAM_FOR_STMGR = 2
-
- # When rammap is specified, it should be used.
- topology = self.mock_proto.create_mock_simple_topology(4, 8)
- self.mock_proto.add_topology_config(
- topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP, "mock_spout:2,mock_bolt:3")
- self.assertEqual(
- {"mock_spout":2, "mock_bolt":3}, topology_helpers.get_component_rammap(topology))
-
- # When partial rammap is specified, rest of the components should get default
- topology = self.mock_proto.create_mock_simple_topology(4, 8)
- self.mock_proto.add_topology_config(
- topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP, "mock_spout:2")
- expected_component_rammap = {
- "mock_spout": 2,
- "mock_bolt": system_constants.DEFAULT_RAM_FOR_INSTANCE
- }
- self.assertEqual(expected_component_rammap, topology_helpers.get_component_rammap(topology))
-
- # When container ram is specified.
- topology = self.mock_proto.create_mock_simple_topology(4, 8)
- self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_STMGRS, 4)
- self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_CONTAINER_RAM_REQUESTED, 8)
-
- expected_component_rammap = {
- "mock_spout": 2,
- "mock_bolt": 2
- }
- component_rammap = topology_helpers.get_component_rammap(topology)
- self.assertEqual(expected_component_rammap, component_rammap)
-
- # When nothing is specified.
- topology = self.mock_proto.create_mock_simple_topology(4, 8)
- component_rammap = topology_helpers.get_component_rammap(topology)
- expected_component_rammap = {
- "mock_spout": system_constants.DEFAULT_RAM_FOR_INSTANCE,
- "mock_bolt": system_constants.DEFAULT_RAM_FOR_INSTANCE
- }
- self.assertEqual(expected_component_rammap, component_rammap)
-
- # Unmock the things that we mocked.
- system_constants.RAM_FOR_STMGR = original_ram_for_stmgr
-
- def test_get_ram_per_container(self):
- # Mock a few things
- original_ram_for_stmgr = system_constants.RAM_FOR_STMGR
- system_constants.RAM_FOR_STMGR = 2
- original_default_ram_for_instance = system_constants.DEFAULT_RAM_FOR_INSTANCE
- system_constants.DEFAULT_RAM_FOR_INSTANCE = 1
-
- # When rammap is specified
- topology = self.mock_proto.create_mock_simple_topology(4, 8)
- self.mock_proto.add_topology_config(
- topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP, "mock_spout:2,mock_bolt:3")
- self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_STMGRS, 4)
- self.assertEqual(10, topology_helpers.get_ram_per_container(topology))
-
- # When partial rammap is specified, rest of the components should get default
- topology = self.mock_proto.create_mock_simple_topology(4, 8)
- self.mock_proto.add_topology_config(
- topology, api_constants.TOPOLOGY_COMPONENT_RAMMAP, "mock_spout:2")
- self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_STMGRS, 4)
- expected_ram_per_container = 6
- self.assertEqual(expected_ram_per_container, topology_helpers.get_ram_per_container(topology))
-
- # If container ram is specified
- topology = self.mock_proto.create_mock_simple_topology(4, 8)
- requested_ram = 15000
- self.mock_proto.add_topology_config(
- topology, api_constants.TOPOLOGY_CONTAINER_RAM_REQUESTED, str(requested_ram))
- # Difference should be less than the total instances
- self.assertLess(abs(topology_helpers.get_ram_per_container(topology) - requested_ram), 12)
-
- # When nothing is specified
- topology = self.mock_proto.create_mock_simple_topology(4, 8)
- self.mock_proto.add_topology_config(topology, api_constants.TOPOLOGY_STMGRS, 4)
- expected_ram_per_container = 5
- self.assertEqual(expected_ram_per_container, topology_helpers.get_ram_per_container(topology))
-
- # Unmock the things that we mocked.
- system_constants.RAM_FOR_STMGR = original_ram_for_stmgr
- system_constants.DEFAULT_RAM_FOR_INSTANCE = original_default_ram_for_instance
diff --git a/third_party/zookeeper/BUILD b/third_party/zookeeper/BUILD
index b0da55d..6d4c74c 100644
--- a/third_party/zookeeper/BUILD
+++ b/third_party/zookeeper/BUILD
@@ -3,7 +3,7 @@
package(default_visibility = ["//visibility:public"])
package_name = "zookeeper"
-package_version = "3.4.5-macfix"
+package_version = "3.4.10"  # selects the bundled zookeeper-3.4.10.tar.gz through package_file below
package_file = package_name + "-" + package_version + ".tar.gz"
package_dir = package_name + "-" + package_version
diff --git a/third_party/zookeeper/zookeeper-3.4.10.tar.gz b/third_party/zookeeper/zookeeper-3.4.10.tar.gz
new file mode 100644
index 0000000..1c2dcdb
--- /dev/null
+++ b/third_party/zookeeper/zookeeper-3.4.10.tar.gz
Binary files differ
diff --git a/third_party/zookeeper/zookeeper-3.4.5-macfix.tar.gz b/third_party/zookeeper/zookeeper-3.4.5-macfix.tar.gz
deleted file mode 100644
index 3842f5c..0000000
--- a/third_party/zookeeper/zookeeper-3.4.5-macfix.tar.gz
+++ /dev/null
Binary files differ
diff --git a/tools/rules/heron_deps.bzl b/tools/rules/heron_deps.bzl
index d02dcb5..4b1fbeb 100644
--- a/tools/rules/heron_deps.bzl
+++ b/tools/rules/heron_deps.bzl
@@ -16,3 +16,9 @@
"//heron/proto:proto_stmgr_java",
"@com_google_protobuf_protobuf_java//jar",
]
+
+def heron_java_api_proto_files():  # deps for the Java API target: topology proto only, plus protobuf
+ return [
+ "//heron/proto:proto_topology_java",
+ "@com_google_protobuf_protobuf_java//jar",
+ ]