blob: 53bd9c92785cb5c8797df1a62a5e520260f61269 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import cluster as CLUSTER
import os.path
import pymysql
import time
import utils
import uuid
LOG = utils.get_logger()
class FEState(object):
    """Plain record describing one frontend row of `show frontends`."""

    def __init__(self, id, ip, is_master, alive, last_heartbeat, err_msg, role,
                 query_port, rpc_port, http_port, edit_log_port):
        # Identity of the node within the cluster.
        self.id = id
        self.ip = ip
        # Role inside the FE group (e.g. FOLLOWER/OBSERVER) and mastership.
        self.role = role
        self.is_master = is_master
        # Health as reported by the master FE.
        self.alive = alive
        self.last_heartbeat = last_heartbeat
        self.err_msg = err_msg
        # Listening ports of this FE.
        self.query_port = query_port
        self.rpc_port = rpc_port
        self.http_port = http_port
        self.edit_log_port = edit_log_port
class BEState(object):
    """Plain record describing one backend row of `show backends`."""

    def __init__(self, id, ip, backend_id, decommissioned, alive, tablet_num,
                 last_heartbeat, err_msg, http_port, heartbeat_service_port):
        # Identity: cluster node id, host ip, and the FE-assigned backend id.
        self.id = id
        self.ip = ip
        self.backend_id = backend_id
        # Health and data placement status.
        self.alive = alive
        self.decommissioned = decommissioned
        self.tablet_num = tablet_num
        self.last_heartbeat = last_heartbeat
        self.err_msg = err_msg
        # Listening ports of this BE.
        self.http_port = http_port
        self.heartbeat_service_port = heartbeat_service_port
class DBManager(object):
    """Client for a live Doris cluster, speaking MySQL protocol to an FE.

    Caches the output of `show frontends` / `show backends` as FEState /
    BEState objects keyed by cluster node id, and keeps one pymysql
    connection aimed at the alive master FE (re-targeted automatically
    when mastership moves).
    """

    def __init__(self, all_node_net_infos):
        # Network info of all known nodes; used to map an (ip, port) pair
        # reported by `show frontends/backends` back to a cluster node id.
        self.all_node_net_infos = all_node_net_infos
        self.fe_states = {}
        self.be_states = {}
        # Lazily-opened pymysql connection; see _prepare_conn/_reset_conn.
        self.conn = None
        # Address of the FE the connection targets.
        self.fe_ip = ""
        self.fe_port = -1

    def get_all_fe(self):
        """Return the cached map of node id -> FEState."""
        return self.fe_states

    def get_fe(self, id):
        """Return the cached FEState for this node id, or None."""
        return self.fe_states.get(id, None)

    def get_be(self, id):
        """Return the cached BEState for this node id, or None."""
        return self.be_states.get(id, None)

    def load_states(self):
        """Refresh both the FE and BE state caches from the cluster."""
        self._load_fe_states()
        self._load_be_states()

    def add_fe(self, fe_endpoint, role):
        """Register a new FE ('ip:edit_log_port') with the given role."""
        try:
            sql = f"ALTER SYSTEM ADD {role} '{fe_endpoint}'"
            self._exec_query(sql)
            LOG.info(f"Added {role} FE {fe_endpoint} via SQL successfully.")
        except Exception as e:
            LOG.error(f"Failed to add FE {fe_endpoint} via SQL: {str(e)}")
            raise

    def drop_fe(self, fe_endpoint):
        """Drop an FE from the cluster; 'already gone' counts as success."""
        id = CLUSTER.Node.get_id_from_ip(fe_endpoint[:fe_endpoint.find(":")])
        try:
            # Use the node's known role if cached, otherwise assume FOLLOWER
            # (the common case); DROP requires the role to match.
            role = self.get_fe(id).role if self.get_fe(id) else "FOLLOWER"
            self._exec_query("ALTER SYSTEM DROP {} '{}'".format(
                role, fe_endpoint))
            LOG.info("Drop fe {} with id {} from db succ.".format(
                fe_endpoint, id))
        except Exception as e:
            if str(e).find("frontend does not exist") >= 0:
                LOG.info(
                    "Drop fe {} with id {} from db succ cause it does not exist in db."
                    .format(fe_endpoint, id))
                return
            raise e

    def add_be(self, be_endpoint):
        """Register a new BE ('ip:heartbeat_service_port')."""
        try:
            sql = f"ALTER SYSTEM ADD BACKEND '{be_endpoint}'"
            self._exec_query(sql)
            LOG.info(f"Added BE {be_endpoint} via SQL successfully.")
        except Exception as e:
            LOG.error(f"Failed to add BE {be_endpoint} via SQL: {str(e)}")
            raise

    def drop_be(self, be_endpoint):
        """Force-drop a BE without migrating tablets; 'already gone' is ok."""
        id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")])
        try:
            # DROPP (double P) is Doris's forced-drop statement.
            self._exec_query(
                "ALTER SYSTEM DROPP BACKEND '{}'".format(be_endpoint))
            LOG.info("Drop be {} with id {} from db succ.".format(
                be_endpoint, id))
        except Exception as e:
            if str(e).find("backend does not exists") >= 0:
                LOG.info(
                    "Drop be {} with id {} from db succ cause it does not exist in db."
                    .format(be_endpoint, id))
                return
            raise e

    def decommission_be(self, be_endpoint):
        """Gracefully decommission a BE and block until all its tablets
        have migrated away and the FE has removed it from the cluster.

        Raises if the BE is known but not alive (a dead BE cannot migrate
        its tablets; the caller should force-drop it instead).
        """
        old_tablet_num = 0
        id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")])
        start_ts = time.time()
        if id not in self.be_states:
            self._load_be_states()
        if id in self.be_states:
            be = self.be_states[id]
            old_tablet_num = be.tablet_num
            if not be.alive:
                raise Exception("Decommission be {} with id {} fail " \
                        "cause it's not alive, maybe you should specific --drop-force " \
                        " to dropp it from db".format(be_endpoint, id))
        try:
            self._exec_query(
                "ALTER SYSTEM DECOMMISSION BACKEND '{}'".format(be_endpoint))
            LOG.info("Mark be {} with id {} as decommissioned, start migrate its tablets, " \
                    "wait migrating job finish.".format(be_endpoint, id))
        except Exception as e:
            if str(e).find("Backend does not exist") >= 0:
                LOG.info("Decommission be {} with id {} from db succ " \
                        "cause it does not exist in db.".format(be_endpoint, id))
                return
            raise e

        # Poll until the BE disappears from `show backends`, which means
        # migration finished and the FE dropped it.
        while True:
            self._load_be_states()
            be = self.be_states.get(id, None)
            if not be:
                LOG.info("Decommission be {} succ, total migrate {} tablets, " \
                        "has drop it from db.".format(be_endpoint, old_tablet_num))
                return
            LOG.info(
                    "Decommission be {} status: alive {}, decommissioned {}. " \
                    "It is migrating its tablets, left {}/{} tablets. Time elapse {} s."
                    .format(be_endpoint, be.alive, be.decommissioned, be.tablet_num, old_tablet_num,
                        int(time.time() - start_ts)))

            time.sleep(1)

    def create_default_storage_vault(self, cloud_store_config):
        """Create an S3-backed storage vault named 'default_vault' from the
        given cloud config and set it as the cluster default.

        The vault root path is a fresh uuid so repeated cluster setups on
        the same bucket do not collide.
        """
        try:
            # Create storage vault
            create_vault_sql = f"""
            CREATE STORAGE VAULT IF NOT EXISTS default_vault
            PROPERTIES (
                "type" = "S3",
                "s3.access_key" = "{cloud_store_config['DORIS_CLOUD_AK']}",
                "s3.secret_key" = "{cloud_store_config['DORIS_CLOUD_SK']}",
                "s3.endpoint" = "{cloud_store_config['DORIS_CLOUD_ENDPOINT']}",
                "s3.bucket" = "{cloud_store_config['DORIS_CLOUD_BUCKET']}",
                "s3.region" = "{cloud_store_config['DORIS_CLOUD_REGION']}",
                "s3.root.path" = "{str(uuid.uuid4())}",
                "provider" = "{cloud_store_config['DORIS_CLOUD_PROVIDER']}",
                "use_path_style" = "false"
            );
            """
            # create hk storage vault from beijing cost 14s
            self._reset_conn(read_timeout=20)
            self._exec_query(create_vault_sql)
            LOG.info("Created storage vault 'default_vault'")

            # Set as default storage vault
            set_default_vault_sql = "SET default_vault as DEFAULT STORAGE VAULT;"
            self._exec_query(set_default_vault_sql)
            LOG.info("Set 'default_vault' as the default storage vault")
        except Exception as e:
            LOG.error(f"Failed to create default storage vault: {str(e)}")
            raise

    def _load_fe_states(self):
        """Rebuild self.fe_states from `show frontends`, and re-point the
        connection at the alive master FE if it has moved."""
        fe_states = {}
        alive_master_fe_ip = None
        alive_master_fe_port = None
        for record in self._exec_query("show frontends"):
            name = record["Name"]
            ip = record["Host"]
            role = record["Role"]
            is_master = utils.is_true(record["IsMaster"])
            alive = utils.is_true(record["Alive"])
            query_port = int(record["QueryPort"])
            http_port = int(record["HttpPort"])
            rpc_port = int(record["RpcPort"])
            edit_log_port = int(record["EditLogPort"])
            # Map (ip, query_port) back to the cluster node id; fall back
            # to deriving the id from the ip alone.
            id = None
            for net_info in self.all_node_net_infos:
                if net_info.ip == ip and net_info.ports.get("query_port",
                                                            -1) == query_port:
                    id = net_info.id
                    break
            if not id:
                id = CLUSTER.Node.get_id_from_ip(ip)
            last_heartbeat = utils.escape_null(record["LastHeartbeat"])
            err_msg = record["ErrMsg"]
            fe = FEState(id, ip, is_master, alive, last_heartbeat, err_msg,
                         role, query_port, rpc_port, http_port, edit_log_port)
            fe_states[id] = fe
            if is_master and alive:
                alive_master_fe_ip = ip
                alive_master_fe_port = query_port
            LOG.debug(
                "record of show frontends, name {}, ip {}, port {}, alive {}, is_master {}, role {}"
                .format(name, ip, query_port, alive, is_master, role))

        self.fe_states = fe_states
        # Follow the master: reconnect if mastership moved to another FE.
        if alive_master_fe_ip and (alive_master_fe_ip != self.fe_ip
                                   or alive_master_fe_port != self.fe_port):
            self.fe_ip = alive_master_fe_ip
            self.fe_port = alive_master_fe_port
            self._reset_conn()

    def _load_be_states(self):
        """Rebuild self.be_states from `show backends`."""
        be_states = {}
        for record in self._exec_query("show backends"):
            backend_id = int(record["BackendId"])
            alive = utils.is_true(record["Alive"])
            decommissioned = utils.is_true(record["SystemDecommissioned"])
            tablet_num = int(record["TabletNum"])
            ip = record["Host"]
            heartbeat_service_port = int(record["HeartbeatPort"])
            http_port = int(record["HttpPort"])
            # Map (ip, heartbeat port) back to the cluster node id; fall
            # back to deriving the id from the ip alone.
            id = None
            for net_info in self.all_node_net_infos:
                if net_info.ip == ip and net_info.ports.get(
                        "heartbeat_service_port",
                        -1) == heartbeat_service_port:
                    id = net_info.id
                    break
            if not id:
                id = CLUSTER.Node.get_id_from_ip(ip)
            last_heartbeat = utils.escape_null(record["LastHeartbeat"])
            err_msg = record["ErrMsg"]
            be = BEState(id, ip, backend_id, decommissioned, alive, tablet_num,
                         last_heartbeat, err_msg, http_port,
                         heartbeat_service_port)
            be_states[id] = be
        self.be_states = be_states

    def _exec_query(self, sql, retries=3):
        """Execute sql and return the rows as a list of column-name ->
        value dicts (empty list for statements with no result set).

        Timed-out queries are retried up to `retries` times with a fresh
        connection; any other error is raised immediately.
        """
        self._prepare_conn()
        for attempt in range(retries):
            try:
                with self.conn.cursor() as cursor:
                    cursor.execute(sql)
                    fields = [field_md[0] for field_md in cursor.description
                              ] if cursor.description else []
                    return [
                        dict(zip(fields, row)) for row in cursor.fetchall()
                    ]
            except Exception as e:
                LOG.warning(
                    f"Error occurred: fe {self.fe_ip}:{self.fe_port}, sql `{sql}`, err {e}"
                )
                if "timed out" in str(e).lower() and attempt < retries - 1:
                    LOG.warning(
                        f"Query timed out, fe {self.fe_ip}:{self.fe_port}. Retrying {attempt + 1}/{retries}..."
                    )
                    self._reset_conn()
                else:
                    raise e
        raise Exception("Max retries exceeded")

    def _prepare_conn(self):
        """Open the connection on first use."""
        if self.conn:
            return
        self._reset_conn()

    def _reset_conn(self, read_timeout=10, connect_timeout=3):
        """(Re)connect to the current (fe_ip, fe_port) as root.

        Closes any previous connection first so stale sockets are not
        leaked when the master FE changes, a query times out, or the
        timeouts are being adjusted.
        """
        if self.conn:
            try:
                self.conn.close()
            except Exception:
                # Best effort: the old connection may already be broken.
                pass
        self.conn = pymysql.connect(user="root",
                                    host=self.fe_ip,
                                    read_timeout=read_timeout,
                                    connect_timeout=connect_timeout,
                                    port=self.fe_port)
def get_db_mgr(cluster_name, all_node_net_infos, required_load_succ=True):
    """Build a DBManager for the named cluster and point it at an alive FE.

    Reads the recorded master FE query address, prefers the matching
    running FE container, falls back to any running FE container, then
    to the recorded address itself if its socket is reachable. If no FE
    is reachable the manager is returned un-connected with empty state.

    A failure in load_states() is raised only when required_load_succ is
    True; otherwise it is logged and the partially-initialized manager
    is returned.
    """
    assert cluster_name
    db_mgr = DBManager(all_node_net_infos)
    master_fe_query_addr_file = CLUSTER.get_master_fe_addr_path(cluster_name)
    master_fe_query_addr = None
    if os.path.exists(master_fe_query_addr_file):
        with open(master_fe_query_addr_file, "r") as f:
            master_fe_query_addr = f.read().strip()
    if not master_fe_query_addr:
        return db_mgr

    pos = master_fe_query_addr.find(':')
    if pos < 0:
        # Malformed address file: behave as if no address was recorded
        # instead of crashing on int() below.
        LOG.warning(
            "invalid master fe addr '{}'".format(master_fe_query_addr))
        return db_mgr
    master_fe_ip = master_fe_query_addr[:pos]
    master_fe_port = int(master_fe_query_addr[pos + 1:])

    chose_fe_ip = ""
    chose_fe_port = -1
    alive_fe = None
    cluster = CLUSTER.Cluster.load(cluster_name)
    containers = utils.get_doris_containers(cluster_name).get(cluster_name, [])
    for container in containers:
        if utils.is_container_running(container):
            _, node_type, id = utils.parse_service_name(container.name)
            if node_type == CLUSTER.Node.TYPE_FE:
                node = cluster.get_node(node_type, id)
                # Remember the first running FE as a fallback, but prefer
                # the recorded master FE when it is among the running ones.
                if not alive_fe:
                    alive_fe = node
                if node.get_ip() == master_fe_ip and node.meta["ports"][
                        "query_port"] == master_fe_port:
                    alive_fe = node
                    break
    if alive_fe:
        chose_fe_ip = alive_fe.get_ip()
        chose_fe_port = alive_fe.meta["ports"]["query_port"]
    elif utils.is_socket_avail(master_fe_ip, master_fe_port):
        # No FE container found, but the recorded address still answers.
        chose_fe_ip = master_fe_ip
        chose_fe_port = master_fe_port
    else:
        LOG.debug("no available alive fe")
        return db_mgr

    db_mgr.fe_ip = chose_fe_ip
    db_mgr.fe_port = chose_fe_port
    try:
        db_mgr.load_states()
    except Exception as e:
        if required_load_succ:
            raise e
        # Best effort mode: record the failure instead of swallowing it
        # silently so connection problems remain diagnosable.
        LOG.debug("load states failed: {}".format(e))
    return db_mgr