# blob: a1cf525a808055d7c29db5f235b47814957b7d13 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
from kazoo.client import KazooClient
import logging
import sys
import traceback
logger = logging.getLogger(__name__)


class ZkClient:
    """
    Wrapper class over KazooClient. Provides utility methods for standalone failure tests to get details about
    the processor group state stored in zookeeper.

    Instantiates a kazoo client that connects to the zookeeper server at zookeeper_host:zookeeper_port. Every
    path used by this class lives under self.zk_base_node.

    The read helpers are best-effort: failures are logged and a default value is returned instead of raising.
    """

    def __init__(self, zookeeper_host, zookeeper_port, app_name, app_id):
        """
        :param zookeeper_host: host name of the zookeeper server.
        :param zookeeper_port: port of the zookeeper server.
        :param app_name: application name, used to build the base node path.
        :param app_id: application id, used to build the base node path.
        """
        self.protocol_version = "2.0"
        self.kazoo_client = KazooClient(hosts='{0}:{1}'.format(zookeeper_host, zookeeper_port))
        self.zk_base_node = 'app-{0}-{1}/{2}-{3}-{4}-coordinationData'.format(
            app_name, app_id, app_name, app_id, self.protocol_version)

    def start(self):
        """
        Establishes connection with the zookeeper server the kazoo client was configured with.
        """
        self.kazoo_client.start()

    def stop(self):
        """
        Closes and releases the connection held with the zookeeper server.
        """
        self.kazoo_client.stop()

    def watch_job_model(self, watch_function):
        """
        Creates the job models node if it is missing and registers watch_function as a watch on its children.

        :param watch_function: callable handed to kazoo as the `watch` argument of get_children.
        """
        job_models_path = self._job_models_path()
        self.kazoo_client.ensure_path(job_models_path)
        self.kazoo_client.get_children(job_models_path, watch=watch_function)

    def get_latest_job_model(self):
        """
        Reads and returns the latest JobModel from zookeeper.

        Returns the deserialized JobModel as a dict.
        Returns an empty dict when no JobModel exists or when it could not be read or parsed.
        """
        job_model_dict = {}
        try:
            job_model_versions = self.kazoo_client.get_children(self._job_models_path())
            if job_model_versions:
                # Names are compared as strings, exactly like the previous sort()/reverse()/[0] sequence.
                # NOTE(review): if the node names are unpadded integers, '9' ranks after '10' - confirm the
                # naming scheme before relying on this for ten or more generations.
                latest_version = max(job_model_versions)
                job_model_generation_path = '{0}/jobModelGeneration/jobModels/{1}/0'.format(
                    self.zk_base_node, latest_version)
                raw_job_model, _ = self.kazoo_client.get(job_model_generation_path)
                job_model_dict = self._parse_job_model(raw_job_model)
                logger.info("Recent JobModel in zookeeper: %s", job_model_dict)
        except Exception:
            # Best-effort read: log the failure (with traceback) and fall through to the default.
            logger.exception("Failed to read the latest JobModel from zookeeper")
        return job_model_dict

    def get_leader_processor_id(self):
        """
        Determines the processorId of the current leader in a processors group.

        The leader is taken to be the processor whose node name sorts first under the processors node.
        Returns the processorId of the leader if leader exists.
        Returns None otherwise.
        """
        leader_processor_id = None
        try:
            processors_path = self._processors_path()
            processor_nodes = self.kazoo_client.get_children(processors_path)
            if not processor_nodes:
                # No registered processors means no leader; this is not an error.
                return None
            leader_node_path = '{0}/{1}'.format(processors_path, min(processor_nodes))
            leader_processor_id = self._read_processor_id(leader_node_path)
        except Exception:
            logger.exception("Failed to determine the leader processor id from zookeeper")
        return leader_processor_id

    def purge_all_nodes(self):
        """
        Recursively delete all zookeeper nodes from the base node: self.zk_base_node.

        Failures are logged and not propagated.
        """
        try:
            self.kazoo_client.delete(path=self.zk_base_node, version=-1, recursive=True)
        except Exception:
            logger.exception("Failed to purge zookeeper nodes under %s", self.zk_base_node)

    def get_active_processors(self):
        """
        Determines the processor ids that are active in zookeeper.

        Returns the processor ids in the sorted order of their node names. If reading fails part-way
        through, the ids collected so far are returned.
        """
        processor_ids = []
        try:
            processors_path = self._processors_path()
            for processor_node in sorted(self.kazoo_client.get_children(processors_path)):
                processor_node_path = '{0}/{1}'.format(processors_path, processor_node)
                processor_ids.append(self._read_processor_id(processor_node_path))
        except Exception:
            logger.exception("Failed to read the active processors from zookeeper")
        return processor_ids

    def _job_models_path(self):
        """Returns the path of the node whose children are the job model generations."""
        return '{0}/jobModelGeneration/jobModels/'.format(self.zk_base_node)

    def _processors_path(self):
        """Returns the path of the node whose children are the registered processors."""
        return '{0}/processors'.format(self.zk_base_node)

    def _read_processor_id(self, processor_node_path):
        """Reads a processor node and returns the processor id from its "<host> <processorId>" data."""
        processor_data, _ = self.kazoo_client.get(processor_node_path)
        # Strict two-way unpacking: malformed data raises and is handled by the caller.
        _host, processor_id = self._as_text(processor_data).split(" ")
        return processor_id

    @staticmethod
    def _as_text(data):
        """Decodes node data to text when it is a bytes object distinct from str; returns it unchanged otherwise."""
        if isinstance(data, bytes) and not isinstance(data, str):
            return data.decode('utf-8')
        return data

    @staticmethod
    def _parse_job_model(raw_job_model):
        """Extracts and deserializes the JobModel json embedded in the raw node data."""
        # ZkClient java library stores the data in the following format in zookeeper:
        #     class_name, data_length, actual_data
        # JobModel json manipulation: keep only the span from the first '{' to the last '}'.
        # Normal json deserialization without this custom string massaging fails.
        # This will be removed after SAMZA-1749.
        if isinstance(raw_job_model, bytes):
            opening_brace, closing_brace = b'{', b'}'
        else:
            opening_brace, closing_brace = '{', '}'
        first_curly_brace_index = raw_job_model.find(opening_brace)
        if first_curly_brace_index < 0:
            raise ValueError("No json object found in the JobModel data")
        last_curly_brace_index = raw_job_model.rindex(closing_brace)
        # Slice before decoding: the prefix in front of the json is not guaranteed to be valid text.
        job_model_json = raw_job_model[first_curly_brace_index: last_curly_brace_index + 1]
        return json.loads(ZkClient._as_text(job_model_json))