blob: 2deef9eb6c93998ab56d78449b3856e34457a38d [file] [log] [blame]
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
''' topology.py '''
import traceback
import uuid
from heronpy.api import api_constants
from heron.common.src.python.utils.log import Log
# pylint: disable=too-many-instance-attributes
class Topology(object):
"""
Class Topology
Contains all the relevant information about
a topology that its state manager has.
All this info is fetched from state manager in one go.
The watches are the callbacks that are called
when there is any change in the topology
instance using set_physical_plan, set_execution_state,
set_tmaster, and set_scheduler_location. Any other means of changing will
not call the watches.
"""
def __init__(self, name, state_manager_name):
self.zone = None
self.name = name
self.state_manager_name = state_manager_name
self.physical_plan = None
self.packing_plan = None
self.execution_state = None
self.id = None
self.cluster = None
self.environ = None
self.tmaster = None
self.scheduler_location = None
# A map from UUIDs to the callback
# functions.
self.watches = {}
def register_watch(self, callback):
"""
Returns the UUID with which the watch is
registered. This UUID can be used to unregister
the watch.
Returns None if watch could not be registered.
The argument 'callback' must be a function that takes
exactly one argument, the topology on which
the watch was triggered.
Note that the watch will be unregistered in case
it raises any Exception the first time.
This callback is also called at the time
of registration.
"""
RETRY_COUNT = 5
# Retry in case UID is previously
# generated, just in case...
for _ in range(RETRY_COUNT):
# Generate a random UUID.
uid = uuid.uuid4()
if uid not in self.watches:
Log.info("Registering a watch with uid: " + str(uid))
try:
callback(self)
except Exception as e:
Log.error("Caught exception while triggering callback: " + str(e))
Log.debug(traceback.format_exc())
return None
self.watches[uid] = callback
return uid
return None
def unregister_watch(self, uid):
"""
Unregister the watch with the given UUID.
"""
# Do not raise an error if UUID is
# not present in the watches.
Log.info("Unregister a watch with uid: " + str(uid))
self.watches.pop(uid, None)
def trigger_watches(self):
"""
Call all the callbacks.
If any callback raises an Exception,
unregister the corresponding watch.
"""
to_remove = []
for uid, callback in list(self.watches.items()):
try:
callback(self)
except Exception as e:
Log.error("Caught exception while triggering callback: " + str(e))
Log.debug(traceback.format_exc())
to_remove.append(uid)
for uid in to_remove:
self.unregister_watch(uid)
def set_physical_plan(self, physical_plan):
""" set physical plan """
if not physical_plan:
self.physical_plan = None
self.id = None
else:
self.physical_plan = physical_plan
self.id = physical_plan.topology.id
self.trigger_watches()
def set_packing_plan(self, packing_plan):
""" set packing plan """
if not packing_plan:
self.packing_plan = None
self.id = None
else:
self.packing_plan = packing_plan
self.id = packing_plan.id
self.trigger_watches()
# pylint: disable=no-self-use
def get_execution_state_dc_environ(self, execution_state):
"""
Helper function to extract dc and environ from execution_state.
Returns a tuple (cluster, environ).
"""
return (execution_state.cluster, execution_state.environ)
def set_execution_state(self, execution_state):
""" set exectuion state """
if not execution_state:
self.execution_state = None
self.cluster = None
self.environ = None
else:
self.execution_state = execution_state
cluster, environ = self.get_execution_state_dc_environ(execution_state)
self.cluster = cluster
self.environ = environ
self.zone = cluster
self.trigger_watches()
def set_tmaster(self, tmaster):
""" set exectuion state """
self.tmaster = tmaster
self.trigger_watches()
def set_scheduler_location(self, scheduler_location):
""" set exectuion state """
self.scheduler_location = scheduler_location
self.trigger_watches()
def num_instances(self):
"""
Number of spouts + bolts
"""
num = 0
# Get all the components
components = self.spouts() + self.bolts()
# Get instances for each worker
for component in components:
config = component.comp.config
for kvs in config.kvs:
if kvs.key == api_constants.TOPOLOGY_COMPONENT_PARALLELISM:
num += int(kvs.value)
break
return num
def spouts(self):
"""
Returns a list of Spout (proto) messages
"""
if self.physical_plan:
return list(self.physical_plan.topology.spouts)
return []
def spout_names(self):
"""
Returns a list of names of all the spouts
"""
return [component.comp.name for component in self.spouts()]
def bolts(self):
"""
Returns a list of Bolt (proto) messages
"""
if self.physical_plan:
return list(self.physical_plan.topology.bolts)
return []
def bolt_names(self):
"""
Returns a list of names of all the bolts
"""
return [component.comp.name for component in self.bolts()]
def get_machines(self):
"""
Get all the machines that this topology is running on.
These are the hosts of all the stmgrs.
"""
if self.physical_plan:
stmgrs = list(self.physical_plan.stmgrs)
return [s.host_name for s in stmgrs]
return []
def get_status(self):
"""
Get the current state of this topology.
The state values are from the topology.proto
RUNNING = 1, PAUSED = 2, KILLED = 3
if the state is None "Unknown" is returned.
"""
status = None
if self.physical_plan and self.physical_plan.topology:
status = self.physical_plan.topology.state
if status == 1:
return "Running"
elif status == 2:
return "Paused"
elif status == 3:
return "Killed"
else:
return "Unknown"