blob: 77bfc379d5e73888c34fc33369f7b1b394424069 [file] [log] [blame]
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''outgoing_tuple_helper.py: module to provide a helper class for preparing and pushing tuples'''
import sys
from heron.common.src.python.utils.log import Log
from heron.proto import tuple_pb2, topology_pb2, ckptmgr_pb2
import heron.instance.src.python.utils.system_constants as constants
from heron.instance.src.python.utils import system_config
# pylint: disable=too-many-instance-attributes
# pylint: disable=no-value-for-parameter
class OutgoingTupleHelper(object):
    """Helper class for preparing and pushing tuples to Out-Stream

    This is a Python implementation of OutgoingTupleCollection.java

    Handles basic methods for sending out tuples:

    1. ``add_data_tuple()`` and ``add_control_tuple()`` buffer a tuple, starting a new
       tuple set (and flushing whatever was buffered before) when the current set
       cannot take it
    2. ``add_ckpt_state()`` flushes buffered tuples and pushes a checkpoint message
    3. ``send_out_tuples()`` flushes buffered tuples into the Out-Stream
    4. ``clear()`` drops buffered tuples and empties the Out-Stream

    :ivar out_stream: (HeronCommunicator) Out-Stream. Pushed message is an instance of
                      HeronTupleSet
    :ivar pplan_helper: (PhysicalPlanHelper) Physical Plan Helper for this component
    :ivar current_data_tuple_set: (HeronDataTupleSet) currently buffered data tuple
    :ivar current_control_tuple_set: (HeronControlTupleSet) currently buffered control tuple
    :ivar current_data_tuple_size_in_bytes: size accumulated by the buffered data tuple set
    :ivar total_data_emitted_in_bytes: running total of all sizes passed to ``add_*_tuple()``
    """

    # Message factories. They are looked up through ``self`` everywhere in this class,
    # so a subclass or a test can substitute its own message types.
    def make_data_tuple_set(self):
        """Returns a new, empty ``HeronDataTupleSet`` message"""
        return tuple_pb2.HeronDataTupleSet()

    def make_control_tuple_set(self):
        """Returns a new, empty ``HeronControlTupleSet`` message"""
        return tuple_pb2.HeronControlTupleSet()

    def make_tuple_set(self):
        """Returns a new, empty ``HeronTupleSet`` message"""
        return tuple_pb2.HeronTupleSet()

    def make_stream_id(self):
        """Returns a new, empty ``StreamId`` message"""
        return topology_pb2.StreamId()

    def __init__(self, pplan_helper, out_stream):
        """Creates a helper with nothing buffered and reads its limits from the system config

        :param pplan_helper: helper whose ``my_component_name`` is stamped on data tuple sets
        :param out_stream: destination; must provide ``offer()``, ``clear()`` and
                           ``get_available_capacity()``
        """
        self.out_stream = out_stream
        self.pplan_helper = pplan_helper

        self.current_data_tuple_set = None
        self.current_control_tuple_set = None

        self.current_data_tuple_size_in_bytes = 0
        self.total_data_emitted_in_bytes = 0

        self.sys_config = system_config.get_sys_config()

        # read the config values; both capacities are mandatory, the byte limit is
        # optional and is effectively unlimited when absent
        self.data_tuple_set_capacity = \
            self.sys_config[constants.INSTANCE_SET_DATA_TUPLE_CAPACITY]
        self.max_data_tuple_size_in_bytes = \
            self.sys_config.get(constants.INSTANCE_SET_DATA_TUPLE_SIZE_BYTES, sys.maxsize)
        self.control_tuple_set_capacity = \
            self.sys_config[constants.INSTANCE_SET_CONTROL_TUPLE_CAPACITY]

    def send_out_tuples(self):
        """Sends out currently buffered tuples into the Out-Stream"""
        self._flush_remaining()

    def add_data_tuple(self, stream_id, new_data_tuple, tuple_size_in_bytes):
        """Add a new data tuple to the currently buffered set of tuples

        A fresh data tuple set is started first (flushing everything buffered) when
        nothing is buffered, the buffered set belongs to another stream, it already
        holds ``data_tuple_set_capacity`` tuples, or it has reached
        ``max_data_tuple_size_in_bytes``.

        :param stream_id: id of the stream the tuple is emitted on
        :param new_data_tuple: tuple message; it is copied into the buffered set
        :param tuple_size_in_bytes: size to account for this tuple
        """
        buffered = self.current_data_tuple_set
        if (buffered is None
                or buffered.stream.id != stream_id
                or len(buffered.tuples) >= self.data_tuple_set_capacity
                or self.current_data_tuple_size_in_bytes >= self.max_data_tuple_size_in_bytes):
            self._init_new_data_tuple(stream_id)

        self.current_data_tuple_set.tuples.add().CopyFrom(new_data_tuple)

        self.current_data_tuple_size_in_bytes += tuple_size_in_bytes
        self.total_data_emitted_in_bytes += tuple_size_in_bytes

    def add_control_tuple(self, new_control_tuple, tuple_size_in_bytes, is_ack):
        """Add a new control (Ack/Fail) tuple to the currently buffered set of tuples

        A control tuple set holds either acks or fails, never both, so a fresh set is
        started (flushing everything buffered) when nothing is buffered, the buffered
        set holds the other kind, or it already holds ``control_tuple_set_capacity``
        tuples of the requested kind.

        :param new_control_tuple: tuple message; it is copied into the buffered set
        :param tuple_size_in_bytes: size to account for this tuple
        :param is_ack: ``True`` if Ack, ``False`` if Fail
        """
        buffered = self.current_control_tuple_set
        if buffered is None:
            needs_new_set = True
        else:
            if is_ack:
                same_kind, other_kind = buffered.acks, buffered.fails
            else:
                same_kind, other_kind = buffered.fails, buffered.acks
            needs_new_set = (len(other_kind) > 0 or
                             len(same_kind) >= self.control_tuple_set_capacity)

        if needs_new_set:
            self._init_new_control_tuple()

        # re-read the attribute: it may just have been replaced by a new set
        if is_ack:
            added_tuple = self.current_control_tuple_set.acks.add()
        else:
            added_tuple = self.current_control_tuple_set.fails.add()
        added_tuple.CopyFrom(new_control_tuple)

        self.total_data_emitted_in_bytes += tuple_size_in_bytes

    def add_ckpt_state(self, ckpt_id, ckpt_state):
        """Add the checkpoint state message to be sent back to the stmgr

        :param ckpt_id: The id of the checkpoint
        :param ckpt_state: The checkpoint state
        """
        # first flush any buffered tuples
        self._flush_remaining()

        istate = ckptmgr_pb2.InstanceStateCheckpoint()
        istate.checkpoint_id = ckpt_id
        istate.state = ckpt_state

        msg = ckptmgr_pb2.StoreInstanceStateCheckpoint()
        msg.state.CopyFrom(istate)
        self._push_tuple_to_stream(msg)

    def clear(self):
        """Clears current data and the streams

        Buffered tuple sets are dropped rather than flushed: pushing them into
        ``out_stream`` immediately before emptying it is wasted work, and it would make
        ``clear()`` fail whenever ``out_stream.offer()`` raises (presumably when the
        stream is full -- confirm against the communicator implementation).
        """
        self.current_data_tuple_set = None
        self.current_control_tuple_set = None
        self.current_data_tuple_size_in_bytes = 0
        self.out_stream.clear()

    def _init_new_data_tuple(self, stream_id):
        """Flushes everything buffered and starts an empty data tuple set for ``stream_id``"""
        self._flush_remaining()
        self.current_data_tuple_size_in_bytes = 0

        new_stream_id = self.make_stream_id()
        new_stream_id.id = stream_id
        new_stream_id.component_name = self.pplan_helper.my_component_name

        new_data_tuple_set = self.make_data_tuple_set()
        new_data_tuple_set.stream.CopyFrom(new_stream_id)
        self.current_data_tuple_set = new_data_tuple_set

    def _init_new_control_tuple(self):
        """Flushes everything buffered and starts an empty control tuple set"""
        self._flush_remaining()
        self.current_control_tuple_set = self.make_control_tuple_set()

    def _flush_remaining(self):
        """Pushes the buffered data set, then the buffered control set, to the Out-Stream

        Each buffer is reset only after its push has returned, so an exception raised
        by the push leaves that set buffered.
        """
        if self.current_data_tuple_set is not None:
            Log.debug("In flush_remaining() - flush data tuple set")
            tuple_set = self.make_tuple_set()
            tuple_set.data.CopyFrom(self.current_data_tuple_set)
            self._push_tuple_to_stream(tuple_set)

            self.current_data_tuple_set = None
            self.current_data_tuple_size_in_bytes = 0

        if self.current_control_tuple_set is not None:
            Log.debug("In flush_remaining() - flush control tuple set")
            tuple_set = self.make_tuple_set()
            tuple_set.control.CopyFrom(self.current_control_tuple_set)
            self._push_tuple_to_stream(tuple_set)

            self.current_control_tuple_set = None

    def _push_tuple_to_stream(self, tuple_set):
        """Offers ``tuple_set`` to the Out-Stream"""
        self.out_stream.offer(tuple_set)

    def is_out_queue_available(self):
        """Returns ``True`` if the Out-Stream reports any available capacity"""
        return self.out_stream.get_available_capacity() > 0