blob: 31cd056eb506dbec2948315bff1cf41827fc9919 [file] [log] [blame]
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
''' zkstatemanager.py '''
import sys
from six import reraise as raise_
from heron.proto.execution_state_pb2 import ExecutionState
from heron.proto.packing_plan_pb2 import PackingPlan
from heron.proto.physical_plan_pb2 import PhysicalPlan
from heron.proto.scheduler_pb2 import SchedulerLocation
from heron.proto.tmaster_pb2 import TMasterLocation
from heron.proto.topology_pb2 import Topology
from heron.statemgrs.src.python.log import Log as LOG
from heron.statemgrs.src.python.statemanager import StateManager
from heron.statemgrs.src.python.stateexceptions import StateException
from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError
from kazoo.exceptions import NoNodeError
from kazoo.exceptions import NotEmptyError
from kazoo.exceptions import ZookeeperError
def _makehostportlist(hostportlist):
return ','.join(["%s:%i" % hp for hp in hostportlist])
# pylint: disable=attribute-defined-outside-init
class ZkStateManager(StateManager):
  """
  State manager which connects to zookeeper and
  gets and sets states from there.

  Every get_* method works in one of two modes:
    - with a callback: a kazoo watch is registered, the callback receives the
      current value and every later update, and the method returns None.
    - without a callback: the value is read once through a one-shot watch and
      returned. This relies on kazoo invoking the watch function when it is
      registered.

  Zookeeper failures in the create_*/delete_* methods are translated into
  StateException carrying the matching StateException.EX_TYPE_* code.
  """

  def __init__(self, name, hostportlist, rootpath, tunnelhost):
    """
    Store the connection settings; no connection is made until start().

    name         -- name of this state manager
    hostportlist -- iterable of (host, port) pairs of the zookeeper servers
    rootpath     -- stored as-is; presumably consumed by the path helpers of
                    the base class (not visible here)
    tunnelhost   -- stored as-is; presumably consumed by the ssh tunnel
                    helpers of the base class (not visible here)
    """
    super(ZkStateManager, self).__init__()
    self.name = name
    self.hostportlist = hostportlist
    self.tunnelhost = tunnelhost
    self.rootpath = rootpath

  # pylint: disable=no-self-use
  def _kazoo_client(self, hostportlist):
    """
    Build the KazooClient for the given "host:port,host:port" string.
    For unit testing, replace this method to not
    actually return a client.
    """
    return KazooClient(hostportlist)

  def start(self):
    """
    Start Zookeeper: create the client, connect it and log every
    connection state change.

    When the configured hosts are not directly reachable, the client is
    pointed at the host/port pairs returned by establish_ssh_tunnel().
    """
    if self.is_host_port_reachable():
      hostports = self.hostportlist
    else:
      hostports = self.establish_ssh_tunnel()
    self.client = self._kazoo_client(_makehostportlist(hostports))
    self.client.start()

    def on_connection_change(state):
      """ callback to log """
      LOG.info("Connection state changed to: " + state)

    self.client.add_listener(on_connection_change)

  def stop(self):
    """ Stop Zookeeper and tear down the ssh tunnel. """
    try:
      self.client.stop()
    finally:
      # Always release the tunnel, even when stopping the client fails.
      self.terminate_ssh_tunnel()

  ##########################################################################
  # Private helpers shared by the getters / creators / deleters below.
  ##########################################################################

  @staticmethod
  def _zk_raise_state_exception(message, ex_type):
    """
    Raise StateException(message, ex_type), attaching the traceback of the
    exception that is currently being handled (None when there is none).
    """
    # six.reraise expects (exception type, exception instance, traceback).
    raise_(StateException, StateException(message, ex_type), sys.exc_info()[2])

  @staticmethod
  def _zk_callback_or_collector(callback):
    """
    Resolve the optional user callback of a get_* method.

    Returns a tuple (callback, isWatching, ret):
      - a truthy callback is returned unchanged with isWatching=True;
      - otherwise a collector callback is returned with isWatching=False,
        and it stores whatever it receives in ret["result"].
    ret is a dict whose "result" entry starts as None.
    """
    ret = {
        "result": None
    }
    if callback:
      return callback, True, ret

    def collect(data):
      """ Collector used to hand the value back to the caller right now. """
      ret["result"] = data

    return collect, False, ret

  def _zk_watch_data(self, path, proto_class, callback, isWatching):
    """
    Register a kazoo DataWatch on path.

    Non-empty node data is parsed into a fresh proto_class() instance which
    is passed to callback; empty or missing data results in callback(None).
    The watch stays registered only if isWatching is True.
    """
    if isWatching:
      LOG.info("Adding data watch for path: " + path)

    # pylint: disable=unused-variable, unused-argument
    @self.client.DataWatch(path)
    def watch_data(data, stats):
      """ parse the node data and invoke the callback """
      if data:
        message = proto_class()
        message.ParseFromString(data)
        callback(message)
      else:
        callback(None)

      # Returning False will result in no future watches
      # being triggered. If isWatching is True, then
      # the future watches will be triggered.
      return isWatching

  def _zk_create(self, path, value, what):
    """
    Create the node at path (making parent nodes as needed) holding value.

    what is the human readable name used in the error messages.
    Returns True on success; raises StateException for NoNodeError,
    NodeExistsError and any other ZookeeperError. Other exceptions propagate.
    """
    try:
      self.client.create(path, value=value, makepath=True)
    except NoNodeError:
      self._zk_raise_state_exception("NoNodeError while creating " + what,
                                     StateException.EX_TYPE_NO_NODE_ERROR)
    except NodeExistsError:
      self._zk_raise_state_exception("NodeExistsError while creating " + what,
                                     StateException.EX_TYPE_NODE_EXISTS_ERROR)
    except ZookeeperError:
      self._zk_raise_state_exception("Zookeeper while creating " + what,
                                     StateException.EX_TYPE_ZOOKEEPER_ERROR)
    return True

  def _zk_delete(self, path, what):
    """
    Delete the node at path.

    what is the human readable name used in the error messages.
    Returns True on success; raises StateException for NoNodeError,
    NotEmptyError and any other ZookeeperError. Other exceptions propagate.
    """
    try:
      self.client.delete(path)
    except NoNodeError:
      self._zk_raise_state_exception("NoNodeError while deleting " + what,
                                     StateException.EX_TYPE_NO_NODE_ERROR)
    except NotEmptyError:
      self._zk_raise_state_exception("NotEmptyError while deleting " + what,
                                     StateException.EX_TYPE_NOT_EMPTY_ERROR)
    except ZookeeperError:
      self._zk_raise_state_exception("Zookeeper while deleting " + what,
                                     StateException.EX_TYPE_ZOOKEEPER_ERROR)
    return True

  ##########################################################################
  # Topologies (children of the topologies path)
  ##########################################################################

  def get_topologies(self, callback=None):
    """
    Get the children of the topologies path.

    With a callback, a children watch is registered and None is returned.
    Without one, the current children are returned.
    Raises StateException (EX_TYPE_NO_NODE_ERROR) after stopping the client
    when the topologies path cannot be found.
    """
    callback, isWatching, ret = self._zk_callback_or_collector(callback)

    try:
      # Ensure the topology path exists. If a topology has never been deployed
      # then the path will not exist so create it and don't crash.
      # (fixme) add a watch instead of creating the path?
      self.client.ensure_path(self.get_topologies_path())

      self._get_topologies_with_watch(callback, isWatching)
    except NoNodeError:
      self.client.stop()
      path = self.get_topologies_path()
      self._zk_raise_state_exception(
          "Error required topology path '%s' not found" % (path),
          StateException.EX_TYPE_NO_NODE_ERROR)

    # The result is populated only when no callback was supplied.
    return ret["result"]

  def _get_topologies_with_watch(self, callback, isWatching):
    """
    Helper function to get topologies with
    a callback. The future watch is placed
    only if isWatching is True.
    """
    path = self.get_topologies_path()
    if isWatching:
      LOG.info("Adding children watch for path: " + path)

    # pylint: disable=unused-variable
    @self.client.ChildrenWatch(path)
    def watch_topologies(topologies):
      """ callback to watch topologies """
      callback(topologies)

      # Returning False will result in no future watches
      # being triggered. If isWatching is True, then
      # the future watches will be triggered.
      return isWatching

  ##########################################################################
  # Topology
  ##########################################################################

  def get_topology(self, topologyName, callback=None):
    """
    Get the Topology protobuf of topologyName.

    With a callback, a data watch is registered and None is returned.
    Without one, the parsed Topology (or None when the node holds no data)
    is returned.
    """
    callback, isWatching, ret = self._zk_callback_or_collector(callback)
    self._get_topology_with_watch(topologyName, callback, isWatching)
    return ret["result"]

  def _get_topology_with_watch(self, topologyName, callback, isWatching):
    """
    Helper function to get topology with
    a callback. The future watch is placed
    only if isWatching is True.
    """
    path = self.get_topology_path(topologyName)
    self._zk_watch_data(path, Topology, callback, isWatching)

  def create_topology(self, topologyName, topology):
    """
    Create the topology node from the given Topology protobuf.

    Returns True on success. Raises StateException with
    EX_TYPE_PROTOBUF_ERROR when topology is falsy or not initialized, or with
    the matching EX_TYPE_* code when zookeeper rejects the creation.
    """
    if not topology or not topology.IsInitialized():
      # No exception is being handled here, so raise directly.
      raise StateException("Topology protobuf not init properly",
                           StateException.EX_TYPE_PROTOBUF_ERROR)

    path = self.get_topology_path(topologyName)
    LOG.info("Adding topology: {0} to path: {1}".format(
        topologyName, path))
    return self._zk_create(path, topology.SerializeToString(), "topology")

  def delete_topology(self, topologyName):
    """
    Delete the topology node.

    Returns True on success; raises StateException with the matching
    EX_TYPE_* code when zookeeper rejects the deletion.
    """
    path = self.get_topology_path(topologyName)
    LOG.info("Removing topology: {0} from path: {1}".format(
        topologyName, path))
    return self._zk_delete(path, "topology")

  ##########################################################################
  # Packing plan
  ##########################################################################

  def get_packing_plan(self, topologyName, callback=None):
    """
    Get the PackingPlan protobuf of topologyName.

    With a callback, a data watch is registered and None is returned.
    Without one, the parsed PackingPlan (or None when the node holds no
    data) is returned.
    """
    callback, isWatching, ret = self._zk_callback_or_collector(callback)
    self._get_packing_plan_with_watch(topologyName, callback, isWatching)
    return ret["result"]

  def _get_packing_plan_with_watch(self, topologyName, callback, isWatching):
    """
    Helper function to get packing_plan with
    a callback. The future watch is placed
    only if isWatching is True.
    """
    path = self.get_packing_plan_path(topologyName)
    self._zk_watch_data(path, PackingPlan, callback, isWatching)

  ##########################################################################
  # Physical plan
  ##########################################################################

  def get_pplan(self, topologyName, callback=None):
    """
    Get the PhysicalPlan protobuf of topologyName.

    With a callback, a data watch is registered and None is returned.
    Without one, the parsed PhysicalPlan (or None when the node holds no
    data) is returned.
    """
    callback, isWatching, ret = self._zk_callback_or_collector(callback)
    self._get_pplan_with_watch(topologyName, callback, isWatching)
    return ret["result"]

  def _get_pplan_with_watch(self, topologyName, callback, isWatching):
    """
    Helper function to get pplan with
    a callback. The future watch is placed
    only if isWatching is True.
    """
    path = self.get_pplan_path(topologyName)
    self._zk_watch_data(path, PhysicalPlan, callback, isWatching)

  def create_pplan(self, topologyName, pplan):
    """
    Create the physical plan node from the given PhysicalPlan protobuf.

    Returns True on success. Raises StateException with
    EX_TYPE_PROTOBUF_ERROR when pplan is falsy or not initialized, or with
    the matching EX_TYPE_* code when zookeeper rejects the creation.
    """
    if not pplan or not pplan.IsInitialized():
      # No exception is being handled here, so raise directly.
      raise StateException("Physical Plan protobuf not init properly",
                           StateException.EX_TYPE_PROTOBUF_ERROR)

    path = self.get_pplan_path(topologyName)
    LOG.info("Adding topology: {0} to path: {1}".format(
        topologyName, path))
    return self._zk_create(path, pplan.SerializeToString(), "pplan")

  def delete_pplan(self, topologyName):
    """
    Delete the physical plan node.

    Returns True on success; raises StateException with the matching
    EX_TYPE_* code when zookeeper rejects the deletion.
    """
    path = self.get_pplan_path(topologyName)
    LOG.info("Removing topology: {0} from path: {1}".format(
        topologyName, path))
    return self._zk_delete(path, "pplan")

  ##########################################################################
  # Execution state
  ##########################################################################

  def get_execution_state(self, topologyName, callback=None):
    """
    Get the ExecutionState protobuf of topologyName.

    With a callback, a data watch is registered and None is returned.
    Without one, the parsed ExecutionState (or None when the node holds no
    data) is returned.
    """
    callback, isWatching, ret = self._zk_callback_or_collector(callback)
    self._get_execution_state_with_watch(topologyName, callback, isWatching)
    return ret["result"]

  def _get_execution_state_with_watch(self, topologyName, callback, isWatching):
    """
    Helper function to get execution state with
    a callback. The future watch is placed
    only if isWatching is True.
    """
    path = self.get_execution_state_path(topologyName)
    self._zk_watch_data(path, ExecutionState, callback, isWatching)

  def create_execution_state(self, topologyName, executionState):
    """
    Create the execution state node from the given ExecutionState protobuf.

    Returns True on success. Raises StateException with
    EX_TYPE_PROTOBUF_ERROR when executionState is falsy or not initialized,
    or with the matching EX_TYPE_* code when zookeeper rejects the creation.
    """
    if not executionState or not executionState.IsInitialized():
      # No exception is being handled here, so raise directly.
      raise StateException("Execution State protobuf not init properly",
                           StateException.EX_TYPE_PROTOBUF_ERROR)

    path = self.get_execution_state_path(topologyName)
    LOG.info("Adding topology: {0} to path: {1}".format(
        topologyName, path))
    return self._zk_create(path, executionState.SerializeToString(),
                           "execution state")

  def delete_execution_state(self, topologyName):
    """
    Delete the execution state node.

    Returns True on success; raises StateException with the matching
    EX_TYPE_* code when zookeeper rejects the deletion.
    """
    path = self.get_execution_state_path(topologyName)
    LOG.info("Removing topology: {0} from path: {1}".format(
        topologyName, path))
    return self._zk_delete(path, "execution state")

  ##########################################################################
  # TMaster location
  ##########################################################################

  def get_tmaster(self, topologyName, callback=None):
    """
    Get the TMasterLocation protobuf of topologyName.

    With a callback, a data watch is registered and None is returned.
    Without one, the parsed TMasterLocation (or None when the node holds no
    data) is returned.
    """
    callback, isWatching, ret = self._zk_callback_or_collector(callback)
    self._get_tmaster_with_watch(topologyName, callback, isWatching)
    return ret["result"]

  def _get_tmaster_with_watch(self, topologyName, callback, isWatching):
    """
    Helper function to get tmaster with
    a callback. The future watch is placed
    only if isWatching is True.
    """
    path = self.get_tmaster_path(topologyName)
    self._zk_watch_data(path, TMasterLocation, callback, isWatching)

  ##########################################################################
  # Scheduler location
  ##########################################################################

  def get_scheduler_location(self, topologyName, callback=None):
    """
    Get the SchedulerLocation protobuf of topologyName.

    With a callback, a data watch is registered and None is returned.
    Without one, the parsed SchedulerLocation (or None when the node holds
    no data) is returned.
    """
    callback, isWatching, ret = self._zk_callback_or_collector(callback)
    self._get_scheduler_location_with_watch(topologyName, callback, isWatching)
    return ret["result"]

  def _get_scheduler_location_with_watch(self, topologyName, callback, isWatching):
    """
    Helper function to get scheduler location with
    a callback. The future watch is placed
    only if isWatching is True.
    """
    path = self.get_scheduler_location_path(topologyName)
    self._zk_watch_data(path, SchedulerLocation, callback, isWatching)