| # SPDX-License-Identifier: Apache-2.0 |
| # |
| # Modifications by Apache Solr contributors; see git log for details. |
| # Licensed under the Apache License, Version 2.0. |
| # |
| # The OpenSearch Contributors require contributions made to |
| # this file be licensed under the Apache-2.0 license or a |
| # compatible open source license. |
| # Modifications Copyright OpenSearch Contributors. See |
| # GitHub history for details. |
| # Licensed to Elasticsearch B.V. under one or more contributor |
| # license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright |
| # ownership. Elasticsearch B.V. licenses this file to you under |
| # the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import logging |
| import socket |
| |
| import thespian.actors |
| import thespian.system.messages.status |
| |
| from solrorbit import exceptions, log |
| from solrorbit.utils import console, net |
| |
| |
class BenchmarkFailure:
    """
    Message object signalling that the benchmark has failed because an exception occurred.

    Instances carry a human readable ``message`` and, optionally, the ``cause`` of the failure.
    """
    def __init__(self, message, cause=None):
        # both values are stored as-is; ``cause`` stays ``None`` unless the caller supplies one
        self.message, self.cause = message, cause
| |
| |
class BenchmarkCancelled:
    """
    Marker message without any payload: the benchmark has been cancelled (by the user).
    """
| |
| |
def parametrized(decorator):
    """

    Helper meta-decorator that allows us to provide parameters to a decorator.

    The decorated decorator is replaced by a factory: calling the factory with arbitrary positional and keyword
    arguments returns the actual decorator which, when applied to a function ``f``, evaluates
    ``decorator(f, *args, **kwargs)``. The factory keeps the name and docstring of ``decorator`` so that
    the documentation of parametrized decorators is not lost.

    :param decorator: The decorator that should accept parameters. Its first parameter receives the decorated function.
    :return: A factory that accepts the decorator's parameters and returns a one-argument decorator.
    """
    # local import: only needed while the factory is being created
    import functools

    @functools.wraps(decorator)
    def inner(*args, **kwargs):
        def g(f):
            # bind the decorated function together with the parameters captured by the factory call
            return decorator(f, *args, **kwargs)
        return g
    return inner
| |
| |
@parametrized
def no_retry(f, actor_name):
    """

    Decorator intended for Thespian message handlers with the signature ``receiveMsg_$MSG_NAME(self, msg, sender)``. Thespian will
    assume that a message handler that raises an exception can be retried. It will then retry once and give up afterwards just leaving
    a trace of that in the actor system's internal log file. However, this is usually *not* what we want in Solr Orbit. If handling of a
    message fails we instead want to notify a node higher up in the actor hierarchy.

    We achieve that by sending a ``BenchmarkFailure`` message to the original sender. Note that this might as well be the current
    actor (e.g. when handling a ``Wakeup`` message). In that case the actor itself is responsible for forwarding the benchmark failure
    to its parent actor.

    Example usage:

    @no_retry("special forces actor")
    def receiveMsg_DefuseBomb(self, msg, sender):
        # might raise an exception
        pass

    If this message handler raises an exception, the decorator will turn it into a ``BenchmarkFailure`` message with its ``message``
    property set to "Error in special forces actor" followed by the exception's string representation in parentheses. That message
    is sent to the original sender and the guarded handler itself returns ``None``.

    :param f: The message handler. Does not need to be passed directly, this is handled by the decorator infrastructure.
    :param actor_name: A human readable name of the current actor that should be used in the exception message.
    :return: The guarded message handler. It keeps the name and docstring of ``f``.
    """
    # local import: only needed while the handler is being decorated
    import functools

    @functools.wraps(f)
    def guard(self, msg, sender):
        try:
            return f(self, msg, sender)
        # NOTE(review): BaseException also covers SystemExit / KeyboardInterrupt; presumably intended so that no
        # failure in a handler goes unreported - confirm.
        except BaseException as e:
            # use a dedicated name so the incoming ``msg`` is not shadowed
            failure_msg = "Error in {}".format(actor_name)
            # log here as the full trace might get lost.
            logging.getLogger(__name__).exception(failure_msg)
            # don't forward the exception as is because the main process might not have this class available on the load path
            # and will fail then while deserializing the cause.
            self.send(sender, BenchmarkFailure("{} ({})".format(failure_msg, str(e))))
    return guard
| |
| |
class BenchmarkActor(thespian.actors.ActorTypeDispatcher):
    """
    Base class for benchmark actors. It keeps a list of child actors, the responses collected from them and a
    status value, and provides helpers to move from one status to the next.
    """
    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)
        # child actors that messages are fanned out to
        self.children = []
        # responses collected so far for the transition that is currently pending
        self.received_responses = []
        # current status; ``None`` until the first transition
        self.status = None
        log.post_configure_actor_logging()
        self.logger = logging.getLogger(__name__)
        console.set_assume_tty(assume_tty=False)

    # The method name is required by the actor framework
    # noinspection PyPep8Naming
    @staticmethod
    def actorSystemCapabilityCheck(capabilities, requirements):
        """
        Checks whether the provided capabilities satisfy all requirements.

        :param capabilities: A dict of capabilities of an actor system.
        :param requirements: A dict of required capability values.
        :return: ``True`` iff every required name maps to an equal value in ``capabilities``.
        """
        logger = logging.getLogger(__name__)
        mismatch = any(capabilities.get(name, None) != value for name, value in requirements.items())
        if mismatch:
            # A mismatch is not a problem by itself as long as at least one actor system instance matches the requirements.
            logger.info("Checking capabilities [%s] against requirements [%s] failed.", capabilities, requirements)
        else:
            logger.info("Capabilities [%s] match requirements [%s].", capabilities, requirements)
        return not mismatch

    def transition_when_all_children_responded(self, sender, msg, expected_status, new_status, transition):
        """

        Records a response of a child actor and, as soon as every child has responded, switches this actor to a new status.

        :param sender: The child actor that has responded.
        :param msg: The response message.
        :param expected_status: The status in which this actor should be upon calling this method.
        :param new_status: The new status once all child actors have responded.
        :param transition: A parameter-less function to call immediately after changing the status.
        :raises exceptions.BenchmarkAssertionError: If this actor is not in the expected status or if more responses
                                                    than children have been received.
        """
        if not self.is_current_status_expected(expected_status):
            raise exceptions.BenchmarkAssertionError("Received [%s] from [%s] but we are in status [%s] instead of [%s]." %
                                                     (type(msg), sender, self.status, expected_status))

        self.received_responses.append(msg)
        received = len(self.received_responses)
        awaited = len(self.children)
        self.logger.debug("[%d] of [%d] child actors have responded for transition from [%s] to [%s].",
                          received, awaited, self.status, new_status)

        if received > awaited:
            raise exceptions.BenchmarkAssertionError(
                "Received [%d] responses but only [%d] were expected to transition from [%s] to [%s]. The responses are: %s" %
                (received, awaited, self.status, new_status, self.received_responses))

        if received == awaited:
            self.logger.debug("All [%d] child actors have responded. Transitioning now from [%s] to [%s].",
                              awaited, self.status, new_status)
            # everybody has answered: switch the status and start collecting from scratch
            self.status = new_status
            self.received_responses = []
            transition()

    def send_to_children_and_transition(self, sender, msg, expected_status, new_status):
        """

        Switches to the new status right away and then passes the provided message on to every child actor.

        :param sender: The actor from which we forward this message (in case it is message forwarding). Otherwise our own address.
        :param msg: The message to send.
        :param expected_status: The status in which this actor should be upon calling this method.
        :param new_status: The new status.
        :raises exceptions.BenchmarkAssertionError: If this actor is not in the expected status.
        """
        if not self.is_current_status_expected(expected_status):
            raise exceptions.BenchmarkAssertionError("Received [%s] from [%s] but we are in status [%s] instead of [%s]." %
                                                     (type(msg), sender, self.status, expected_status))

        self.logger.info("Transitioning from [%s] to [%s].", self.status, new_status)
        self.status = new_status
        for child in self.children:
            # falsy entries are skipped
            if child:
                self.send(child, msg)

    def is_current_status_expected(self, expected_status):
        """
        :param expected_status: A single status, a list of acceptable statuses or a falsy value if any status is fine.
        :return: ``True`` iff the current status is acceptable.
        """
        # if we don't expect anything, we're always in the right status
        if not expected_status:
            return True
        # do an explicit check for a list here because strings are also iterable and we have very tight control over this code anyway.
        if isinstance(expected_status, list):
            return self.status in expected_status
        return self.status == expected_status
| |
| |
def actor_system_already_running(ip="127.0.0.1", port=1900):
    """
    Determines whether an actor system is already running by opening a socket connection.

    Note: It may be possible that another system is running on the same port.

    :param ip: The address to probe. Defaults to the loopback address.
    :param port: The TCP port to probe. Defaults to 1900.
    :return: ``True`` if a connection could be established, ``False`` otherwise.
    """
    # the context manager closes the socket on every path, also when the connection attempt fails
    with socket.socket() as s:
        try:
            s.connect((ip, port))
        except Exception:
            # best-effort probe: any failure to connect is treated as "not running"
            return False
        return True
| |
| |
# Name of the Thespian system base that is passed to ``thespian.actors.ActorSystem`` when bootstrapping.
__SYSTEM_BASE = "multiprocTCPBase"


def use_offline_actor_system():
    """
    Switches the module-wide system base to "multiprocQueueBase".

    Only actor systems that are bootstrapped after this call pick up the new system base.
    """
    global __SYSTEM_BASE
    __SYSTEM_BASE = "multiprocQueueBase"
| |
| |
def bootstrap_actor_system(try_join=False, prefer_local_only=False, local_ip=None, coordinator_ip=None):
    """
    Creates a Thespian actor system (or joins a running one) using the module-wide system base.

    :param try_join: If ``True``, join an actor system that is already running locally or, if none is detected, create a new one
                     with the "coordinator" capability. All other parameters are ignored in that case.
    :param prefer_local_only: If ``True`` (and ``try_join`` is not set), this node acts as coordinator. Both IPs are set to
                              "127.0.0.1" unless the system base is "multiprocQueueBase", in which case no IPs are used.
    :param local_ip: IP or host name of this node. Required if neither ``try_join`` nor ``prefer_local_only`` is set.
    :param coordinator_ip: IP or host name of the coordinator node. Required if neither ``try_join`` nor ``prefer_local_only`` is set.
    :return: The actor system.
    :raises exceptions.SystemSetupError: In the distributed case, if the system base is not network-capable or an IP is missing.
    :raises thespian.actors.ActorSystemException: If the actor system could not be initialized (logged and re-raised).
    """
    logger = logging.getLogger(__name__)
    system_base = __SYSTEM_BASE
    try:
        if try_join:
            if not actor_system_already_running():
                logger.info("Creating new actor system with system base [%s] on coordinator node.", system_base)
                # if we try to join we can only run on the coordinator...
                return thespian.actors.ActorSystem(system_base, logDefs=log.load_configuration(), capabilities={"coordinator": True})
            logger.info("Joining already running actor system with system base [%s].", system_base)
            return thespian.actors.ActorSystem(system_base)

        if prefer_local_only:
            is_coordinator = True
            # the queue base works without any addresses, every other base talks via loopback
            loopback = "127.0.0.1" if system_base != "multiprocQueueBase" else None
            coordinator_ip = loopback
            local_ip = loopback
        else:
            if system_base not in ("multiprocTCPBase", "multiprocUDPBase"):
                raise exceptions.SystemSetupError("Solr Orbit requires a network-capable system base but got [%s]." % system_base)
            if not coordinator_ip:
                raise exceptions.SystemSetupError("coordinator IP is required")
            if not local_ip:
                raise exceptions.SystemSetupError("local IP is required")
            # always resolve the public IP here, even if a DNS name is given. Otherwise Thespian will be unhappy
            local_ip = net.resolve(local_ip)
            coordinator_ip = net.resolve(coordinator_ip)
            # this node is the coordinator iff both names resolve to the same address
            is_coordinator = local_ip == coordinator_ip

        capabilities = {"coordinator": is_coordinator}
        if local_ip:
            # just needed to determine whether to run benchmarks locally
            capabilities["ip"] = local_ip
        if coordinator_ip:
            # Make the coordinator node the convention leader
            capabilities["Convention Address.IPv4"] = "%s:1900" % coordinator_ip
        logger.info("Starting actor system with system base [%s] and capabilities [%s].", system_base, capabilities)
        return thespian.actors.ActorSystem(system_base, logDefs=log.load_configuration(), capabilities=capabilities)
    except thespian.actors.ActorSystemException:
        logger.exception("Could not initialize internal actor system.")
        raise