# blob: 1616083377b408530cfa36341fd197b13bcd818a [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import re
import subprocess
import sys
import yaml
import time
from os import environ
from subprocess import call
from ozone import util
from ozone.constants import Command
from ozone.blockade import Blockade
from ozone.client import OzoneClient
from ozone.container import Container
from ozone.exceptions import ContainerNotFoundError
class Configuration(object):
    """Configuration used while starting an Ozone cluster.

    The docker-compose file is located, in order of precedence, from:

    1. the directory named by the ``MAVEN_TEST`` environment variable,
    2. ``$OZONE_HOME/compose/ozoneblockade``,
    3. a ``compose/ozoneblockade`` directory found relative to this file.

    The chosen path is stored in ``docker_compose_file`` and also exported
    through the ``DOCKER_COMPOSE_FILE`` environment variable.

    ``datanode_count`` is a property (get / set / delete) backed by
    ``_datanode_count``; it defaults to 3.
    """

    # Inheriting from ``object`` matters on Python 2: property setters and
    # deleters are only honoured on new-style classes.

    def __init__(self):
        compose_file_name = "docker-compose.yaml"
        if "MAVEN_TEST" in os.environ:
            compose_dir = os.environ["MAVEN_TEST"]
        elif "OZONE_HOME" in os.environ:
            compose_dir = os.path.join(os.environ["OZONE_HOME"],
                                       "compose", "ozoneblockade")
        else:
            # NOTE(review): the innermost argument of this dirname chain was
            # missing from the text under review. It is presumably this
            # file's real path with one further dirname applied -- confirm
            # against the repository layout.
            parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(
                os.path.dirname(os.path.realpath(__file__)))))
            compose_dir = os.path.join(parent_dir, "compose", "ozoneblockade")
        self.docker_compose_file = os.path.join(compose_dir, compose_file_name)
        self._datanode_count = 3
        # Exported so that child processes see the same compose file.
        os.environ["DOCKER_COMPOSE_FILE"] = self.docker_compose_file

    @property
    def datanode_count(self):
        """Number of datanodes the cluster is started with."""
        return self._datanode_count

    @datanode_count.setter
    def datanode_count(self, datanode_count):
        self._datanode_count = datanode_count

    @datanode_count.deleter
    def datanode_count(self):
        del self._datanode_count
class OzoneCluster(object):
    """An Ozone cluster run through docker-compose and registered in blockade.

    The container names of the 'om', 'scm', 'datanodes' and 'client' roles
    are exposed as properties (get / set / delete) backed by the matching
    underscore-prefixed attribute. They are None until start() fills them in.
    """

    __logger__ = logging.getLogger(__name__)

    def __init__(self, conf):
        """Store *conf* and initialise all node references to None.

        conf must provide ``docker_compose_file``; start() additionally reads
        ``conf.datanode_count``.
        """
        self.conf = conf
        self.docker_compose_file = conf.docker_compose_file
        self._om = None
        self._scm = None
        self._datanodes = None
        self._client = None
        self.scm_uuid = None
        self.datanode_dir = None

    @property
    def om(self):
        return self._om

    @om.setter
    def om(self, om):
        self._om = om

    @om.deleter
    def om(self):
        del self._om

    @property
    def scm(self):
        return self._scm

    @scm.setter
    def scm(self, scm):
        self._scm = scm

    @scm.deleter
    def scm(self):
        del self._scm

    @property
    def datanodes(self):
        return self._datanodes

    @datanodes.setter
    def datanodes(self, datanodes):
        self._datanodes = datanodes

    @datanodes.deleter
    def datanodes(self):
        del self._datanodes

    @property
    def client(self):
        return self._client

    @client.setter
    def client(self, client):
        self._client = client

    @client.deleter
    def client(self):
        del self._client

    @classmethod
    def create(cls, config=None):
        """Return a new cluster object for *config*.

        When *config* is omitted a fresh Configuration is built at call time.
        (A ``Configuration()`` default in the signature would be evaluated
        once at import, with its environment side effects, and then shared
        by every call.)
        """
        if config is None:
            config = Configuration()
        return cls(config)

    @staticmethod
    def _first_node_with(node_list, token):
        """Return the first name in *node_list* containing *token*.

        Raises IndexError when no name matches.
        """
        return [node for node in node_list if token in node][0]

    @staticmethod
    def _matching_lines(text, pattern):
        """Return the lines of *text* in which regex *pattern* is found."""
        # NOTE(review): the matching call was missing from the text under
        # review; re.search is assumed (re is imported by this file) --
        # confirm it was not re.match.
        return [line for line in text.split("\n") if re.search(pattern, line)]

    @staticmethod
    def _load_container_yaml(text):
        """Parse the ``key: value`` lines of a container file into a mapping.

        Returns None when *text* holds no such line.
        """
        content = "\n".join(OzoneCluster._matching_lines(text, r"\w+:\s\w+"))
        # safe_load instead of yaml.load: the filtered lines are plain
        # ``key: value`` pairs, and yaml.load without an explicit Loader is
        # not accepted by current PyYAML releases.
        return yaml.safe_load(content)

    def _find_container_files(self, name_pattern, datanode):
        """Run ``find`` for *name_pattern* in the container dir of *datanode*.

        Returns the raw command output (one path per line, possibly empty).
        """
        container_parent_path = "%s/hdds/%s/current/containerDir0" % \
                                (self.datanode_dir, self.scm_uuid)
        command = "find %s -type f -name '%s'" % (container_parent_path,
                                                  name_pattern)
        exit_code, output = util.run_docker_command(command, datanode)
        return output

    def start(self):
        """Start the Ozone cluster in docker containers.

        Exits the process with status 1 when OZONE_RUNNER_VERSION or
        HDDS_VERSION is not set. After bringing the containers up, every
        container is added to blockade and the om / scm / datanodes / client
        references, the SCM UUID and the datanode directory are recorded.
        """
        # check if docker is up.
        for variable in ("OZONE_RUNNER_VERSION", "HDDS_VERSION"):
            if variable not in os.environ:
                self.__logger__.error("%s is not set.", variable)
                sys.exit(1)

        self.__logger__.info("Starting Ozone Cluster")
        if Blockade.blockade_status() == 0:
            # NOTE(review): the body of this branch was missing from the text
            # under review. Tearing down the already existing blockade is
            # assumed; further blockade set-up calls may also have been
            # lost -- confirm against ozone.blockade.
            Blockade.blockade_destroy()

        call([Command.docker_compose, "-f", self.docker_compose_file,
              "up", "-d", "--scale",
              "datanode=" + str(self.conf.datanode_count)])
        self.__logger__.info("Waiting 10s for cluster start up...")
        # Remove the sleep and wait only till the cluster is out of safemode
        time.sleep(10)
        # universal_newlines makes check_output return text on Python 3 as
        # well, so the output can be split on "\n".
        output = subprocess.check_output([Command.docker_compose, "-f",
                                          self.docker_compose_file, "ps"],
                                         universal_newlines=True)
        node_list = []
        # The first two lines of the listing and the trailing empty element
        # are skipped; the container name is the first column.
        for out in output.split("\n")[2:-1]:
            node = out.split(" ")[0]
            node_list.append(node)
            Blockade.blockade_add(node)

        # Checked before the lookups below, which would otherwise fail first
        # with a bare IndexError on an empty listing.
        assert node_list, "no node found in the cluster!"

        self.om = self._first_node_with(node_list, 'om')
        self.scm = self._first_node_with(node_list, 'scm')
        self.datanodes = sorted(node for node in node_list
                                if 'datanode' in node)
        self.client = self._first_node_with(node_list, 'ozone_client')
        self.scm_uuid = self.__get_scm_uuid__()
        self.datanode_dir = self.get_conf_value("hdds.datanode.dir")

        self.__logger__.info("blockade created with nodes %s",
                             ' '.join(node_list))

    def get_conf_value(self, key):
        """Return the value of the given configuration key."""
        command = [Command.ozone, "getconf -confKey " + key]
        # NOTE(review): the target container argument was missing from the
        # text under review; the OM container is assumed -- confirm.
        exit_code, output = util.run_docker_command(command, self.om)
        return str(output).strip()

    def scale_datanode(self, datanode_count):
        """Commission new datanodes to the running cluster.

        datanode_count may be an int or a string.
        """
        call([Command.docker_compose, "-f", self.docker_compose_file,
              "up", "-d", "--scale", "datanode=" + str(datanode_count)])

    def partition_network(self, *args):
        """Partition the network which is used by the cluster."""
        # NOTE(review): the body of this method was missing from the text
        # under review; delegation to blockade is assumed -- confirm the
        # method name against ozone.blockade.
        Blockade.blockade_create_partition(*args)

    def restore_network(self):
        """Restore the network partition."""
        # NOTE(review): the body of this method was missing from the text
        # under review; delegation to blockade is assumed -- confirm the
        # method name against ozone.blockade.
        Blockade.blockade_join()

    def __get_scm_uuid__(self):
        """Return SCM's UUID, read from the SCM VERSION file.

        Raises IndexError when the file has no ``scmUuid`` entry.
        """
        ozone_metadata_dir = self.get_conf_value("ozone.metadata.dirs")
        command = "cat %s/scm/current/VERSION" % ozone_metadata_dir
        exit_code, output = util.run_docker_command(command, self.scm)
        key_value = self._matching_lines(output, r"\w+=\w+")
        uuid_entries = [token for token in key_value if 'scmUuid' in token]
        return uuid_entries.pop().split("=")[1].strip()

    def get_client(self):
        """Return an OzoneClient bound to this cluster."""
        return OzoneClient(self)

    def get_container(self, container_id):
        """Return the Container with the given numeric id.

        Raises ContainerNotFoundError when the lookup command fails.
        """
        # Listing one container starting after (container_id - 1). The start
        # id has to be formatted into the command string; a stray int in the
        # command list cannot be joined into a command line.
        command = [Command.ozone,
                   "scmcli list -c=1 -s=%s | grep containerID"
                   % (container_id - 1)]
        # NOTE(review): the target container argument was missing from the
        # text under review; the OM container is assumed -- confirm.
        exit_code, output = util.run_docker_command(command, self.om)
        if exit_code != 0:
            raise ContainerNotFoundError(container_id)
        return Container(container_id, self)

    def is_container_replica_exist(self, container_id, datanode):
        """Return True when *datanode* holds a file for *container_id*."""
        found = self._find_container_files("%s.container" % container_id,
                                           datanode)
        return bool(found.strip())

    def get_containers_on_datanode(self, datanode):
        """Return all the containers on the given datanode.

        Container files that cannot be read or that hold no ``key: value``
        line are skipped.
        """
        listing = self._find_container_files("*.container", datanode)
        containers = []
        for line in listing.split("\n"):
            container_path = line.strip()
            if not container_path:
                # Empty listing line: there is no file to read.
                continue
            # Reading the container file.
            exit_code, output = util.run_docker_command(
                "cat " + container_path, datanode)
            if exit_code != 0:
                continue
            # Reading key value pairs from container file.
            content_yaml = self._load_container_yaml(output)
            if content_yaml is None:
                continue
            containers.append(
                Container(content_yaml.get('containerID'), self))
        return containers

    def get_container_state(self, container_id, datanode):
        """Return the 'state' recorded in the container file on *datanode*.

        Raises ContainerNotFoundError when the file is absent or unreadable.
        """
        found = self._find_container_files("%s.container" % container_id,
                                           datanode)
        container_path = found.strip()
        if not container_path:
            raise ContainerNotFoundError("Container not found!")
        # Reading the container file.
        exit_code, output = util.run_docker_command("cat " + container_path,
                                                    datanode)
        if exit_code != 0:
            raise ContainerNotFoundError("Container not found!")
        # Reading key value pairs from container file.
        content_yaml = self._load_container_yaml(output)
        return str(content_yaml.get('state')).lstrip()

    def get_container_datanodes(self, container_id):
        """Return the datanodes that hold a file for *container_id*."""
        file_name = "%s.container" % container_id
        return [datanode for datanode in self.datanodes
                if self._find_container_files(file_name, datanode).strip()]

    def stop(self):
        """Stop the Ozone cluster."""
        self.__logger__.info("Stopping Ozone Cluster")
        call([Command.docker_compose, "-f", self.docker_compose_file, "down"])