blob: 95687a0c016fa250132bf36e2468f5ece0d8d9d2 [file] [log] [blame]
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''topology_context.py'''
import os
from collections import namedtuple
from heronpy.api.task_hook import (ITaskHook, EmitInfo, SpoutAckInfo,
SpoutFailInfo, BoltExecuteInfo,
BoltAckInfo, BoltFailInfo)
from heronpy.api.topology_context import TopologyContext
import heronpy.api.api_constants as api_constants
from heron.instance.src.python.utils.metrics import MetricsCollector
import heron.instance.src.python.utils.system_constants as system_constants
import heron.common.src.python.pex_loader as pex_loader
class TopologyContextImpl(TopologyContext):
"""Implemention of TopologyContext
This is created by Heron Instance and passed on to the topology spouts/bolts
as the topology context
"""
# pylint: disable=too-many-instance-attributes
def __init__(self, config, topology, task_to_component, my_task_id, metrics_collector,
topo_pex_path):
self.config = config
self.topology = topology
self.task_to_component_map = task_to_component
self.task_id = my_task_id
self.metrics_collector = metrics_collector
self.topology_pex_path = os.path.abspath(topo_pex_path)
inputs, outputs, out_fields = self._get_inputs_and_outputs_and_outfields(topology)
self.inputs = inputs
self.outputs = outputs
self.component_to_out_fields = out_fields
# init task hooks
self.task_hooks = []
self._init_task_hooks()
##### Implementation of interface methods #####
def get_task_id(self):
"""Property to get the task id of this component"""
return self.task_id
def get_component_id(self):
"""Property to get the component id of this component"""
return self.task_to_component_map.get(self.get_task_id())
def get_cluster_config(self):
"""Returns the cluster config for this component
Note that the returned config is auto-typed map: <str -> any Python object>.
"""
return self.config
def get_topology_name(self):
"""Returns the name of the topology
"""
return str(self.topology.name)
def register_metric(self, name, metric, time_bucket_in_sec):
"""Registers a new metric to this context"""
collector = self.get_metrics_collector()
collector.register_metric(name, metric, time_bucket_in_sec)
def get_sources(self, component_id):
"""Returns the declared inputs to specified component
:return: map <streamId namedtuple (same structure as protobuf msg) -> gtype>, or
None if not found
"""
# this is necessary because protobuf message is not hashable
StreamId = namedtuple('StreamId', 'id, component_name')
if component_id in self.inputs:
ret = {}
for istream in self.inputs.get(component_id):
key = StreamId(id=istream.stream.id, component_name=istream.stream.component_name)
ret[key] = istream.gtype
return ret
else:
return None
def get_this_sources(self):
return self.get_sources(self.get_component_id())
def get_component_tasks(self, component_id):
"""Returns the task ids allocated for the given component id"""
ret = []
for task_id, comp_id in list(self.task_to_component_map.items()):
if comp_id == component_id:
ret.append(task_id)
return ret
def add_task_hook(self, task_hook):
"""Registers a specified task hook to this context
:type task_hook: heron.instance.src.python.utils.topology.ITaskHook
:param task_hook: Implementation of ITaskHook
"""
if not isinstance(task_hook, ITaskHook):
raise TypeError("In add_task_hook(): attempt to add non ITaskHook instance, given: %s"
% str(type(task_hook)))
self.task_hooks.append(task_hook)
##### Other exposed implementation specific methods #####
def get_topology_pex_path(self):
"""Returns the topology's pex file path"""
return self.topology_pex_path
def get_metrics_collector(self):
"""Returns this context's metrics collector"""
if self.metrics_collector is None or not isinstance(self.metrics_collector, MetricsCollector):
raise RuntimeError("Metrics collector is not registered in this context")
return self.metrics_collector
########################################
@classmethod
def _get_inputs_and_outputs_and_outfields(cls, topology):
inputs = {}
outputs = {}
out_fields = {}
for spout in topology.spouts:
inputs[spout.comp.name] = [] # spout doesn't have any inputs
outputs[spout.comp.name] = spout.outputs
out_fields.update(cls._get_output_to_comp_fields(spout.outputs))
for bolt in topology.bolts:
inputs[bolt.comp.name] = bolt.inputs
outputs[bolt.comp.name] = bolt.outputs
out_fields.update(cls._get_output_to_comp_fields(bolt.outputs))
return inputs, outputs, out_fields
@staticmethod
def _get_output_to_comp_fields(outputs):
out_fields = {}
for out_stream in outputs:
comp_name = out_stream.stream.component_name
stream_id = out_stream.stream.id
if comp_name not in out_fields:
out_fields[comp_name] = dict()
# get the fields of a particular output stream
ret = []
for kt in out_stream.schema.keys:
ret.append(kt.key)
out_fields[comp_name][stream_id] = tuple(ret)
return out_fields
######### Task hook related ##########
def _init_task_hooks(self):
task_hooks_cls_list = self.get_cluster_config().get(api_constants.TOPOLOGY_AUTO_TASK_HOOKS,
None)
if task_hooks_cls_list is None:
return
# load pex first
topo_pex_path = self.get_topology_pex_path()
pex_loader.load_pex(topo_pex_path)
for class_name in task_hooks_cls_list:
try:
task_hook_cls = pex_loader.import_and_get_class(topo_pex_path, class_name)
task_hook_instance = task_hook_cls()
assert isinstance(task_hook_instance, ITaskHook)
self.task_hooks.append(task_hook_instance)
except AssertionError:
raise RuntimeError("Auto-registered task hook not instance of ITaskHook")
except Exception as e:
raise RuntimeError("Error with loading task hook class: %s, with error message: %s"
% (class_name, str(e)))
def invoke_hook_prepare(self):
"""invoke task hooks for after the spout/bolt's initialize() method"""
for task_hook in self.task_hooks:
task_hook.prepare(self.get_cluster_config(), self)
def invoke_hook_cleanup(self):
"""invoke task hooks for just before the spout/bolt's cleanup method"""
for task_hook in self.task_hooks:
task_hook.clean_up()
def invoke_hook_emit(self, values, stream_id, out_tasks):
"""invoke task hooks for every time a tuple is emitted in spout/bolt
:type values: list
:param values: values emitted
:type stream_id: str
:param stream_id: stream id into which tuple is emitted
:type out_tasks: list
:param out_tasks: list of custom grouping target task id
"""
if len(self.task_hooks) > 0:
emit_info = EmitInfo(values=values, stream_id=stream_id,
task_id=self.get_task_id(), out_tasks=out_tasks)
for task_hook in self.task_hooks:
task_hook.emit(emit_info)
def invoke_hook_spout_ack(self, message_id, complete_latency_ns):
"""invoke task hooks for every time spout acks a tuple
:type message_id: str
:param message_id: message id to which an acked tuple was anchored
:type complete_latency_ns: float
:param complete_latency_ns: complete latency in nano seconds
"""
if len(self.task_hooks) > 0:
spout_ack_info = SpoutAckInfo(message_id=message_id,
spout_task_id=self.get_task_id(),
complete_latency_ms=complete_latency_ns *
system_constants.NS_TO_MS)
for task_hook in self.task_hooks:
task_hook.spout_ack(spout_ack_info)
def invoke_hook_spout_fail(self, message_id, fail_latency_ns):
"""invoke task hooks for every time spout fails a tuple
:type message_id: str
:param message_id: message id to which a failed tuple was anchored
:type fail_latency_ns: float
:param fail_latency_ns: fail latency in nano seconds
"""
if len(self.task_hooks) > 0:
spout_fail_info = SpoutFailInfo(message_id=message_id,
spout_task_id=self.get_task_id(),
fail_latency_ms=fail_latency_ns * system_constants.NS_TO_MS)
for task_hook in self.task_hooks:
task_hook.spout_fail(spout_fail_info)
def invoke_hook_bolt_execute(self, heron_tuple, execute_latency_ns):
"""invoke task hooks for every time bolt processes a tuple
:type heron_tuple: HeronTuple
:param heron_tuple: tuple that is executed
:type execute_latency_ns: float
:param execute_latency_ns: execute latency in nano seconds
"""
if len(self.task_hooks) > 0:
bolt_execute_info = \
BoltExecuteInfo(heron_tuple=heron_tuple,
executing_task_id=self.get_task_id(),
execute_latency_ms=execute_latency_ns * system_constants.NS_TO_MS)
for task_hook in self.task_hooks:
task_hook.bolt_execute(bolt_execute_info)
def invoke_hook_bolt_ack(self, heron_tuple, process_latency_ns):
"""invoke task hooks for every time bolt acks a tuple
:type heron_tuple: HeronTuple
:param heron_tuple: tuple that is acked
:type process_latency_ns: float
:param process_latency_ns: process latency in nano seconds
"""
if len(self.task_hooks) > 0:
bolt_ack_info = BoltAckInfo(heron_tuple=heron_tuple,
acking_task_id=self.get_task_id(),
process_latency_ms=process_latency_ns * system_constants.NS_TO_MS)
for task_hook in self.task_hooks:
task_hook.bolt_ack(bolt_ack_info)
def invoke_hook_bolt_fail(self, heron_tuple, fail_latency_ns):
"""invoke task hooks for every time bolt fails a tuple
:type heron_tuple: HeronTuple
:param heron_tuple: tuple that is failed
:type fail_latency_ns: float
:param fail_latency_ns: fail latency in nano seconds
"""
if len(self.task_hooks) > 0:
bolt_fail_info = BoltFailInfo(heron_tuple=heron_tuple,
failing_task_id=self.get_task_id(),
fail_latency_ms=fail_latency_ns * system_constants.NS_TO_MS)
for task_hook in self.task_hooks:
task_hook.bolt_fail(bolt_fail_info)