blob: deddfd01813ca05f3e567516f0a792f4bd1a1568 [file] [log] [blame]
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''Stream Manager client for single-thread heron instance in python'''
import sys
import traceback
from heron.common.src.python.utils.log import Log
from heron.instance.src.python.network.heron_client import HeronClient
from heron.instance.src.python.network import StatusCode
from heron.instance.src.python.utils import system_config
from heron.proto import common_pb2, stmgr_pb2, tuple_pb2, ckptmgr_pb2
import heron.instance.src.python.utils.system_constants as constants
# pylint: disable=too-many-arguments
# pylint: disable=too-many-instance-attributes
class SingleThreadStmgrClient(HeronClient):
    """Heron client that talks to the Stream Manager on behalf of one instance.

    This class is intended to be used with SingleThreadHeronInstance.
    It will:
      1. Register the messages it accepts: NewInstanceAssignmentMessage,
         HeronTupleSet2, StartInstanceStatefulProcessing,
         RestoreInstanceStateRequest and InitiateStatefulCheckpoint
      2. Send a register request when on_connect() reports success
      3. Handle the response to that request and forward incoming messages
         to ``heron_instance_cls``
    """

    def __init__(self, looper, heron_instance_cls, strmgr_host, port, topology_name,
                 topology_id, instance, sock_map, gateway_metrics, socket_options):
        """Initialize the underlying HeronClient and remember the instance context.

        :param looper: passed through to HeronClient
        :param heron_instance_cls: object whose ``handle_*`` methods receive the
                                   messages delivered by the Stream Manager
        :param strmgr_host: passed through to HeronClient
        :param port: passed through to HeronClient
        :param topology_name: topology name placed into the register request
        :param topology_id: topology id placed into the register request
        :param instance: physical_plan_pb2.Instance message merged into the register request
        :param sock_map: passed through to HeronClient
        :param gateway_metrics: metrics object notified of every received packet size
        :param socket_options: passed through to HeronClient
        """
        HeronClient.__init__(self, looper, strmgr_host, port, sock_map, socket_options)
        self.heron_instance_cls = heron_instance_cls
        self.gateway_metrics = gateway_metrics
        self.topology_name = topology_name
        self.topology_id = topology_id
        # physical_plan_pb2.Instance message
        self.instance = instance
        self.sys_config = system_config.get_sys_config()
        # Flipped to True only after a successful register response
        self.is_registered = False

    def on_connect(self, status):
        """Send the register request on success; otherwise schedule a reconnect.

        :param status: StatusCode describing the outcome of the connection attempt
        """
        Log.debug("In on_connect of STStmgrClient")
        if status == StatusCode.OK:
            self._register_msg_to_handle()
            self._send_register_req()
            return

        # Connection failed: try again after the configured interval
        Log.error("Error connecting to Stream Manager with status: %s", str(status))
        interval_key = constants.INSTANCE_RECONNECT_STREAMMGR_INTERVAL_SEC
        retry_interval = float(self.sys_config[interval_key])
        self.looper.register_timer_task_in_sec(self.start_connect, retry_interval)
        self.is_registered = False

    def on_response(self, status, context, response):
        """Handle a response to a request previously sent to the Stream Manager.

        :param status: StatusCode of the response
        :param context: context associated with the request (only logged here)
        :param response: response message; only RegisterInstanceResponse is accepted
        :raises RuntimeError: if ``status`` is not OK or the response type is unknown
        """
        Log.debug("In on_response with status: %s, with context: %s", str(status), str(context))
        if status != StatusCode.OK:
            raise RuntimeError("Response from Stream Manager not OK")

        if not isinstance(response, stmgr_pb2.RegisterInstanceResponse):
            Log.error("Unknown kind of response received: %s", response.DESCRIPTOR.full_name)
            raise RuntimeError("Unknown kind of response received from Stream Manager")

        self._handle_register_response(response)

    def on_incoming_message(self, message):
        """Record the packet size and route the message to its handler.

        Any exception raised while handling (including an unknown message type)
        is logged together with its traceback, and the process exits with code 1.

        :param message: message received from the Stream Manager
        """
        self.gateway_metrics.update_received_packet(message.ByteSize())
        try:
            if isinstance(message, stmgr_pb2.NewInstanceAssignmentMessage):
                Log.info("Handling assignment message from direct NewInstanceAssignmentMessage")
                self._handle_assignment_message(message.pplan)
                return

            # Remaining message kinds are handed over unchanged; checked in this order
            handlers_by_type = (
                (tuple_pb2.HeronTupleSet2, self._handle_new_tuples_2),
                (ckptmgr_pb2.StartInstanceStatefulProcessing,
                 self._handle_start_stateful_processing),
                (ckptmgr_pb2.RestoreInstanceStateRequest,
                 self._handle_restore_instance_state),
                (ckptmgr_pb2.InitiateStatefulCheckpoint,
                 self._handle_initiate_stateful_checkpoint),
            )
            for message_type, handler in handlers_by_type:
                if isinstance(message, message_type):
                    handler(message)
                    return

            raise RuntimeError("Unknown kind of message received from Stream Manager")
        except Exception as e:
            Log.error("Error happened while handling a message from stmgr: " + str(e))
            Log.error(traceback.format_exc())
            sys.exit(1)

    def on_error(self):
        """Log the disconnection and go through the failed-connect path to retry."""
        Log.error("Disconnected from Stream Manager")
        # retry again
        self.on_connect(StatusCode.CONNECT_ERROR)

    def _register_msg_to_handle(self):
        """Register a builder for every message type this client accepts."""
        # pylint: disable=unnecessary-lambda
        message_builders = (
            lambda: stmgr_pb2.NewInstanceAssignmentMessage(),
            lambda: tuple_pb2.HeronTupleSet2(),
            lambda: ckptmgr_pb2.StartInstanceStatefulProcessing(),
            lambda: ckptmgr_pb2.RestoreInstanceStateRequest(),
            lambda: ckptmgr_pb2.InitiateStatefulCheckpoint(),
        )
        for builder in message_builders:
            self.register_on_message(builder)

    def _send_register_req(self):
        """Build a RegisterInstanceRequest for this instance and send it."""
        # The reconnect interval setting is also used as the request timeout
        timeout_key = constants.INSTANCE_RECONNECT_STREAMMGR_INTERVAL_SEC
        timeout_sec = float(self.sys_config[timeout_key])

        register_req = stmgr_pb2.RegisterInstanceRequest()
        register_req.instance.MergeFrom(self.instance)
        register_req.topology_name = self.topology_name
        register_req.topology_id = self.topology_id

        expected_response = stmgr_pb2.RegisterInstanceResponse()
        self.send_request(register_req, "Context", expected_response, timeout_sec)

    def _handle_register_response(self, response):
        """Called when a register response (RegisterInstanceResponse) arrives

        Marks the client as registered and, when the response carries a
        physical plan, forwards it as an assignment.

        :raises RuntimeError: if the status carried by the response is not OK
        """
        reported_status = response.status.status
        if reported_status != common_pb2.StatusCode.Value("OK"):
            raise RuntimeError("Stream Manager returned a not OK response for register")

        Log.info("We registered ourselves to the Stream Manager")
        self.is_registered = True

        if not response.HasField("pplan"):
            Log.debug("Received a register response with no pplan")
            return

        Log.info("Handling assignment message from response")
        self._handle_assignment_message(response.pplan)

    def _handle_new_tuples_2(self, hts2):
        """Called when new HeronTupleSet2 arrives; forwards it to the instance"""
        self.heron_instance_cls.handle_new_tuple_set_2(hts2)

    def _handle_initiate_stateful_checkpoint(self, ckptmsg):
        """Called when new InitiateStatefulCheckpoint arrives; forwards it to the instance"""
        self.heron_instance_cls.handle_initiate_stateful_checkpoint(ckptmsg)

    def _handle_start_stateful_processing(self, startmsg):
        """Called when new StartInstanceStatefulProcessing arrives; forwards it to the instance"""
        self.heron_instance_cls.handle_start_stateful_processing(startmsg)

    def _handle_restore_instance_state(self, restoremsg):
        """Called when new RestoreInstanceStateRequest arrives; forwards it to the instance"""
        self.heron_instance_cls.handle_restore_instance_state(restoremsg)

    def _handle_assignment_message(self, pplan):
        """Called when a physical plan arrives, directly or inside a register response"""
        Log.debug(
            "In handle_assignment_message() of STStmgrClient, Physical Plan: \n%s", str(pplan))
        self.heron_instance_cls.handle_assignment_msg(pplan)