# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import argparse
import cluster as CLUSTER
import database
import dateutil.parser
import utils
import os
import os.path
import prettytable
import shutil
import sys
import time
LOG = utils.get_logger()
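# Typical invocations via doris-compose.py (the cluster name and image below
# are illustrative, not defaults):
#   python doris-compose.py up my-cluster apache/doris:latest
#   python doris-compose.py ls my-cluster --detail
#   python doris-compose.py down my-cluster --clean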
def wait_service(need_alive, wait_timeout, cluster, fe_ids, be_ids):
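    """Poll until every listed fe/be reaches the wanted liveness state.

    With need_alive=True the service port must also be reachable.
    wait_timeout semantics: 0 = return immediately, -1 = wait (practically)
    forever, > 0 = max seconds before raising an exception that names the
    still-failing nodes.
    """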
if wait_timeout == 0:
return
if wait_timeout == -1:
wait_timeout = 1000000000
expire_ts = time.time() + wait_timeout
while True:
db_mgr = database.get_db_mgr(cluster.name,
cluster.get_all_node_net_infos(), False)
failed_frontends = []
for id in fe_ids:
fe = cluster.get_node(CLUSTER.Node.TYPE_FE, id)
fe_state = db_mgr.get_fe(id)
fe_alive = fe_state and fe_state.alive
if fe_alive and need_alive:
                # If waiting for alive, also check that the query port is reachable.
                # If waiting for dead, skip the port check because a disconnected
                # socket takes some time to close.
fe_alive = utils.is_socket_avail(
fe.get_ip(), fe.meta["ports"]["query_port"])
if fe_alive != need_alive:
failed_frontends.append(id)
failed_backends = []
for id in be_ids:
be = cluster.get_node(CLUSTER.Node.TYPE_BE, id)
be_state = db_mgr.get_be(id)
be_alive = be_state and be_state.alive
if be_alive and need_alive:
be_alive = utils.is_socket_avail(
be.get_ip(), be.meta["ports"]["webserver_port"])
if be_alive != need_alive:
failed_backends.append(id)
if not failed_frontends and not failed_backends:
break
if time.time() >= expire_ts:
err = ""
failed_status = "dead" if need_alive else "alive"
if failed_frontends:
err += failed_status + " fe: " + str(failed_frontends) + ". "
if failed_backends:
err += failed_status + " be: " + str(failed_backends) + ". "
raise Exception(err)
time.sleep(1)
def get_ids_related_nodes(cluster,
fe_ids,
be_ids,
ms_ids,
recycle_ids,
fdb_ids,
ignore_not_exists=False):
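    """Resolve id filters to concrete nodes.

    Returns (for_all, related_nodes, related_node_num). If no id filter is
    given at all, for_all is True and related_nodes is None, meaning the
    caller should apply the action to every node. An empty list for a filter
    (e.g. `--be-id` with no ids) selects all nodes of that type.
    """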
if fe_ids is None and be_ids is None and ms_ids is None and recycle_ids is None and fdb_ids is None:
return True, None, cluster.get_all_nodes_num()
def get_ids_related_nodes_with_type(node_type, ids):
if ids is None:
return []
if not ids:
return cluster.get_all_nodes(node_type)
else:
nodes = []
for id in ids:
try:
nodes.append(cluster.get_node(node_type, id))
except Exception as e:
if ignore_not_exists:
LOG.warning(
utils.render_yellow(
"Not found {} with id {}".format(
node_type, id)))
else:
raise e
return nodes
type_ids = [
(CLUSTER.Node.TYPE_FE, fe_ids),
(CLUSTER.Node.TYPE_BE, be_ids),
(CLUSTER.Node.TYPE_MS, ms_ids),
(CLUSTER.Node.TYPE_RECYCLE, recycle_ids),
(CLUSTER.Node.TYPE_FDB, fdb_ids),
]
nodes = []
for node_type, ids in type_ids:
nodes.extend(get_ids_related_nodes_with_type(node_type, ids))
return len(nodes) == cluster.get_all_nodes_num(), nodes, len(nodes)
class Command(object):
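    """Base class for sub-commands: each command registers its argparse
    sub-parser in add_parser() and implements run()."""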
def __init__(self, name):
self.name = name
def print_use_time(self):
return True
def add_parser(self, args_parsers):
raise Exception("No implemented")
def run(self, args):
raise Exception("No implemented")
def _add_parser_common_args(self, parser):
parser.add_argument("-v",
"--verbose",
default=False,
action=self._get_parser_bool_action(True),
help="verbose logging.")
parser.add_argument("--output-json",
default=False,
action=self._get_parser_bool_action(True),
help="output as json, and don't print log.")
def _add_parser_ids_args(self, parser):
group = parser.add_argument_group("for existing nodes",
"apply to the existing nodes.")
group.add_argument("--fe-id", nargs="*", type=int, help="Specify up fe ids, support multiple ids, " \
"if specific --fe-id but not specific ids, apply to all fe. Example: '--fe-id 2 3' will select fe-2 and fe-3.")
group.add_argument("--be-id", nargs="*", type=int, help="Specify up be ids, support multiple ids, " \
"if specific --be-id but not specific ids, apply to all be. Example: '--be-id' will select all backends.")
group.add_argument(
"--ms-id",
nargs="*",
type=int,
help=
"Specify up ms ids, support multiple ids. Only use in cloud cluster."
)
group.add_argument(
"--recycle-id",
nargs="*",
type=int,
help=
"Specify up recycle ids, support multiple ids. Only use in cloud cluster."
)
group.add_argument(
"--fdb-id",
nargs="*",
type=int,
help=
"Specify up fdb ids, support multiple ids. Only use in cloud cluster."
)
def _get_parser_bool_action(self, is_store_true):
if self._support_boolean_action():
return argparse.BooleanOptionalAction
else:
return "store_true" if is_store_true else "store_false"
    def _support_boolean_action(self):
        return sys.version_info >= (3, 9)
def _print_table(self, header, datas):
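        """Print rows as a colored table when logging to stdout; otherwise
        return header plus rows as plain data (used by --output-json)."""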
if utils.is_log_stdout():
table = prettytable.PrettyTable(
[utils.render_green(field) for field in header])
for row in datas:
table.add_row(row)
print(table)
return ""
else:
datas.insert(0, header)
return datas
class SimpleCommand(Command):
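    """A command that maps directly onto a docker compose verb (start, stop,
    pause, ...) executed against the selected nodes."""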
def __init__(self, command, help, options=[]):
super().__init__(command)
self.command = command
self.help = help
self.options = options
def add_parser(self, args_parsers):
        help = self.help + " If none of --fe-id, --be-id, --ms-id, --recycle-id, --fdb-id is specified, "\
            "then apply to all containers."
parser = args_parsers.add_parser(self.command, help=help)
parser.add_argument("NAME", help="Specify cluster name.")
self._add_parser_ids_args(parser)
self._add_parser_common_args(parser)
return parser
def run(self, args):
cluster = CLUSTER.Cluster.load(args.NAME)
for_all, related_nodes, related_node_num = get_ids_related_nodes(
cluster, args.fe_id, args.be_id, args.ms_id, args.recycle_id,
args.fdb_id)
utils.exec_docker_compose_command(cluster.get_compose_file(),
self.command,
options=self.options,
nodes=related_nodes)
show_cmd = self.command[0].upper() + self.command[1:]
if for_all:
related_nodes = cluster.get_all_nodes()
LOG.info(
utils.render_green("{} succ, total related node num {}".format(
show_cmd, related_node_num)))
return cluster, related_nodes
class StartBaseCommand(SimpleCommand):
def add_parser(self, args_parsers):
parser = super().add_parser(args_parsers)
parser.add_argument(
"--wait-timeout",
type=int,
default=0,
help=
"Specify wait seconds for fe/be ready for service: 0 not wait (default), "\
"> 0 max wait seconds, -1 wait unlimited."
)
return parser
def run(self, args):
cluster, related_nodes = super().run(args)
fe_ids = [node.id for node in related_nodes if node.is_fe()]
be_ids = [node.id for node in related_nodes if node.is_be()]
if not cluster.is_host_network():
wait_service(True, args.wait_timeout, cluster, fe_ids, be_ids)
return cluster, related_nodes
class StartCommand(StartBaseCommand):
def __init__(self, command):
super().__init__(command, "Start the doris containers. "),
class RestartCommand(StartBaseCommand):
def __init__(self, command):
super().__init__(command, "Restart the doris containers. ",
["-t", "1"]),
class StopCommand(SimpleCommand):
def __init__(self, command):
super().__init__(command, "Stop the doris containers. ", ["-t", "1"]),
def add_parser(self, args_parsers):
parser = super().add_parser(args_parsers)
parser.add_argument(
"--wait-timeout",
type=int,
default=0,
help=
"Specify wait seconds for fe/be close for service: 0 not wait (default), "\
"> 0 max wait seconds, -1 wait unlimited."
)
return parser
def run(self, args):
cluster, related_nodes = super().run(args)
fe_ids = [node.id for node in related_nodes if node.is_fe()]
be_ids = [node.id for node in related_nodes if node.is_be()]
if not cluster.is_host_network():
wait_service(False, args.wait_timeout, cluster, fe_ids, be_ids)
return cluster, related_nodes
class UpCommand(Command):
def add_parser(self, args_parsers):
parser = args_parsers.add_parser("up", help="Create and upgrade doris containers, "\
"or add new containers. " \
"If none of --add-fe-num, --add-be-num, --add-ms-num, --add-recycle-num, "\
"--fe-id, --be-id, --ms-id, --recycle-id, --fdb-id is specific, " \
"then apply to all containers.")
parser.add_argument("NAME", default="", help="Specific cluster name.")
parser.add_argument("IMAGE",
default="",
nargs="?",
help="Specify docker image.")
self._add_parser_common_args(parser)
parser.add_argument(
"--cloud",
default=False,
action=self._get_parser_bool_action(True),
help=
"Create cloud cluster, default is false. Only use when creating new cluster."
)
parser.add_argument(
"--root",
default=False,
action=self._get_parser_bool_action(True),
help=
"Run cluster as root user, default is false, it will run as host user."
)
parser.add_argument(
"--wait-timeout",
type=int,
default=0,
help=
"Specify wait seconds for fe/be ready for service: 0 not wait (default), "\
"> 0 max wait seconds, -1 wait unlimited."
)
group1 = parser.add_argument_group("add new nodes",
"add cluster nodes.")
group1.add_argument(
"--add-fe-num",
type=int,
help=
"Specify add fe num, default: 3 for a new cluster, 0 for a existing cluster."
)
group1.add_argument(
"--add-be-num",
type=int,
help=
"Specify add be num, default: 3 for a new cluster, 0 for a existing cluster."
)
group1.add_argument(
"--add-ms-num",
type=int,
help=
"Specify add ms num, default: 1 for a new cloud cluster, 0 for a existing cluster. Only use in cloud cluster"
)
group1.add_argument(
"--add-recycle-num",
type=int,
help=
"Specify add recycle num, default: 1 for a new cloud cluster, 0 for a existing cluster. Only use in cloud cluster"
)
group1.add_argument("--fe-config",
nargs="*",
type=str,
help="Specify fe configs for fe.conf. "\
"Example: --fe-config \"enable_debug_points = true\" \"sys_log_level = ERROR\".")
group1.add_argument("--be-config",
nargs="*",
type=str,
help="Specify be configs for be.conf. "\
"Example: --be-config \"enable_debug_points = true\" \"enable_auth = true\".")
group1.add_argument("--ms-config",
nargs="*",
type=str,
help="Specify ms configs for doris_cloud.conf. "\
"Example: --ms-config \"log_level = warn\".")
group1.add_argument("--recycle-config",
nargs="*",
type=str,
help="Specify recycle configs for doris_cloud.conf. "\
"Example: --recycle-config \"log_level = warn\".")
group1.add_argument(
"--fe-follower",
default=False,
action=self._get_parser_bool_action(True),
help=
"The new added fe is follower but not observer. Only support in cloud mode."
)
group1.add_argument("--be-disks",
nargs="*",
default=["HDD=1"],
type=str,
help="Specify each be disks, each group is \"disk_type=disk_num[,disk_capactity]\", "\
"disk_type is HDD or SSD, disk_capactity is capactity limit in gb. default: HDD=1. "\
"Example: --be-disks \"HDD=1\", \"SSD=1,10\", \"SSD=2,100\""\
"means each be has 1 HDD without capactity limit, 1 SSD with 10GB capactity limit, "\
"2 SSD with 100GB capactity limit")
group1.add_argument(
"--be-cluster",
type=str,
help=
"be cluster name, if not specific, will use compute_cluster. Only use in cloud cluster."
)
self._add_parser_ids_args(parser)
group2 = parser.add_mutually_exclusive_group()
if self._support_boolean_action():
group2.add_argument(
"--start",
default=True,
action=self._get_parser_bool_action(False),
help="Start containers, default is true. If specific --no-start, "\
"will create or update config image only but not start containers.")
else:
group2.add_argument(
"--no-start",
dest='start',
default=True,
action=self._get_parser_bool_action(False),
help=
"Create or update config image only and don't start containers."
)
group2.add_argument("--force-recreate",
default=False,
action=self._get_parser_bool_action(True),
help="Recreate containers even if their configuration " \
"and image haven't changed. ")
parser.add_argument(
"--extra-hosts",
nargs="*",
type=str,
help=
"Add custom host-to-IP mappings (host:ip). For example: --extra-hosts myhost1:192.168.10.1 myhost2:192.168.10.2 . Only use when creating new cluster."
)
parser.add_argument(
"--env",
nargs="*",
type=str,
help=
"Add environment variables. For example: --env KEY1=VALUE1 KEY2=VALUE2. Only use when creating new cluster."
)
parser.add_argument("--coverage-dir",
default="",
help="Set code coverage output directory")
parser.add_argument("--sql-mode-node-mgr",
default=False,
action=self._get_parser_bool_action(True),
help="Manager fe be via sql instead of http")
parser.add_argument(
"--remote-master-fe",
type=str,
help=
"Specify remote master fe address with ip:query_port, and all the container use host network. " \
"Only use when creating new cluster."
)
parser.add_argument(
"--local-network-ip",
type=str,
help= "Specify local network ip, no need specify, will auto chose a proper ip. "\
"Only use when creating new cluster and specify --remote-master-fe."
)
parser.add_argument(
"--external-ms",
type=str,
help=
"Use external meta service cluster (specify cluster name). " \
"This cluster will not create its own MS/FDB/Recycler, but use the specified cluster's services. " \
"The external cluster must be a cloud cluster with MS/FDB already running. " \
"Example: --external-ms shared-meta. Only use when creating new cloud cluster."
)
parser.add_argument(
"--instance-id",
type=str,
help=
"Specify instance ID for cloud mode. If not specified, will auto-generate 'default_instance_id'. " \
"When using external MS with multiple clusters, each cluster should have a unique instance ID. " \
"Example: --instance-id prod_instance_1"
)
parser.add_argument(
"--cluster-snapshot",
type=str,
help=
"Cluster snapshot JSON content for FE-1 first startup in cloud mode only. " \
"The JSON will be written to FE conf/cluster_snapshot.json and passed to start_fe.sh " \
"with --cluster_snapshot parameter. Only effective on first startup. " \
"Example: --cluster-snapshot '{\"instance_id\":\"instance_id_xxx\"}'"
)
if self._support_boolean_action():
parser.add_argument(
"--be-metaservice-endpoint",
default=True,
action=self._get_parser_bool_action(False),
help=
"Do not set BE meta service endpoint in conf. Default is False."
)
else:
parser.add_argument(
"--no-be-metaservice-endpoint",
dest='be_metaservice_endpoint',
default=True,
action=self._get_parser_bool_action(False),
help=
"Do not set BE meta service endpoint in conf. Default is False."
)
        # if default == False, use this parser style, e.g. --be-cluster-id
parser.add_argument(
"--be-cluster-id",
default=False,
action=self._get_parser_bool_action(True),
help="Do not set BE cluster ID in conf. Default is False.")
parser.add_argument(
"--fdb-version",
type=str,
default="7.1.26",
help="fdb image version. Only use in cloud cluster.")
parser.add_argument(
"--tde-ak",
type=str,
default="",
help="tde ak")
parser.add_argument(
"--tde-sk",
type=str,
default="",
help="tde sk")
        # if default == True, use this parser style, e.g. --detach
if self._support_boolean_action():
parser.add_argument(
"--detach",
default=True,
action=self._get_parser_bool_action(False),
help="Detached mode: Run containers in the background. If specific --no-detach, "\
"will run containers in frontend. ")
else:
parser.add_argument("--no-detach",
dest='detach',
default=True,
action=self._get_parser_bool_action(False),
help="Run containers in frontend. ")
if self._support_boolean_action():
parser.add_argument(
"--reg-be",
default=True,
action=self._get_parser_bool_action(False),
help="Register be to meta server in cloud mode, use for multi clusters test. If specific --no-reg-be, "\
"will not register be to meta server. ")
else:
parser.add_argument(
"--no-reg-be",
dest='reg_be',
default=True,
action=self._get_parser_bool_action(False),
help=
"Don't register be to meta server in cloud mode, use for multi clusters test"
)
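    # A sketch of common `up` invocations, using an illustrative cluster name
    # and image (per the help texts above, a new local cluster defaults to
    # 3 fe + 3 be; a new cloud cluster also gets 1 ms, 1 recycler and 1 fdb):
    #   up my-cluster apache/doris:latest      create a new cluster
    #   up my-cluster --add-be-num 2           add 2 be to an existing cluster
    #   up my-cluster apache/doris:new-tag     upgrade the image of all nodes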
def run(self, args):
if not args.NAME:
raise Exception("Need specific not empty cluster name")
for_all = True
add_fdb_num = 0
is_new_cluster = False
try:
cluster = CLUSTER.Cluster.load(args.NAME)
if not cluster.is_cloud:
args.add_ms_num = None
args.add_recycle_num = None
args.ms_id = None
args.recycle_id = None
args.fdb_id = None
            if args.fe_id is not None or args.be_id is not None \
                or args.ms_id is not None or args.recycle_id is not None or args.fdb_id is not None \
                or args.add_fe_num or args.add_be_num \
                or args.add_ms_num or args.add_recycle_num:
for_all = False
except:
# a new cluster
is_new_cluster = True
if not args.IMAGE:
raise Exception("New cluster must specific image") from None
            if args.fe_id is not None:
args.fe_id = None
LOG.warning(
utils.render_yellow("Ignore --fe-id for new cluster"))
            if args.be_id is not None:
args.be_id = None
LOG.warning(
utils.render_yellow("Ignore --be-id for new cluster"))
args.fdb_id = None
args.ms_id = None
args.recycle_id = None
if args.add_fe_num is None:
args.add_fe_num = 0 if args.remote_master_fe else 3
if args.add_be_num is None:
args.add_be_num = 3
cloud_store_config = {}
if args.cloud:
external_ms_cluster = getattr(args, 'external_ms', None)
if external_ms_cluster:
                # Use the MS nodes from the external cluster; no need to add FDB/MS/Recycler
self._validate_external_ms_cluster(external_ms_cluster)
add_fdb_num = 0
args.add_ms_num = 0
args.add_recycle_num = 0
LOG.info(f"Using external MS cluster: {external_ms_cluster}")
else:
add_fdb_num = 1
if not args.add_ms_num:
args.add_ms_num = 1
if not args.add_recycle_num:
args.add_recycle_num = 1
external_ms_cluster = None
if not args.be_cluster:
args.be_cluster = "compute_cluster"
cloud_store_config = self._get_cloud_store_config()
else:
args.add_ms_num = 0
args.add_recycle_num = 0
external_ms_cluster = None
if args.remote_master_fe:
if not args.local_network_ip:
args.local_network_ip = utils.get_local_ip()
parts = args.remote_master_fe.split(":")
if len(parts) != 2:
raise Exception(
f"invalid --remote-master-fe-addr {args.remote_master_fe}, should be 'ip:query_port'"
)
if not parts[0]:
args.remote_master_fe = args.local_network_ip + ":" + parts[
1]
if args.cloud:
args.sql_mode_node_mgr = True
instance_id = getattr(args, 'instance_id', None)
cluster_snapshot = getattr(args, 'cluster_snapshot', '')
cluster = CLUSTER.Cluster.new(
args.NAME, args.IMAGE, args.cloud, args.root, args.fe_config,
args.be_config, args.ms_config, args.recycle_config,
args.remote_master_fe, args.local_network_ip, args.fe_follower,
args.be_disks, args.be_cluster, args.reg_be, args.extra_hosts, args.env,
args.coverage_dir, cloud_store_config, args.sql_mode_node_mgr,
args.be_metaservice_endpoint, args.be_cluster_id, args.tde_ak, args.tde_sk,
external_ms_cluster, instance_id, cluster_snapshot)
LOG.info("Create new cluster {} succ, cluster path is {}".format(
args.NAME, cluster.get_path()))
if args.be_cluster and cluster.is_cloud:
cluster.be_cluster = args.be_cluster
if cluster.is_cloud:
cluster.fe_follower = args.fe_follower
_, related_nodes, _ = get_ids_related_nodes(cluster, args.fe_id,
args.be_id, args.ms_id,
args.recycle_id,
args.fdb_id)
add_fe_ids = []
add_be_ids = []
add_ms_ids = []
add_recycle_ids = []
add_fdb_ids = []
add_type_nums = [
(CLUSTER.Node.TYPE_FDB, add_fdb_num, add_fdb_ids),
(CLUSTER.Node.TYPE_MS, args.add_ms_num, add_ms_ids),
(CLUSTER.Node.TYPE_RECYCLE, args.add_recycle_num, add_recycle_ids),
(CLUSTER.Node.TYPE_FE, args.add_fe_num, add_fe_ids),
(CLUSTER.Node.TYPE_BE, args.add_be_num, add_be_ids),
]
if not related_nodes:
related_nodes = []
def do_add_node(node_type, add_num, add_ids):
if not add_num:
return
for i in range(add_num):
node = cluster.add(node_type)
related_nodes.append(node)
add_ids.append(node.id)
for node_type, add_num, add_ids in add_type_nums:
do_add_node(node_type, add_num, add_ids)
if args.IMAGE:
for node in related_nodes:
node.set_image(args.IMAGE)
if for_all:
cluster.set_image(args.IMAGE)
for node in cluster.get_all_nodes(CLUSTER.Node.TYPE_FDB):
node.set_image("foundationdb/foundationdb:{}".format(
args.fdb_version))
cluster.save()
options = []
if not args.start:
options.append("--no-start")
else:
options += ["--remove-orphans"]
if args.detach:
options.append("-d")
if args.force_recreate:
options.append("--force-recreate")
related_node_num = len(related_nodes)
if for_all:
related_node_num = cluster.get_all_nodes_num()
related_nodes = None
output_real_time = args.start and not args.detach
utils.exec_docker_compose_command(cluster.get_compose_file(),
"up",
options,
related_nodes,
output_real_time=output_real_time)
if not args.start:
LOG.info(
utils.render_green(
"Not up cluster cause specific --no-start, related node num {}"
.format(related_node_num)))
else:
LOG.info("Using SQL mode for node management ? {}".format(
cluster.sql_mode_node_mgr))
if cluster.remote_master_fe:
if is_new_cluster:
with open(CLUSTER.get_master_fe_addr_path(cluster.name),
"w") as f:
f.write(cluster.remote_master_fe)
if cluster.is_cloud:
cloud_config = "\n".join([
f"meta_service_endpoint = {cluster.get_meta_server_addr()}",
"deploy_mode = cloud",
f"cluster_id = {CLUSTER.CLUSTER_ID}",
])
                        # Write the additional conf to remote_master_fe_add.conf; the remote fe can fetch this content via ssh.
with open(
os.path.join(
CLUSTER.get_status_path(cluster.name),
"remote_master_fe_add.conf"), "w") as f:
f.write(cloud_config)
ans = input(
utils.render_red(
f"\nAdd remote fe {cluster.remote_master_fe} fe.conf with follow config: "
) + "\n\n" + f"{cloud_config}\n\nConfirm ? y/n: ")
if ans != 'y':
LOG.info(
"Up cluster failed due to not confirm write the above config."
)
return
LOG.info("Waiting connect to remote FE...")
expire_ts = time.time() + 3600 * 5
parts = cluster.remote_master_fe.split(":")
fe_ip = parts[0]
fe_port = int(parts[1])
ready = False
                while expire_ts > time.time():
                    if utils.is_socket_avail(fe_ip, fe_port):
                        ready = True
                        break
                    # avoid busy-waiting on the remote fe port check
                    time.sleep(1)
if not ready:
raise Exception(
"Cannot connect to remote master fe: " +
cluster.remote_master_fe)
LOG.info("After connect to remote FE...")
else:
# Wait for FE master to be elected
LOG.info("Waiting for FE master to be elected...")
expire_ts = time.time() + 30
while expire_ts > time.time():
ready = False
db_mgr = database.get_db_mgr(
args.NAME, cluster.get_all_node_net_infos(), False)
for id in add_fe_ids:
fe_state = db_mgr.get_fe(id)
if fe_state is not None and fe_state.alive:
ready = True
break
if ready:
break
LOG.info("there is no fe ready")
time.sleep(1)
LOG.info("after Waiting for FE master to be elected...")
if cluster.is_cloud and cluster.sql_mode_node_mgr:
db_mgr = database.get_db_mgr(args.NAME,
cluster.get_all_node_net_infos(),
False)
master_fe_endpoint = CLUSTER.get_master_fe_endpoint(
cluster.name, True)
# Add FEs except master_fe
for fe in cluster.get_all_nodes(CLUSTER.Node.TYPE_FE):
fe_querypoint = f"{fe.get_ip()}:{fe.meta['ports']['query_port']}"
fe_endpoint = f"{fe.get_ip()}:{fe.meta['ports']['edit_log_port']}"
if fe_querypoint != master_fe_endpoint:
try:
db_mgr.add_fe(
fe_endpoint, "FOLLOWER"
if cluster.fe_follower else "OBSERVER")
LOG.info(f"Added FE {fe_endpoint} successfully.")
except Exception as e:
LOG.error(
f"Failed to add FE {fe_endpoint}: {str(e)}")
# Add BEs
for be in cluster.get_all_nodes(CLUSTER.Node.TYPE_BE):
be_endpoint = f"{be.get_ip()}:{be.meta['ports']['heartbeat_service_port']}"
try:
db_mgr.add_be(be_endpoint)
LOG.info(f"Added BE {be_endpoint} successfully.")
except Exception as e:
LOG.error(f"Failed to add BE {be_endpoint}: {str(e)}")
if is_new_cluster:
cloud_store_config = self._get_cloud_store_config()
db_mgr.create_default_storage_vault(cloud_store_config)
if not cluster.is_host_network():
wait_service(True, args.wait_timeout, cluster, add_fe_ids,
add_be_ids)
LOG.info(
utils.render_green(
"Up cluster {} succ, related node num {}".format(
args.NAME, related_node_num)))
ls_cmd = "python docker/runtime/doris-compose/doris-compose.py ls " + cluster.name
LOG.info("Inspect command: " + utils.render_green(ls_cmd) + "\n")
LOG.info(
"Master fe query address: " +
utils.render_green(CLUSTER.get_master_fe_endpoint(cluster.name)) +
"\n")
return {
"fe": {
"add_list": add_fe_ids,
},
"be": {
"add_list": add_be_ids,
},
"ms": {
"add_list": add_ms_ids,
},
"recycle": {
"add_list": add_recycle_ids,
},
"fdb": {
"add_list": add_fdb_ids,
},
}
def _validate_external_ms_cluster(self, external_ms_cluster_name):
        # 1. Does the external cluster exist?
try:
external_cluster = CLUSTER.Cluster.load(external_ms_cluster_name)
except Exception as e:
raise Exception(
f"External MS cluster '{external_ms_cluster_name}' not found. "
f"Please create it first with: "
f"python doris-compose.py up {external_ms_cluster_name} <image> --cloud --add-fe-num 0 --add-be-num 0"
) from e
# 2. Is the external cluster a cloud cluster?
if not external_cluster.is_cloud:
raise Exception(
f"External MS cluster '{external_ms_cluster_name}' is not a cloud cluster. "
f"Only cloud clusters can be used as external MS."
)
# 3. Does the external cluster have MS and FDB nodes?
ms_group = external_cluster.get_group(CLUSTER.Node.TYPE_MS)
fdb_group = external_cluster.get_group(CLUSTER.Node.TYPE_FDB)
if ms_group.get_node_num() == 0:
raise Exception(
f"External MS cluster '{external_ms_cluster_name}' has no MS nodes. "
f"Please add MS nodes first."
)
if fdb_group.get_node_num() == 0:
raise Exception(
f"External MS cluster '{external_ms_cluster_name}' has no FDB nodes. "
f"Please add FDB nodes first."
)
# 4. Are the MS and FDB containers running?
containers = utils.get_doris_running_containers(external_ms_cluster_name)
ms_running = False
fdb_running = False
for container_name in containers.keys():
_, node_type, _ = utils.parse_service_name(container_name)
if node_type == CLUSTER.Node.TYPE_MS:
ms_running = True
elif node_type == CLUSTER.Node.TYPE_FDB:
fdb_running = True
if not ms_running:
raise Exception(
f"External MS cluster '{external_ms_cluster_name}' MS node is not running. "
f"Please start it with: python doris-compose.py start {external_ms_cluster_name}"
)
if not fdb_running:
raise Exception(
f"External MS cluster '{external_ms_cluster_name}' FDB node is not running. "
f"Please start it with: python doris-compose.py start {external_ms_cluster_name}"
)
LOG.info(utils.render_green(
f"✓ External MS cluster '{external_ms_cluster_name}' validation passed: "
f"MS={external_cluster.get_meta_server_addr()}, "
f"FDB={external_cluster.get_fdb_cluster()}"
))
def _get_cloud_store_config(self):
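        """Load the S3 store config required by cloud clusters.

        The key set is taken from cloud.ini.example (comments and malformed
        lines skipped); each key must get a non-empty value from the file
        named by the DORIS_CLOUD_CFG_FILE env variable.
        """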
example_cfg_file = os.path.join(CLUSTER.LOCAL_RESOURCE_PATH,
"cloud.ini.example")
if not CLUSTER.CLOUD_CFG_FILE:
raise Exception("Cloud cluster need S3 store, specific its config in a file.\n" \
"A example file is " + example_cfg_file + ".\n" \
"Then setting the env variable `export DORIS_CLOUD_CFG_FILE=<cfg-file-path>`.")
if not os.path.exists(CLUSTER.CLOUD_CFG_FILE):
raise Exception("Cloud store config file '" +
CLUSTER.CLOUD_CFG_FILE + "' not exists.")
config = {}
with open(example_cfg_file, "r") as f:
for line in f.readlines():
if line.startswith('#'):
continue
pos = line.find('=')
if pos <= 0:
continue
key = line[0:pos].strip()
if key:
config[key] = ""
with open(CLUSTER.CLOUD_CFG_FILE, "r") as f:
for line in f.readlines():
if line.startswith('#'):
continue
pos = line.find('=')
if pos <= 0:
continue
key = line[0:pos].strip()
                if key and config.get(key, None) is not None:
config[key] = line[line.find('=') + 1:].strip()
for key, value in config.items():
if not value:
raise Exception(
"Should provide none empty property '{}' in file {}".
format(key, CLUSTER.CLOUD_CFG_FILE))
return config
class DownCommand(Command):
def add_parser(self, args_parsers):
parser = args_parsers.add_parser("down",
help="Down doris containers, networks. "\
"It will also remove node from DB. " \
"If none of --fe-id, --be-id, --ms-id, --recycle-id, --fdb-id is specific, "\
"then apply to all containers.")
parser.add_argument("NAME", help="Specify cluster name")
self._add_parser_ids_args(parser)
self._add_parser_common_args(parser)
parser.add_argument(
"--clean",
default=False,
action=self._get_parser_bool_action(True),
help=
"Clean container related files, include expose data, config and logs."
)
parser.add_argument(
"--drop-force",
default=None,
action=self._get_parser_bool_action(True),
help="Drop doris node force. For be, if specific --drop-force, "\
"it will send dropp to fe, otherwise send decommission to fe.")
def run(self, args):
cluster_name = args.NAME
cluster = None
stop_grace = False
try:
cluster = CLUSTER.Cluster.load(cluster_name)
for_all, related_nodes, related_node_num = get_ids_related_nodes(
cluster,
args.fe_id,
args.be_id,
args.ms_id,
args.recycle_id,
args.fdb_id,
ignore_not_exists=True)
stop_grace = cluster.coverage_dir
except Exception as e:
            for_all = not args.fe_id and not args.be_id and not args.ms_id and not args.recycle_id and not args.fdb_id
related_nodes = []
related_node_num = 0
if not for_all:
raise e
LOG.info("down cluster " + args.NAME + " for all " +
str(for_all).lower())
if for_all:
compose_file = CLUSTER.get_compose_file(cluster_name)
if os.path.exists(compose_file):
try:
options = ["-v", "--remove-orphans"]
if not stop_grace:
options.extend(["-t", "1"])
utils.exec_docker_compose_command(compose_file,
"down",
options=options)
except Exception as e:
LOG.warning("down cluster has exception: " + str(e))
try:
utils.remove_docker_network(cluster_name)
except Exception as e:
LOG.warning("remove network has exception: " + str(e))
if args.clean:
cluster_path = CLUSTER.get_cluster_path(cluster_name)
if os.path.exists(cluster_path):
utils.enable_dir_with_rw_perm(cluster_path)
shutil.rmtree(cluster_path)
LOG.info(
utils.render_yellow(
"Clean cluster data cause has specific --clean"))
else:
db_mgr = database.get_db_mgr(cluster.name,
cluster.get_all_node_net_infos())
for node in related_nodes:
if node.is_fe():
fe_endpoint = "{}:{}".format(
node.get_ip(), node.meta["ports"]["edit_log_port"])
db_mgr.drop_fe(fe_endpoint)
elif node.is_be():
be_endpoint = "{}:{}".format(
node.get_ip(),
node.meta["ports"]["heartbeat_service_port"])
if args.drop_force:
db_mgr.drop_be(be_endpoint)
else:
db_mgr.decommission_be(be_endpoint)
else:
raise Exception("Unknown node type: {}".format(
node.node_type()))
#utils.exec_docker_compose_command(cluster.get_compose_file(),
# "stop",
# nodes=[node])
utils.exec_docker_compose_command(cluster.get_compose_file(),
"rm", ["-s", "-v", "-f"],
nodes=[node])
if args.clean:
utils.enable_dir_with_rw_perm(node.get_path())
shutil.rmtree(node.get_path())
register_file = "{}/{}-{}-register".format(
CLUSTER.get_status_path(cluster.name),
node.node_type(), node.id)
if os.path.exists(register_file):
os.remove(register_file)
LOG.info(
utils.render_yellow(
"Clean {} with id {} data cause has specific --clean"
.format(node.node_type(), node.id)))
cluster.remove(node.node_type(), node.id)
cluster.save()
LOG.info(
utils.render_green(
"Down cluster {} succ, related node num {}".format(
cluster_name, related_node_num)))
return "down cluster succ"
class ListNode(object):
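    """One display row for the `ls` command, merging docker container info
    with the fe/be state fetched from the cluster database."""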
def __init__(self):
self.node_type = ""
self.id = 0
self.backend_id = ""
self.cluster_name = ""
self.ip = ""
self.status = ""
self.container_id = ""
self.image = ""
self.created = ""
self.alive = ""
self.is_master = ""
self.tablet_num = ""
self.last_heartbeat = ""
self.err_msg = ""
self.query_port = ""
self.http_port = ""
self.heartbeat_port = ""
self.edit_log_port = ""
self.heartbeat_port = ""
def info(self, detail):
result = [
self.cluster_name, "{}-{}".format(self.node_type, self.id),
self.ip, self.status, self.container_id, self.image, self.created,
self.alive, self.is_master, self.backend_id, self.tablet_num,
self.last_heartbeat, self.err_msg
]
if detail:
node_path = CLUSTER.get_node_path(self.cluster_name,
self.node_type, self.id)
result += [
self.query_port, self.http_port, node_path, self.edit_log_port,
self.heartbeat_port
]
return result
def update_db_info(self, cluster, db_mgr):
try:
node = cluster.get_node(self.node_type, self.id, True)
except:
node = None
ports = node.meta["ports"] if node else {}
if self.node_type == CLUSTER.Node.TYPE_FE:
fe = db_mgr.get_fe(self.id)
if fe:
self.alive = str(fe.alive).lower()
self.is_master = str(fe.is_master).lower()
self.last_heartbeat = fe.last_heartbeat
self.err_msg = fe.err_msg
self.query_port = fe.query_port
self.http_port = fe.http_port
self.edit_log_port = fe.edit_log_port
elif self.node_type == CLUSTER.Node.TYPE_BE:
self.backend_id = -1
be = db_mgr.get_be(self.id)
if be:
self.alive = str(be.alive).lower()
self.backend_id = be.backend_id
self.tablet_num = be.tablet_num
self.last_heartbeat = be.last_heartbeat
self.err_msg = be.err_msg
self.http_port = be.http_port
self.heartbeat_port = be.heartbeat_service_port
elif self.node_type == CLUSTER.Node.TYPE_MS or self.node_type == CLUSTER.Node.TYPE_RECYCLE:
if ports:
self.http_port = ports.get("brpc_listen_port", -1)
if node and node.meta.get("is_remote", False):
self.ip = node.get_ip()
self.container_id = "<remote>"
self.image = "<remote>"
class GenConfCommand(Command):
def print_use_time(self):
return False
def add_parser(self, args_parsers):
parser = args_parsers.add_parser(
"config",
help="Generate regression-conf-custom.groovy for regression test.")
parser.add_argument("NAME", default="", help="Specific cluster name.")
parser.add_argument("DORIS_ROOT_PATH", default="", help="Specify doris or selectdb root path, "\
"i.e. the parent directory of regression-test.")
parser.add_argument("--connect-follow-fe",
default=False,
action=self._get_parser_bool_action(True),
help="Connect to follow fe.")
parser.add_argument("-q",
"--quiet",
default=False,
action=self._get_parser_bool_action(True),
help="write config quiet, no need confirm.")
return parser
def run(self, args):
base_conf = '''
jdbcUrl = "jdbc:mysql://{fe_ip}:{query_port}/?useLocalSessionState=true&allowLoadLocalInfile=true"
targetJdbcUrl = "jdbc:mysql://{fe_ip}:{query_port}/?useLocalSessionState=true&allowLoadLocalInfile=true"
feSourceThriftAddress = "{fe_ip}:{rpc_port}"
feTargetThriftAddress = "{fe_ip}:{rpc_port}"
syncerAddress = "{fe_ip}:9190"
feHttpAddress = "{fe_ip}:{http_port}"
'''
cloud_conf = '''
feCloudHttpAddress = "{fe_ip}:18030"
metaServiceHttpAddress = "{ms_endpoint}"
metaServiceToken = "greedisgood9999"
recycleServiceHttpAddress = "{recycle_endpoint}"
instanceId = "12345678"
multiClusterInstance = "12345678"
multiClusterBes = "{multi_cluster_bes}"
cloudUniqueId= "{fe_cloud_unique_id}"
'''
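        # base_conf / cloud_conf above are templates for
        # regression-conf-custom.groovy; their placeholders are filled from
        # the live cluster below.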
cluster = CLUSTER.Cluster.load(args.NAME)
master_fe_ip_ep = CLUSTER.get_master_fe_endpoint(args.NAME)
if not master_fe_ip_ep:
print("Not found cluster with name {} in directory {}".format(
args.NAME, CLUSTER.LOCAL_DORIS_PATH))
return
db_mgr = database.get_db_mgr(args.NAME,
cluster.get_all_node_net_infos(), False)
fe_ip = ""
rpc_port = 0
query_port = 0
http_port = 0
for fe in db_mgr.get_all_fe().values():
if not fe.alive:
continue
if fe.is_master == (not args.connect_follow_fe):
fe_ip = fe.ip
rpc_port = fe.rpc_port
query_port = fe.query_port
http_port = fe.http_port
break
if not fe_ip:
role = "follower" if args.connect_follow_fe else "master"
raise Exception(f"Not found an alive {role} fe")
relative_custom_file_path = "regression-test/conf/regression-conf-custom.groovy"
regression_conf_custom = os.path.join(args.DORIS_ROOT_PATH,
relative_custom_file_path)
if not args.quiet:
ans = input(
"\nwrite file {} ? y/n: ".format(regression_conf_custom))
if ans != 'y':
print("\nNo write regression custom file.")
return
annotation_start = "//---------- Start auto generate by doris-compose.py---------"
annotation_end = "//---------- End auto generate by doris-compose.py---------"
old_contents = []
if os.path.exists(regression_conf_custom):
with open(regression_conf_custom, "r") as f:
old_contents = f.readlines()
with open(regression_conf_custom, "w") as f:
# write auto gen config
f.write(annotation_start)
f.write(
base_conf.format(fe_ip=fe_ip,
rpc_port=rpc_port,
query_port=query_port,
http_port=http_port))
if cluster.is_cloud:
multi_cluster_bes = ",".join([
"{}:{}:{}:{}:{}".format(
be.get_ip(),
be.meta["ports"]["heartbeat_service_port"],
be.meta["ports"]["webserver_port"],
be.cloud_unique_id(), be.meta["ports"]["brpc_port"])
for be in cluster.get_all_nodes(CLUSTER.Node.TYPE_BE)
])
master_fe = cluster.get_remote_fe_node(
) if cluster.is_host_network() else cluster.get_node(
CLUSTER.Node.TYPE_FE, 1)
f.write(
cloud_conf.format(
fe_ip=fe_ip,
ms_endpoint=cluster.get_meta_server_addr(),
recycle_endpoint=cluster.get_recycle_addr(),
multi_cluster_bes=multi_cluster_bes,
fe_cloud_unique_id=master_fe.cloud_unique_id()))
f.write(annotation_end + "\n\n")
# write not-auto gen config
in_annotation = False
annotation_end_line_idx = -100
for line_idx, line in enumerate(old_contents):
line = line.rstrip()
if line == annotation_start:
in_annotation = True
elif line == annotation_end:
in_annotation = False
annotation_end_line_idx = line_idx
elif not in_annotation:
if line or line_idx != annotation_end_line_idx + 1:
f.write(line + "\n")
print("\nWrite succ: " + regression_conf_custom)
class ListCommand(Command):
def add_parser(self, args_parsers):
parser = args_parsers.add_parser(
"ls", help="List running doris compose clusters.")
parser.add_argument(
"NAME",
nargs="*",
help=
"Specify multiple clusters, if specific, show all their containers."
)
self._add_parser_common_args(parser)
parser.add_argument("--detail",
default=False,
action=self._get_parser_bool_action(True),
help="Print more detail fields.")
def _hint_cluster_bad(self, cluster_name):
cluster_path = CLUSTER.get_cluster_path(cluster_name)
if not os.path.exists(cluster_path):
LOG.info(
utils.render_yellow(
f"Not exits cluster directory in '{CLUSTER.LOCAL_DORIS_PATH}'"
))
elif not os.path.exists(CLUSTER.Cluster._get_meta_file(cluster_name)):
LOG.error(
utils.render_red(
f"Not exits cluster meta file in '{cluster_path}'"))
else:
try:
CLUSTER.Cluster.load(cluster_name)
except:
LOG.error(utils.render_red("meta file is bad or incompatible with current doris-compose.py. " \
"Run command `down --clean` to destroy it then recreate a new one"))
def run(self, args):
COMPOSE_MISSING = "(missing)"
COMPOSE_BAD = "(bad)"
COMPOSE_GOOD = ""
SERVICE_DEAD = "dead"
SERVICE_RUNNING = "running"
class ComposeService(object):
def __init__(self, name, ip, image):
self.name = name
self.ip = ip
self.image = image
def parse_cluster_compose_file(cluster_name):
compose_file = CLUSTER.get_compose_file(cluster_name)
try:
cluster = CLUSTER.Cluster.load(cluster_name)
ip_for_host_mode = cluster.local_network_ip if cluster.is_host_network(
) else ""
except:
ip_for_host_mode = ""
if not os.path.exists(compose_file):
return COMPOSE_MISSING, {}
try:
compose = utils.read_compose_file(compose_file)
if not compose:
return COMPOSE_BAD, {}
services = compose.get("services", {})
if services is None:
return COMPOSE_BAD, {}
return COMPOSE_GOOD, {
service: ComposeService(
service, ip_for_host_mode if ip_for_host_mode else
list(service_conf["networks"].values())[0]
["ipv4_address"], service_conf["image"])
for service, service_conf in services.items()
}
except:
return COMPOSE_BAD, {}
clusters = {}
search_names = []
if args.NAME:
search_names = args.NAME
else:
search_names = CLUSTER.get_all_cluster_names()
for cluster_name in search_names:
status, services = parse_cluster_compose_file(cluster_name)
clusters[cluster_name] = {"status": status, "services": services}
docker_clusters = utils.get_doris_containers(args.NAME)
for cluster_name, containers in docker_clusters.items():
cluster_info = clusters.get(cluster_name, None)
if not cluster_info:
cluster_info = {"status": COMPOSE_MISSING, "services": {}}
clusters[cluster_name] = cluster_info
for container in containers:
#if container.status == "running" and cluster_info[
# "status"] == COMPOSE_GOOD and (
# container.name not in cluster_info["services"]):
# container.status = "orphans"
cluster_info["services"][container.name] = container
TYPE_COMPOSESERVICE = type(ComposeService("", "", ""))
if not args.NAME:
header = ("CLUSTER", "OWNER", "STATUS", "MASTER FE", "CLOUD",
"NETWORK MODE", "CONFIG FILES")
rows = []
for name in sorted(clusters.keys()):
cluster_info = clusters[name]
service_statuses = {}
for _, container in cluster_info["services"].items():
status = SERVICE_DEAD if type(
container) == TYPE_COMPOSESERVICE else container.status
service_statuses[status] = service_statuses.get(status,
0) + 1
show_status = ",".join([
"{}({})".format(status, count)
for status, count in service_statuses.items()
])
owner = utils.get_path_owner(CLUSTER.get_cluster_path(name))
compose_file = CLUSTER.get_compose_file(name)
is_cloud = ""
network_mode = ""
try:
cluster = CLUSTER.Cluster.load(name)
is_cloud = "true" if cluster.is_cloud else "false"
network_mode = "host" if cluster.is_host_network(
) else "bridge"
except:
pass
rows.append(
(name, owner, show_status,
CLUSTER.get_master_fe_endpoint(name), is_cloud,
network_mode, "{}{}".format(compose_file,
cluster_info["status"])))
return self._print_table(header, rows)
header = [
"CLUSTER", "NAME", "IP", "STATUS", "CONTAINER ID", "IMAGE",
"CREATED", "alive", "is_master", "backend_id", "tablet_num",
"last_heartbeat", "err_msg"
]
if args.detail:
header += [
"query_port",
"http_port",
"path",
"edit_log_port",
"heartbeat_port",
]
rows = []
for cluster_name in sorted(clusters.keys()):
fe_ids = {}
be_ids = {}
services = clusters[cluster_name]["services"]
cluster = None
try:
cluster = CLUSTER.Cluster.load(cluster_name)
except:
pass
db_mgr = database.get_db_mgr(
cluster_name,
cluster.get_all_node_net_infos() if cluster else [], False)
nodes = []
for service_name, container in services.items():
_, node_type, id = utils.parse_service_name(container.name)
node = ListNode()
node.cluster_name = cluster_name
node.node_type = node_type
node.id = id
node.update_db_info(cluster, db_mgr)
nodes.append(node)
if node_type == CLUSTER.Node.TYPE_FE:
fe_ids[id] = True
elif node_type == CLUSTER.Node.TYPE_BE:
be_ids[id] = True
if type(container) == TYPE_COMPOSESERVICE:
node.ip = container.ip
node.image = container.image
node.status = SERVICE_DEAD
else:
node.created = dateutil.parser.parse(
container.attrs.get("Created")).astimezone().strftime(
"%Y-%m-%d %H:%M:%S")
if cluster and cluster.is_host_network():
node.ip = cluster.local_network_ip
else:
network_name = utils.get_network_name(cluster.name)
node.ip = container.attrs["NetworkSettings"]["Networks"][network_name] \
["IPAMConfig"]["IPv4Address"]
node.image = container.attrs["Config"]["Image"]
if not node.image:
node.image = ",".join(container.image.tags)
node.container_id = container.short_id
node.status = container.status
if node.container_id and \
node_type in (CLUSTER.Node.TYPE_FDB,
CLUSTER.Node.TYPE_MS,
CLUSTER.Node.TYPE_RECYCLE):
node.alive = "true"
for id, fe in db_mgr.fe_states.items():
if fe_ids.get(id, False):
continue
node = ListNode()
node.cluster_name = cluster_name
node.node_type = CLUSTER.Node.TYPE_FE
node.id = id
node.status = SERVICE_RUNNING if fe.alive else SERVICE_DEAD
node.update_db_info(cluster, db_mgr)
nodes.append(node)
for id, be in db_mgr.be_states.items():
if be_ids.get(id, False):
continue
node = ListNode()
node.cluster_name = cluster_name
node.node_type = CLUSTER.Node.TYPE_BE
node.id = id
node.status = SERVICE_RUNNING if be.alive else SERVICE_DEAD
node.update_db_info(cluster, db_mgr)
nodes.append(node)
def get_node_seq(node):
return CLUSTER.get_node_seq(node.node_type, node.id)
for node in sorted(nodes, key=get_node_seq):
rows.append(node.info(args.detail))
ret = self._print_table(header, rows)
if len(args.NAME) == 1 and len(rows) == 0:
self._hint_cluster_bad(args.NAME[0])
return ret
class InfoCommand(Command):
def add_parser(self, args_parsers):
parser = args_parsers.add_parser(
"info", help="Show info like cloud.ini, port, path, etc")
self._add_parser_common_args(parser)
def run(self, args):
header = ["key", "value", "scope"]
cloud_cfg_file_env = os.getenv("DORIS_CLOUD_CFG_FILE")
cloud_cfg_file = cloud_cfg_file_env if cloud_cfg_file_env else "${LOCAL_DORIS_PATH}/cloud.ini"
rows = [
("LOCAL_DORIS_PATH", CLUSTER.LOCAL_DORIS_PATH, "env variable"),
("DORIS_CLOUD_CFG_FILE", cloud_cfg_file, "env variable"),
("FE_QUERY_PORT", CLUSTER.FE_QUERY_PORT, "constant"),
("FE_HTTP_PORT", CLUSTER.FE_HTTP_PORT, "constant"),
("FE_EDITLOG_PORT", CLUSTER.FE_EDITLOG_PORT, "constant"),
("FE_JAVA_DBG_PORT", CLUSTER.FE_JAVA_DBG_PORT, "constant"),
("BE_HEARTBEAT_PORT", CLUSTER.BE_HEARTBEAT_PORT, "constant"),
("BE_WEBSVR_PORT", CLUSTER.BE_WEBSVR_PORT, "constant"),
("MS_PORT", CLUSTER.MS_PORT, "constant"),
("RECYCLER_PORT", CLUSTER.MS_PORT, "constant"),
]
if os.path.exists(CLUSTER.CLOUD_CFG_FILE):
with open(CLUSTER.CLOUD_CFG_FILE, "r") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
key, value = line.split("=", 1)
rows.append((key.strip(), value.strip(), "cloud.ini"))
return self._print_table(header, rows)
class AddRWPermCommand(Command):
def add_parser(self, args_parsers):
parser = args_parsers.add_parser(
"add-rw-perm",
help="Add read and write permissions to the cluster files")
parser.add_argument("NAME", help="Specify cluster name.")
self._add_parser_common_args(parser)
def run(self, args):
utils.enable_dir_with_rw_perm(CLUSTER.get_cluster_path(args.NAME))
return ""
class RollbackCommand(Command):
def add_parser(self, args_parsers):
parser = args_parsers.add_parser(
"rollback",
help="Rollback cloud cluster to a snapshot. " \
"Stop ALL FE/BE, clean metadata/data, and restart with new cloud_unique_id and instance_id. " \
"Only available for cloud mode clusters."
)
parser.add_argument("NAME", help="Specify cluster name.")
parser.add_argument(
"--cluster-snapshot",
type=str,
required=True,
help="Cluster snapshot JSON content for rollback. " \
"Example: '{\"instance_id\":\"instance_id_xxx\"}'"
)
parser.add_argument(
"--instance-id",
type=str,
help="New instance ID for the cluster after rollback. " \
"If not specified, will generate a new one based on timestamp."
)
parser.add_argument(
"--wait-timeout",
type=int,
default=0,
help="Specify wait seconds for fe/be ready for service: 0 not wait (default), " \
"> 0 max wait seconds, -1 wait unlimited."
)
self._add_parser_common_args(parser)
return parser
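    # A hypothetical invocation (the snapshot JSON is illustrative):
    #   rollback my-cluster --cluster-snapshot '{"instance_id":"instance_id_xxx"}'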
def _update_config_cloud_unique_id(self, conf_path, new_cloud_unique_id):
"""Update cloud_unique_id in fe.conf or be.conf file.
This method updates the cloud_unique_id value in the config file.
If the line exists, it will be replaced; if not found, it will be added.
"""
updated = False
lines = []
# Read existing config
with open(conf_path, "r") as f:
lines = f.readlines()
# Try to update existing cloud_unique_id line
for i, line in enumerate(lines):
stripped = line.strip()
# Match lines like: cloud_unique_id = xxx or cloud_unique_id=xxx
if stripped.startswith("cloud_unique_id") and ("=" in stripped):
lines[i] = f"cloud_unique_id = {new_cloud_unique_id}\n"
updated = True
break
        # If no cloud_unique_id line was found, fail loudly instead of guessing
        # where to insert one.
        if not updated:
            raise Exception("cloud_unique_id not found in config file: " + conf_path)
# Write back to file
with open(conf_path, "w") as f:
f.writelines(lines)
def run(self, args):
cluster = CLUSTER.Cluster.load(args.NAME)
# Validate: only cloud clusters support rollback
if not cluster.is_cloud:
raise Exception("Rollback is only supported for cloud clusters")
# Rollback must include ALL FE/BE nodes
fe_nodes = cluster.get_all_nodes(CLUSTER.Node.TYPE_FE)
be_nodes = cluster.get_all_nodes(CLUSTER.Node.TYPE_BE)
if not fe_nodes and not be_nodes:
raise Exception("No FE or BE nodes to rollback")
# Generate new instance_id and rollback timestamp
rollback_ts = str(int(time.time()))
new_instance_id = args.instance_id or f"instance_{cluster.name}_{rollback_ts}"
LOG.info(f"Starting rollback with instance_id: {new_instance_id}, timestamp: {rollback_ts}")
# Step 1: Stop FE/BE nodes
LOG.info("Step 1/5: Stopping FE/BE nodes...")
fe_ids = [node.id for node in fe_nodes]
be_ids = [node.id for node in be_nodes]
stop_nodes = fe_nodes + be_nodes
utils.exec_docker_compose_command(
cluster.get_compose_file(),
"stop",
options=["-t", "1"],
nodes=stop_nodes
)
LOG.info(f"Stopped {len(fe_nodes)} FE and {len(be_nodes)} BE nodes")
cluster.is_rollback = True # Add ROLLBACK envs
# Step 2: Clean metadata and data
LOG.info("Step 2/5: Cleaning metadata and data directories...")
for fe in fe_nodes:
fe_meta_path = os.path.join(fe.get_path(), "doris-meta")
if os.path.exists(fe_meta_path):
utils.enable_dir_with_rw_perm(fe_meta_path)
shutil.rmtree(fe_meta_path)
os.makedirs(fe_meta_path, exist_ok=True)
LOG.info(f" Cleaned FE-{fe.id} doris-meta/")
for be in be_nodes:
be_storage_path = os.path.join(be.get_path(), "storage")
if os.path.exists(be_storage_path):
utils.enable_dir_with_rw_perm(be_storage_path)
shutil.rmtree(be_storage_path)
# Recreate storage directories based on disk configuration
be.init_disk(cluster.be_disks)
LOG.info(f" Cleaned BE-{be.id} storage/")
# Step 3: Update cluster configuration
LOG.info("Step 3/5: Updating cluster configuration...")
# Update instance_id
old_instance_id = cluster.instance_id
cluster.instance_id = new_instance_id
LOG.info(f" Updated instance_id: {old_instance_id} -> {new_instance_id}")
# Update cluster_snapshot for FE-1
cluster.cluster_snapshot = args.cluster_snapshot
fe1 = cluster.get_node(CLUSTER.Node.TYPE_FE, 1)
snapshot_file = os.path.join(fe1.get_path(), "conf", "cluster_snapshot.json")
with open(snapshot_file, "w") as f:
f.write(args.cluster_snapshot)
LOG.info(f" Written cluster_snapshot to {snapshot_file}")
# Step 4: Update cloud_unique_id by setting rollback_timestamp in node meta
LOG.info("Step 4/5: Generating new cloud_unique_id...")
for fe in fe_nodes:
fe.meta["rollback_timestamp"] = rollback_ts
old_id = f"{cluster.name}_sql_server_{fe.id}"
new_id = fe.cloud_unique_id()
LOG.info(f" FE-{fe.id}: {old_id} -> {new_id}")
fe_conf_path = os.path.join(fe.get_path(), "conf", "fe.conf")
if os.path.exists(fe_conf_path):
self._update_config_cloud_unique_id(fe_conf_path, new_id)
for be in be_nodes:
be.meta["rollback_timestamp"] = rollback_ts
old_id = f"{cluster.name}_compute_node_{be.id}"
new_id = be.cloud_unique_id()
LOG.info(f" BE-{be.id}: {old_id} -> {new_id}")
be_conf_path = os.path.join(be.get_path(), "conf", "be.conf")
if os.path.exists(be_conf_path):
self._update_config_cloud_unique_id(be_conf_path, new_id)
# Save updated cluster configuration
cluster.save()
LOG.info(" Saved cluster configuration")
# Step 5: Start FE/BE nodes
LOG.info("Step 5/5: Starting FE/BE nodes with new configuration...")
utils.exec_docker_compose_command(
cluster.get_compose_file(),
"up",
options=["-d"],
nodes=stop_nodes
)
# Wait for services to be ready
if not cluster.is_host_network():
wait_service(True, args.wait_timeout, cluster, fe_ids, be_ids)
LOG.info("Rollback completed successfully.")
return "Rollback completed successfully."
ALL_COMMANDS = [
UpCommand("up"),
DownCommand("down"),
StartCommand("start"),
StopCommand("stop"),
RestartCommand("restart"),
SimpleCommand("pause", "Pause the doris containers. "),
SimpleCommand("unpause", "Unpause the doris containers. "),
GenConfCommand("config"),
InfoCommand("info"),
ListCommand("ls"),
AddRWPermCommand("add-rw-perm"),
RollbackCommand("rollback"),
]