blob: 3b4dccf1629d41c41e7638f17567d1875e8ffa88 [file] [log] [blame]
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''spout_instance.py: module for base spout for python topology'''
import queue
import time
import collections
from heronpy.api.stream import Stream
import heronpy.api.api_constants as api_constants
from heronpy.api.state.stateful_component import StatefulComponent
from heron.common.src.python.utils.log import Log
from heron.instance.src.python.utils.metrics import SpoutMetrics
from heron.instance.src.python.utils.tuple import TupleHelper
from heron.proto import topology_pb2, tuple_pb2, ckptmgr_pb2
import heron.instance.src.python.utils.system_constants as system_constants
from .base_instance import BaseInstance
# pylint: disable=too-many-instance-attributes
class SpoutInstance(BaseInstance):
"""The base class for all heron spouts in Python"""
def __init__(self, pplan_helper, in_stream, out_stream, looper):
    """Initialize a spout instance from its physical plan.

    :param pplan_helper: PhysicalPlanHelper describing this instance; must place a spout here
    :param in_stream: communicator carrying incoming control (ack/fail) messages
    :param out_stream: communicator carrying outgoing data tuples
    :param looper: event looper that drives this instance's periodic tasks
    :raises RuntimeError: if the physical plan does not assign a spout to this instance
    """
    super(SpoutInstance, self).__init__(pplan_helper, in_stream, out_stream, looper)
    self.topology_state = topology_pb2.TopologyState.Value("PAUSED")

    if not self.pplan_helper.is_spout:
      # Fixed typo in the error message ("physicial" -> "physical")
      raise RuntimeError("No spout in physical plan")

    context = self.pplan_helper.context
    self.spout_metrics = SpoutMetrics(self.pplan_helper)

    # acking related: ACK is on only for ATLEAST_ONCE reliability mode
    mode = context.get_cluster_config().get(api_constants.TOPOLOGY_RELIABILITY_MODE,
                                            api_constants.TopologyReliabilityMode.ATMOST_ONCE)
    self.acking_enabled = bool(mode == api_constants.TopologyReliabilityMode.ATLEAST_ONCE)
    self.enable_message_timeouts = \
      context.get_cluster_config().get(api_constants.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)
    # metrics registration and task scheduling happen once, on first start_component()
    self._initialized_metrics_and_tasks = False
    Log.info("Enable ACK: %s" % str(self.acking_enabled))
    Log.info("Enable Message Timeouts: %s" % str(self.enable_message_timeouts))

    # map <tuple_info.key -> tuple_info>, ordered by insertion time
    self.in_flight_tuples = collections.OrderedDict()
    self.immediate_acks = collections.deque()
    self.total_tuples_emitted = 0

    # load user's spout class
    spout_impl_class = super(SpoutInstance, self).load_py_instance(is_spout=True)
    self.spout_impl = spout_impl_class(delegate=self)
def start_component(self, stateful_state):
    """Start (or restart) the user spout and mark the topology RUNNING.

    On the very first start this also registers metrics and schedules the
    recurring spout task; subsequent starts only re-initialize the user spout.
    """
    ctx = self.pplan_helper.context
    first_start = not self._initialized_metrics_and_tasks
    if first_start:
        self.spout_metrics.register_metrics(ctx)
    # restore user state first when running a stateful topology
    if self.is_stateful and isinstance(self.spout_impl, StatefulComponent):
        self.spout_impl.init_state(stateful_state)
    self.spout_impl.initialize(config=ctx.get_cluster_config(), context=ctx)
    if first_start:
        self._add_spout_task()
        self._initialized_metrics_and_tasks = True
    self.topology_state = topology_pb2.TopologyState.Value("RUNNING")
def stop_component(self):
    """Close the user spout and flag the topology as PAUSED."""
    self.spout_impl.close()
    self.topology_state = topology_pb2.TopologyState.Value("PAUSED")
def invoke_activate(self):
    """Activate the user spout and transition the topology state to RUNNING."""
    Log.info("Spout is activated")
    self.spout_impl.activate()
    self.topology_state = topology_pb2.TopologyState.Value("RUNNING")
def invoke_deactivate(self):
    """Deactivate the user spout and transition the topology state to PAUSED."""
    Log.info("Spout is deactivated")
    self.spout_impl.deactivate()
    self.topology_state = topology_pb2.TopologyState.Value("PAUSED")
def emit(self, tup, tup_id=None, stream=Stream.DEFAULT_STREAM_ID,
         direct_task=None, need_task_ids=False):
    """Emits a new tuple from this Spout

    It is compatible with StreamParse API.

    :type tup: list or tuple
    :param tup: the new output Tuple to send from this spout,
                should contain only serializable data.
    :type tup_id: str or object
    :param tup_id: the ID for the Tuple. Leave this blank for an unreliable emit.
                   (Same as messageId in Java)
    :type stream: str
    :param stream: the ID of the stream this Tuple should be emitted to.
                   Leave empty to emit to the default stream.
    :type direct_task: int
    :param direct_task: the task to send the Tuple to if performing a direct emit.
    :type need_task_ids: bool
    :param need_task_ids: indicate whether or not you would like the task IDs the Tuple was emitted.
    :raises TypeError: if ``direct_task`` is given but is not an integer
    """
    # first check whether this tuple is sane
    self.pplan_helper.check_output_schema(stream, tup)

    # get custom grouping target task ids; get empty list if not custom grouping
    custom_target_task_ids = self.pplan_helper.choose_tasks_for_custom_grouping(stream, tup)

    self.pplan_helper.context.invoke_hook_emit(tup, stream, None)

    data_tuple = tuple_pb2.HeronDataTuple()
    data_tuple.key = 0

    if direct_task is not None:
        if not isinstance(direct_task, int):
            raise TypeError("direct_task argument needs to be an integer, given: %s"
                            % str(type(direct_task)))
        # performing emit-direct
        data_tuple.dest_task_ids.append(direct_task)
    elif custom_target_task_ids is not None:
        # for custom grouping
        for task_id in custom_target_task_ids:
            data_tuple.dest_task_ids.append(task_id)

    if tup_id is not None:
        tuple_info = TupleHelper.make_root_tuple_info(stream, tup_id)
        if self.acking_enabled:
            # this message is rooted
            root = data_tuple.roots.add()
            root.taskid = self.pplan_helper.my_task_id
            root.key = tuple_info.key
            self.in_flight_tuples[tuple_info.key] = tuple_info
        else:
            self.immediate_acks.append(tuple_info)

    tuple_size_in_bytes = 0
    start_time = time.time()

    # Serialize
    for obj in tup:
        serialized = self.serializer.serialize(obj)
        data_tuple.values.append(serialized)
        tuple_size_in_bytes += len(serialized)
    serialize_latency_ns = (time.time() - start_time) * system_constants.SEC_TO_NS
    self.spout_metrics.serialize_data_tuple(stream, serialize_latency_ns)

    super(SpoutInstance, self).admit_data_tuple(stream_id=stream, data_tuple=data_tuple,
                                                tuple_size_in_bytes=tuple_size_in_bytes)
    self.total_tuples_emitted += 1
    self.spout_metrics.update_emit_count(stream)

    if need_task_ids:
        # Fix: copy the custom-grouping list instead of aliasing it —
        # appending direct_task below must not mutate the list owned by
        # choose_tasks_for_custom_grouping's caller.
        sent_task_ids = list(custom_target_task_ids) if custom_target_task_ids else []
        if direct_task is not None:
            sent_task_ids.append(direct_task)
        return sent_task_ids
# pylint: disable=no-self-use
def process_incoming_tuples(self):
    """Nudge the event looper; the actual draining happens in ``_read_tuples()``."""
    self.looper.wake_up()
def _read_tuples(self):
    """Drain incoming control (ack/fail) and checkpoint messages.

    Processing is bounded by the instance ack batch time so one pass cannot
    starve the rest of the spout task.

    :raises RuntimeError: if a data tuple arrives (spouts never receive data)
    """
    batch_window = self.sys_config[system_constants.INSTANCE_ACK_BATCH_TIME_MS] * \
        system_constants.MS_TO_SEC
    deadline = time.time() + batch_window
    while not self.in_stream.is_empty():
        try:
            received = self.in_stream.poll()
        except queue.Empty:
            break

        if isinstance(received, tuple_pb2.HeronTupleSet):
            if received.HasField("data"):
                raise RuntimeError("Spout cannot get incoming data tuples from other components")
            if received.HasField("control"):
                for ack_tuple in received.control.acks:
                    self._handle_ack_tuple(ack_tuple, True)
                for fail_tuple in received.control.fails:
                    self._handle_ack_tuple(fail_tuple, False)
            else:
                Log.error("Received tuple neither data nor control")
        elif isinstance(received, ckptmgr_pb2.InitiateStatefulCheckpoint):
            self.handle_initiate_stateful_checkpoint(received, self.spout_impl)
        else:
            Log.error("Received tuple not instance of HeronTupleSet")

        # avoid spending too much time here
        if time.time() > deadline:
            break
def _produce_tuple(self):
    """Call the user spout's ``next_tuple()`` until a batch limit is reached.

    The loop stops when no tuple was emitted, the emit batch time elapses, the
    emit batch byte budget is exceeded, or (with acking on) the number of
    in-flight tuples reaches max-spout-pending.
    """
    # TOPOLOGY_MAX_SPOUT_PENDING must be provided (if not included, raise KeyError)
    max_spout_pending = \
        self.pplan_helper.context.get_cluster_config().get(api_constants.TOPOLOGY_MAX_SPOUT_PENDING)

    emitted_before = self.total_tuples_emitted
    bytes_before = self.get_total_data_emitted_in_bytes()
    batch_window = \
        float(self.sys_config[system_constants.INSTANCE_EMIT_BATCH_TIME_MS]) * \
        system_constants.MS_TO_SEC
    batch_bytes = int(self.sys_config[system_constants.INSTANCE_EMIT_BATCH_SIZE_BYTES])
    deadline = time.time() + batch_window

    # equivalent to: (acking and pending < max) or not acking
    while not self.acking_enabled or len(self.in_flight_tuples) < max_spout_pending:
        call_start = time.time()
        self.spout_impl.next_tuple()
        self.spout_metrics.next_tuple((time.time() - call_start) * system_constants.SEC_TO_NS)

        if (self.total_tuples_emitted == emitted_before) or \
            (time.time() > deadline) or \
            (self.get_total_data_emitted_in_bytes() - bytes_before > batch_bytes):
            # no tuples to emit or batch reached
            break
        emitted_before = self.total_tuples_emitted
def _add_spout_task(self):
    """Register the recurring spout task on the looper and, if enabled,
    start the message-timeout scan.

    The nested ``spout_task`` runs every looper wake-up: it drains incoming
    control tuples, produces new tuples when the out queue has room, and
    re-arms the looper whenever more work remains.
    """
    Log.info("Adding spout task...")
    def spout_task():
        # don't do anything when topology is paused
        if not self._is_topology_running():
            return

        self._read_tuples()

        if self._should_produce_tuple():
            self._produce_tuple()
            # flush produced tuples to the stream manager before sleeping again
            self.output_helper.send_out_tuples()
            self.looper.wake_up() # so emitted tuples would be added to buffer now
        else:
            # out queue is full: record the back-pressure event
            self.spout_metrics.update_out_queue_full_count()

        if self.acking_enabled:
            self.spout_metrics.update_pending_tuples_count(len(self.in_flight_tuples))
        else:
            # without acking, tuples emitted with a tup_id are acked right away
            self._do_immediate_acks()

        if self._is_continue_to_work():
            self.looper.wake_up()

    self.looper.add_wakeup_task(spout_task)

    # look for the timeout's tuples
    if self.enable_message_timeouts:
        self._look_for_timeouts()
def _is_topology_running(self):
    """Return True iff the current topology state is RUNNING."""
    running_state = topology_pb2.TopologyState.Value("RUNNING")
    return self.topology_state == running_state
def _should_produce_tuple(self):
    """Checks whether we could produce tuples, i.e. invoke spout.next_tuple()"""
    if not self._is_topology_running():
        return False
    return self.output_helper.is_out_queue_available()
def _is_continue_to_work(self):
    """Checks whether we still need to do more work

    When the topology state is RUNNING:
    1. if the out_queue is not full and ack is not enabled, we could wake up next time to
       produce more tuples and push to the out_queue
    2. if the out_queue is not full but the acking is enabled, we need to make sure that
       the number of pending tuples is smaller than max_spout_pending
    3. if there are more to read, we will wake up itself next time.
    """
    if not self._is_topology_running():
        return False

    max_spout_pending = \
        self.pplan_helper.context.get_cluster_config().get(api_constants.TOPOLOGY_MAX_SPOUT_PENDING)
    out_queue_open = self.output_helper.is_out_queue_available()

    if not self.acking_enabled:
        return out_queue_open
    if out_queue_open and len(self.in_flight_tuples) < max_spout_pending:
        return True
    # acking is on: pending acks/fails in the in-stream are also work
    return not self.in_stream.is_empty()
def _look_for_timeouts(self):
    """Fail every in-flight tuple older than the message timeout, then re-arm the timer."""
    spout_config = self.pplan_helper.context.get_cluster_config()
    timeout_sec = spout_config.get(api_constants.TOPOLOGY_MESSAGE_TIMEOUT_SECS)
    n_bucket = self.sys_config.get(system_constants.INSTANCE_ACKNOWLEDGEMENT_NBUCKETS)
    now = time.time()

    # in_flight_tuples is ordered by insertion time, so scanning stops at the
    # first entry that has not yet expired
    expired = []
    while self.in_flight_tuples:
        oldest_key = next(iter(self.in_flight_tuples))
        info = self.in_flight_tuples[oldest_key]
        if not info.is_expired(now, timeout_sec):
            break
        expired.append(self.in_flight_tuples.pop(oldest_key))

    for info in expired:
        self.spout_metrics.timeout_tuple(info.stream_id)
        self._invoke_fail(info.tuple_id, info.stream_id,
                          timeout_sec * system_constants.SEC_TO_NS)

    # register this method to timer again
    self.looper.register_timer_task_in_sec(self._look_for_timeouts, float(timeout_sec) / n_bucket)
# ACK/FAIL related
def _handle_ack_tuple(self, tup, is_success):
    """Route one incoming ack/fail control tuple to the user spout.

    :param tup: control message whose ``roots`` carry (taskid, key) pairs
    :param is_success: True for an ack, False for a fail
    :raises RuntimeError: if a root was addressed to a different task
    """
    my_task_id = self.pplan_helper.my_task_id
    for root in tup.roots:
        if root.taskid != my_task_id:
            raise RuntimeError("Receiving tuple for task: %s in task: %s"
                               % (str(root.taskid), str(my_task_id)))
        try:
            info = self.in_flight_tuples.pop(root.key)
        except KeyError:
            # root.key is not in in_flight_tuples -> already removed due to time-out
            return

        # pylint: disable=no-member
        if info.tuple_id is not None:
            latency_ns = (time.time() - info.insertion_time) * system_constants.SEC_TO_NS
            if is_success:
                self._invoke_ack(info.tuple_id, info.stream_id, latency_ns)
            else:
                self._invoke_fail(info.tuple_id, info.stream_id, latency_ns)
def _do_immediate_acks(self):
    """Ack the tuples that were emitted with a tup_id while acking is disabled."""
    # Snapshot the length first: entries appended to the deque while the ack
    # callbacks run are left for the next pass.
    pending = len(self.immediate_acks)
    for _ in range(pending):
        info = self.immediate_acks.pop()
        self._invoke_ack(info.tuple_id, info.stream_id, 0)
def _invoke_ack(self, tuple_id, stream_id, complete_latency_ns):
    """Deliver an ack to the user spout, then fire the hook and update metrics."""
    Log.debug("In invoke_ack(): Acking %s from stream: %s" % (str(tuple_id), stream_id))
    self.spout_impl.ack(tuple_id)
    self.pplan_helper.context.invoke_hook_spout_ack(tuple_id, complete_latency_ns)
    self.spout_metrics.acked_tuple(stream_id, complete_latency_ns)
def _invoke_fail(self, tuple_id, stream_id, fail_latency_ns):
    """Deliver a fail to the user spout, then fire the hook and update metrics."""
    Log.debug("In invoke_fail(): Failing %s from stream: %s" % (str(tuple_id), stream_id))
    self.spout_impl.fail(tuple_id)
    self.pplan_helper.context.invoke_hook_spout_fail(tuple_id, fail_latency_ns)
    self.spout_metrics.failed_tuple(stream_id, fail_latency_ns)