| #!/usr/bin/env python3 |
| # -*- encoding: utf-8 -*- |
| |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| '''Stream Manager client for single-thread heron instance in python''' |
| import sys |
| import traceback |
| |
| from heron.common.src.python.utils.log import Log |
| |
| from heron.instance.src.python.network.heron_client import HeronClient |
| from heron.instance.src.python.network import StatusCode |
| from heron.instance.src.python.utils import system_config |
| |
| from heron.proto import common_pb2, stmgr_pb2, tuple_pb2, ckptmgr_pb2 |
| |
| import heron.instance.src.python.utils.system_constants as constants |
| |
| # pylint: disable=too-many-arguments |
| # pylint: disable=too-many-instance-attributes |
| class SingleThreadStmgrClient(HeronClient): |
| """SingleThreadStmgrClient is a Heron Client that communicates with Stream Manager |
| |
| This class is intended to be used with SingleThreadHeronInstance. |
| |
| It will: |
| 1. Register the message of NewInstanceAssignmentMessage and HeronTupleSet2 |
| 2. Send a register request when on_connect() is called |
| 3. Handle relative response for requests |
| """ |
| def __init__(self, looper, heron_instance_cls, strmgr_host, port, topology_name, topology_id, |
| instance, sock_map, gateway_metrics, socket_options): |
| HeronClient.__init__(self, looper, strmgr_host, port, sock_map, socket_options) |
| self.heron_instance_cls = heron_instance_cls |
| self.topology_name = topology_name |
| self.topology_id = topology_id |
| # physical_plan_pb2.Instance message |
| self.instance = instance |
| self.gateway_metrics = gateway_metrics |
| self.sys_config = system_config.get_sys_config() |
| self.is_registered = False |
| |
| # send register request |
| def on_connect(self, status): |
| Log.debug("In on_connect of STStmgrClient") |
| if status != StatusCode.OK: |
| Log.error("Error connecting to Stream Manager with status: %s", str(status)) |
| retry_interval = float(self.sys_config[constants.INSTANCE_RECONNECT_STREAMMGR_INTERVAL_SEC]) |
| self.looper.register_timer_task_in_sec(self.start_connect, retry_interval) |
| self.is_registered = False |
| return |
| self._register_msg_to_handle() |
| self._send_register_req() |
| |
| def on_response(self, status, context, response): |
| Log.debug("In on_response with status: %s, with context: %s", str(status), str(context)) |
| if status != StatusCode.OK: |
| raise RuntimeError("Response from Stream Manager not OK") |
| if isinstance(response, stmgr_pb2.RegisterInstanceResponse): |
| self._handle_register_response(response) |
| else: |
| Log.error("Unknown kind of response received: %s", response.DESCRIPTOR.full_name) |
| raise RuntimeError("Unknown kind of response received from Stream Manager") |
| |
| def on_incoming_message(self, message): |
| self.gateway_metrics.update_received_packet(message.ByteSize()) |
| try: |
| if isinstance(message, stmgr_pb2.NewInstanceAssignmentMessage): |
| Log.info("Handling assignment message from direct NewInstanceAssignmentMessage") |
| self._handle_assignment_message(message.pplan) |
| elif isinstance(message, tuple_pb2.HeronTupleSet2): |
| self._handle_new_tuples_2(message) |
| elif isinstance(message, ckptmgr_pb2.StartInstanceStatefulProcessing): |
| self._handle_start_stateful_processing(message) |
| elif isinstance(message, ckptmgr_pb2.RestoreInstanceStateRequest): |
| self._handle_restore_instance_state(message) |
| elif isinstance(message, ckptmgr_pb2.InitiateStatefulCheckpoint): |
| self._handle_initiate_stateful_checkpoint(message) |
| else: |
| raise RuntimeError("Unknown kind of message received from Stream Manager") |
| except Exception as e: |
| Log.error("Error happened while handling a message from stmgr: " + str(e)) |
| Log.error(traceback.format_exc()) |
| sys.exit(1) |
| |
| def on_error(self): |
| Log.error("Disconnected from Stream Manager") |
| # retry again |
| self.on_connect(StatusCode.CONNECT_ERROR) |
| |
| def _register_msg_to_handle(self): |
| # pylint: disable=unnecessary-lambda |
| new_instance_builder = lambda: stmgr_pb2.NewInstanceAssignmentMessage() |
| hts2_msg_builder = lambda: tuple_pb2.HeronTupleSet2() |
| stateful_start_msg_builder = lambda: ckptmgr_pb2.StartInstanceStatefulProcessing() |
| stateful_restore_msg_builder = lambda: ckptmgr_pb2.RestoreInstanceStateRequest() |
| stateful_initiate_msg_builder = lambda: ckptmgr_pb2.InitiateStatefulCheckpoint() |
| self.register_on_message(new_instance_builder) |
| self.register_on_message(hts2_msg_builder) |
| self.register_on_message(stateful_start_msg_builder) |
| self.register_on_message(stateful_restore_msg_builder) |
| self.register_on_message(stateful_initiate_msg_builder) |
| |
| def _send_register_req(self): |
| request = stmgr_pb2.RegisterInstanceRequest() |
| request.instance.MergeFrom(self.instance) |
| request.topology_name = self.topology_name |
| request.topology_id = self.topology_id |
| |
| timeout_sec = float(self.sys_config[constants.INSTANCE_RECONNECT_STREAMMGR_INTERVAL_SEC]) |
| |
| self.send_request(request, "Context", stmgr_pb2.RegisterInstanceResponse(), timeout_sec) |
| |
| def _handle_register_response(self, response): |
| """Called when a register response (RegisterInstanceResponse) arrives""" |
| if response.status.status != common_pb2.StatusCode.Value("OK"): |
| raise RuntimeError("Stream Manager returned a not OK response for register") |
| Log.info("We registered ourselves to the Stream Manager") |
| |
| self.is_registered = True |
| if response.HasField("pplan"): |
| Log.info("Handling assignment message from response") |
| self._handle_assignment_message(response.pplan) |
| else: |
| Log.debug("Received a register response with no pplan") |
| |
| def _handle_new_tuples_2(self, hts2): |
| """Called when new HeronTupleSet2 arrives |
| """ |
| self.heron_instance_cls.handle_new_tuple_set_2(hts2) |
| |
| def _handle_initiate_stateful_checkpoint(self, ckptmsg): |
| """Called when new InitiateStatefulCheckpoint arrives |
| """ |
| self.heron_instance_cls.handle_initiate_stateful_checkpoint(ckptmsg) |
| |
| def _handle_start_stateful_processing(self, startmsg): |
| """Called when new StartInstanceStatefulProcessing arrives |
| """ |
| self.heron_instance_cls.handle_start_stateful_processing(startmsg) |
| |
| def _handle_restore_instance_state(self, restoremsg): |
| """Called when new RestoreInstanceStateRequest arrives |
| """ |
| self.heron_instance_cls.handle_restore_instance_state(restoremsg) |
| |
| def _handle_assignment_message(self, pplan): |
| """Called when new NewInstanceAssignmentMessage arrives""" |
| Log.debug("In handle_assignment_message() of STStmgrClient, Physical Plan: \n%s", str(pplan)) |
| self.heron_instance_cls.handle_assignment_msg(pplan) |