import socket
from heronpy.api.custom_grouping import ICustomGrouping
from heronpy.api.serializer import default_serializer
from heron.proto import topology_pb2
from heron.common.src.python.utils.log import Log
import heron.common.src.python.pex_loader as pex_loader
from heron.instance.src.python.utils.topology import TopologyContextImpl
from .custom_grouping_helper import CustomGroupingHelper
# pylint: disable=too-many-instance-attributes
class PhysicalPlanHelper(object):
    """Helper class for accessing Physical Plan

    :ivar pplan: Physical Plan protobuf message
    :ivar topology_pex_abs_path: Topology pex file's absolute path
    :ivar my_instance_id: instance id for this instance
    :ivar my_instance: Instance protobuf message for this instance
    :ivar my_component_name: component name for this instance
    :ivar my_task_id: global task id for this instance
    :ivar is_spout: ``True`` if it's spout, ``False`` if it's bolt
    :ivar hostname: hostname of this instance
    :ivar my_component: Component protobuf message for this instance
    :ivar context: Topology context if set, otherwise ``None``
    :ivar custom_grouper: ``CustomGroupingHelper`` used for custom grouping
    """

    def __init__(self, pplan, instance_id, topology_pex_abs_path):
        """Locates this instance and its spout/bolt inside the physical plan

        :param pplan: Physical Plan protobuf message
        :param instance_id: instance id for this instance
        :param topology_pex_abs_path: Topology pex file's absolute path
        :raises RuntimeError: if no instance in ``pplan`` matches ``instance_id``, or
                              if this instance's component is not exactly one spout or bolt
        """
        self.pplan = pplan
        self.my_instance_id = instance_id
        self.my_instance = None
        self.topology_pex_abs_path = topology_pex_abs_path

        # get my instance
        for instance in pplan.instances:
            if instance.instance_id == self.my_instance_id:
                self.my_instance = instance

        if self.my_instance is None:
            raise RuntimeError(
                "There was no instance that matched my id: %s" % self.my_instance_id)

        # NOTE(review): assumes the Instance message exposes ``info.component_name``
        # and ``info.task_id`` -- confirm against the physical plan protobuf schema
        self.my_component_name = self.my_instance.info.component_name
        self.my_task_id = self.my_instance.info.task_id

        # get spout or bolt
        self._my_spbl, self.is_spout = self._get_my_spout_or_bolt(pplan.topology)

        # Map <stream id -> number of fields in that stream's schema>
        # NOTE(review): assumes an output stream is keyed by ``stream.id`` -- confirm
        self._output_schema = {
            out_stream.stream.id: len(out_stream.schema.keys)
            for out_stream in self._my_spbl.outputs
        }

        self.hostname = socket.gethostname()
        self.my_component = self._my_spbl.comp

        self.context = None

        # setup for custom grouping
        self.custom_grouper = CustomGroupingHelper()
        # NOTE(review): nothing else invokes this private method, so it is called here
        # to register the groupers -- confirm this matches the intended start-up order
        self._setup_custom_grouping(pplan.topology)

    def _get_my_spout_or_bolt(self, topology):
        """Finds the spout or bolt whose component name matches this instance

        :param topology: Topology protobuf message
        :return: tuple of (spout or bolt message, ``True`` if spout / ``False`` if bolt)
        :raises RuntimeError: if more than one component matches, or if the match is
                              neither a spout nor a bolt (this includes no match at all)
        """
        my_spbl = None
        for spbl in list(topology.spouts) + list(topology.bolts):
            # NOTE(review): assumes the Component message carries its name as ``name``
            if spbl.comp.name == self.my_component_name:
                if my_spbl is not None:
                    raise RuntimeError("Duplicate my component found")
                my_spbl = spbl

        if isinstance(my_spbl, topology_pb2.Spout):
            is_spout = True
        elif isinstance(my_spbl, topology_pb2.Bolt):
            is_spout = False
        elif my_spbl.__class__.__name__ == "Spout":
            # isinstance failed but the class name matches, so classify by name.
            # NOTE(review): the log level of this message is an assumption -- confirm
            Log.warning("Mismatch between cpp and python protobuf")
            is_spout = True
        elif my_spbl.__class__.__name__ == "Bolt":
            Log.warning("Mismatch between cpp and python protobuf")
            is_spout = False
        else:
            raise RuntimeError("My component neither spout nor bolt")
        return my_spbl, is_spout

    def check_output_schema(self, stream_id, tup):
        """Checks if a given stream_id and tuple matches with the output schema

        :type stream_id: str
        :param stream_id: stream id into which tuple is sent
        :type tup: list
        :param tup: tuple that is going to be sent
        :raises RuntimeError: if the stream was not declared, or if the number of
                              fields in ``tup`` differs from the declared schema
        """
        # do some checking to make sure that the number of fields match what's expected
        size = self._output_schema.get(stream_id, None)
        if size is None:
            raise RuntimeError("%s emitting to stream %s but was not declared in output fields"
                               % (self.my_component_name, stream_id))
        if size != len(tup):
            raise RuntimeError("Number of fields emitted in stream %s does not match what's expected. "
                               "Expected: %s, Observed: %s" % (stream_id, size, len(tup)))

    def get_my_spout(self):
        """Returns spout instance, or ``None`` if bolt is assigned"""
        if self.is_spout:
            return self._my_spbl
        return None

    def get_my_bolt(self):
        """Returns bolt instance, or ``None`` if spout is assigned"""
        if self.is_spout:
            return None
        return self._my_spbl

    def get_topology_state(self):
        """Returns the current topology state"""
        return self.pplan.topology.state

    def is_topology_running(self):
        """Checks whether topology is currently running"""
        return self.pplan.topology.state == topology_pb2.TopologyState.Value("RUNNING")

    def is_topology_paused(self):
        """Checks whether topology is currently paused"""
        return self.pplan.topology.state == topology_pb2.TopologyState.Value("PAUSED")

    def is_topology_killed(self):
        """Checks whether topology is already killed"""
        return self.pplan.topology.state == topology_pb2.TopologyState.Value("KILLED")

    def get_topology_config(self):
        """Returns the topology config as a dict, or an empty dict if none is set"""
        if self.pplan.topology.HasField("topology_config"):
            return self._get_dict_from_config(self.pplan.topology.topology_config)
        return {}

    def set_topology_context(self, metrics_collector):
        """Sets a new topology context

        :param metrics_collector: metrics collector handed to the new context
        """
        Log.debug("Setting topology context")
        cluster_config = self.get_topology_config()
        task_to_component_map = self._get_task_to_comp_map()
        # NOTE(review): the final argument is assumed to be the topology pex path --
        # confirm against the TopologyContextImpl constructor signature
        self.context = TopologyContextImpl(cluster_config, self.pplan.topology,
                                           task_to_component_map, self.my_task_id,
                                           metrics_collector, self.topology_pex_abs_path)

    @staticmethod
    def _get_dict_from_config(topology_config):
        """Converts Config protobuf message to python dictionary

        Values are converted according to the rules below:

        - Number string (e.g. "12" or "1.2") is appropriately converted to ``int`` or ``float``
        - Boolean string ("true", "True", "false" or "False") is converted to built-in boolean type
          (i.e. ``True`` or ``False``)
        - Normal string is inserted to dict as is
        - Serialized value is deserialized and inserted as a corresponding Python object

        Entries of any other kind are logged as errors and left out of the result.

        :param topology_config: Config protobuf message
        :return: dict mapping each config key to its converted value
        """
        config = {}
        for kv in topology_config.kvs:
            if kv.HasField("value"):
                assert kv.type == topology_pb2.ConfigValueType.Value("STRING_VALUE")
                # value is string
                lowered = kv.value.lower()
                if PhysicalPlanHelper._is_number(kv.value):
                    config[kv.key] = PhysicalPlanHelper._get_number(kv.value)
                elif lowered in ("true", "false"):
                    config[kv.key] = lowered == "true"
                else:
                    config[kv.key] = kv.value
            elif kv.HasField("serialized_value") and \
                    kv.type == topology_pb2.ConfigValueType.Value("PYTHON_SERIALIZED_VALUE"):
                # deserialize that
                config[kv.key] = default_serializer.deserialize(kv.serialized_value)
            else:
                assert kv.HasField("type")
                Log.error("Unsupported config <key:value> found: %s, with type: %s"
                          % (str(kv), str(kv.type)))
        return config

    @staticmethod
    def _is_number(string):
        """Returns ``True`` if ``string`` can be parsed by ``float()``, else ``False``"""
        try:
            float(string)
            return True
        except ValueError:
            return False

    @staticmethod
    def _get_number(string):
        """Parses ``string`` as ``int`` when possible, otherwise as ``float``

        :raises ValueError: if ``string`` is neither an int nor a float literal
        """
        try:
            return int(string)
        except ValueError:
            return float(string)

    def _get_task_to_comp_map(self):
        """Returns a dict mapping every task id in the plan to its component name"""
        # NOTE(review): assumes ``info.task_id`` / ``info.component_name`` -- confirm
        return {
            instance.info.task_id: instance.info.component_name
            for instance in self.pplan.instances
        }

    ##### custom grouping related #####

    def _setup_custom_grouping(self, topology):
        """Checks whether there are any bolts that consume any of my streams using custom grouping

        :param topology: Topology protobuf message
        :raises NotImplementedError: if a consumer uses a Java-serialized custom grouping
        :raises ValueError: if the custom grouping object type is not recognized
        """
        for bolt in topology.bolts:
            for in_stream in bolt.inputs:
                # NOTE(review): assumes the producing component is exposed as
                # ``stream.component_name`` on the input stream -- confirm
                if in_stream.stream.component_name != self.my_component_name or \
                        in_stream.gtype != topology_pb2.Grouping.Value("CUSTOM"):
                    continue

                # this bolt takes my output in custom grouping manner
                if in_stream.type == topology_pb2.CustomGroupingObjectType.Value("PYTHON_OBJECT"):
                    custom_grouping_obj = \
                        default_serializer.deserialize(in_stream.custom_grouping_object)
                    if isinstance(custom_grouping_obj, str):
                        # a string is treated as a class path to load from the topology pex
                        grouping_cls = pex_loader.import_and_get_class(
                            self.topology_pex_abs_path, custom_grouping_obj)
                        custom_grouping_obj = grouping_cls()
                    assert isinstance(custom_grouping_obj, ICustomGrouping)
                    # NOTE(review): registration call assumes the helper's signature is
                    # add(stream_id, task_ids, grouping, source_comp_name) -- confirm
                    self.custom_grouper.add(in_stream.stream.id,
                                            self._get_taskids_for_component(bolt.comp.name),
                                            custom_grouping_obj,
                                            self.my_component_name)
                elif in_stream.type == topology_pb2.CustomGroupingObjectType.Value("JAVA_OBJECT"):
                    raise NotImplementedError("Java-serialized custom grouping is not yet supported "
                                              "for python topology")
                else:
                    raise ValueError("Unrecognized custom grouping type found: %s"
                                     % str(in_stream.type))

    def _get_taskids_for_component(self, component_name):
        """Returns the task ids of every instance that belongs to ``component_name``"""
        return [instance.info.task_id for instance in self.pplan.instances
                if instance.info.component_name == component_name]

    def prepare_custom_grouping(self, context):
        """Prepares for custom grouping for this component

        :param context: Topology context
        """
        # NOTE(review): assumes the helper exposes ``prepare(context)`` -- confirm
        self.custom_grouper.prepare(context)

    def choose_tasks_for_custom_grouping(self, stream_id, values):
        """Choose target task ids for custom grouping

        :param stream_id: stream id the values are emitted to
        :param values: values being emitted
        :return: task ids
        """
        return self.custom_grouper.choose_tasks(stream_id, values)