| #!/bin/env python |
| # -*- coding: utf-8 -*- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| """ |
| This module start Palo. |
| Date: 2015/10/07 17:23:06 |
| """ |
| import os |
| import time |
| import threading |
| import socket |
| |
| import env_config |
| import execute |
| import load_cluster |
| |
| |
| def start_one_fe(host_name): |
| """start one fe |
| """ |
| cmd = 'export JAVA_HOME=%s;cd %s/fe;sh bin/start_fe.sh --daemon' % \ |
| (env_config.JAVA_HOME, env_config.fe_path) |
| status, output = execute.exe_cmd(cmd, host_name) |
| time.sleep(10) |
| |
| |
| def start_one_fe_with_helper(host_name, master_host_port=None): |
| """start one fe with helper |
| """ |
| edit_log_port = env_config.fe_query_port - 20 |
| if master_host_port is None: |
| master_host_port = '%s:%d' % (env_config.master, edit_log_port) |
| |
| cmd = 'export JAVA_HOME=%s;cd %s/fe;sh bin/start_fe.sh --helper %s --daemon' % \ |
| (env_config.JAVA_HOME, env_config.fe_path, master_host_port) |
| status, output = execute.exe_cmd(cmd, host_name) |
| time.sleep(10) |
| |
| |
| def start_one_be(host_name): |
| """start one be |
| """ |
| cmd = 'export PATH=$PATH:/sbin; export JAVA_HOME=%s; cd %s/be; ' \ |
| 'sh bin/start_be.sh --daemon' % (env_config.JAVA_HOME, env_config.be_path) |
| status, output = execute.exe_cmd(cmd, host_name) |
| time.sleep(10) |
| |
| |
| def start_master(): |
| """start master |
| """ |
| start_one_fe(env_config.master) |
| |
| |
| def start_other_fe(): |
| """start fe |
| """ |
| start_fe_threads = [] |
| for host_name in env_config.follower_list + \ |
| env_config.observer_list + env_config.dynamic_add_fe_list: |
| t = threading.Thread(target=start_one_fe_with_helper, args=(host_name,)) |
| t.start() |
| start_fe_threads.append(t) |
| |
| for t in start_fe_threads: |
| t.join() |
| |
| |
| def start_be(): |
| """start be |
| """ |
| start_be_threads = [] |
| for host_name in env_config.be_list + env_config.dynamic_add_be_list: |
| t = threading.Thread(target=start_one_be, args=(host_name,)) |
| t.start() |
| start_be_threads.append(t) |
| |
| for t in start_be_threads: |
| t.join() |
| |
| |
| def add_be(): |
| """add be |
| """ |
| for host in env_config.be_list: |
| sql = 'ALTER SYSTEM ADD BACKEND "%s:%d"' % (host, env_config.heartbeat_service_port) |
| cmd = "mysql -h %s -P%s -u root -e '%s'" % (env_config.master, env_config.fe_query_port, sql) |
| os.system(cmd) |
| |
| |
| def add_follower(): |
| """add follower |
| """ |
| for follower in env_config.follower_list: |
| sql = 'ALTER SYSTEM ADD FOLLOWER "%s:%d"' % (socket.gethostbyname(follower), env_config.edit_log_port) |
| cmd = "mysql -h %s -P%s -u root -e '%s'" % (env_config.master, env_config.fe_query_port, sql) |
| os.system(cmd) |
| |
| |
| def add_observer(): |
| """add observer |
| """ |
| edit_log_port = env_config.fe_query_port - 20 |
| for observer in env_config.observer_list: |
| sql = 'ALTER SYSTEM ADD OBSERVER "%s:%d"' % (socket.gethostbyname(observer), env_config.edit_log_port) |
| cmd = "mysql -h %s -P%s -u root -e '%s'" % (env_config.master, env_config.fe_query_port, sql) |
| os.system(cmd) |
| |
| |
| def add_load_cluster(): |
| """add load cluster |
| """ |
| sql_1, sql_2 = load_cluster.gen_add_load_cluster_sql('TestUser') |
| cmd_1 = 'mysql -h %s -P%s -u TestUser@test_cluster -e "%s"' % (env_config.master, env_config.fe_query_port, sql_1) |
| os.system(cmd_1) |
| cmd_2 = 'mysql -h %s -P%s -u TestUser@test_cluster -e "%s"' % (env_config.master, env_config.fe_query_port, sql_2) |
| os.system(cmd_2) |
| |
| |
| def create_test_cluster(): |
| """create test cluster |
| """ |
| sql_1 = 'CREATE CLUSTER test_cluster PROPERTIES("instance_num"="4") IDENTIFIED BY ""' |
| sql_2 = 'enter test_cluster;CREATE USER "TestUser" SUPERUSER' |
| cmd_1 = "mysql -h %s -P%s -u root -e '%s'" % (env_config.master, env_config.fe_query_port, sql_1) |
| os.system(cmd_1) |
| cmd_2 = "mysql -h %s -P%s -u root -e '%s'" % (env_config.master, env_config.fe_query_port, sql_2) |
| os.system(cmd_2) |
| |
| |
| def add_default_load_cluster(): |
| """default load cluster for user root""" |
| sql_1, sql_2 = load_cluster.gen_add_load_cluster_sql('root') |
| cmd_1 = 'mysql -h %s -P%s -u root@default_cluster -p%s -e "%s"' % (env_config.master, |
| env_config.fe_query_port, |
| env_config.fe_password, sql_1) |
| os.system(cmd_1) |
| cmd_2 = 'mysql -h %s -P%s -u root@default_cluster -p%s -e "%s"' % (env_config.master, |
| env_config.fe_query_port, |
| env_config.fe_password, sql_2) |
| os.system(cmd_2) |
| |
| |
| def add_password(): |
| """add root user password""" |
| sql = "set password for 'root'@'%%' = PASSWORD('%s')" % env_config.fe_password |
| cmd = 'mysql -h %s -P%s -uroot -e "%s"' % (env_config.master, env_config.fe_query_port, sql) |
| os.system(cmd) |
| |
| |
| def add_brokers(): |
| """add broker""" |
| for broker_name, broker_node in env_config.broker_list.items(): |
| sql = "ALTER SYSTEM ADD BROKER %s '%s'" % (broker_name, broker_node) |
| cmd = 'mysql -h %s -P%s -uroot -e "%s"' % (env_config.master, env_config.fe_query_port, sql) |
| os.system(cmd) |
| |
| |
| def add_auditload_plugin(): |
| """add audit load plugin""" |
| sql = """ |
| create table doris_audit_db__.doris_audit_tbl__ \ |
| ( \ |
| query_id varchar(48) comment 'Unique query id', \ |
| \`time\` datetime not null comment 'Query start time', \ |
| client_ip varchar(32) comment 'Client IP', \ |
| user varchar(64) comment 'User name', \ |
| db varchar(96) comment 'Database of this query', \ |
| state varchar(8) comment 'Query result state. EOF, ERR, OK', \ |
| query_time bigint comment 'Query execution time in millisecond', \ |
| scan_bytes bigint comment 'Total scan bytes of this query', \ |
| scan_rows bigint comment 'Total scan rows of this query', \ |
| return_rows bigint comment 'Returned rows of this query', \ |
| stmt_id int comment 'An incremental id of statement', \ |
| is_query tinyint comment 'Is this statemt a query. 1 or 0', \ |
| frontend_ip varchar(32) comment 'Frontend ip of executing this statement', \ |
| cpu_time_ms bigint comment 'Total scan cpu time in millisecond of this query', \ |
| sql_hash varchar(48) comment 'Hash value for this query', \ |
| sql_digest varchar(48) comment 'Sql digest of this query, will be empty if not a slow query', \ |
| peak_memory_bytes bigint comment 'Peak memory bytes used on all backends of this query', \ |
| stmt string comment 'The original statement, trimed if longer than 2G ' \ |
| ) engine=OLAP \ |
| duplicate key(query_id, \`time\`, client_ip) \ |
| partition by range(\`time\`) () \ |
| distributed by hash(query_id) buckets 1 \ |
| properties( \ |
| 'dynamic_partition.time_unit' = 'HOUR', \ |
| 'dynamic_partition.start' = '-48', \ |
| 'dynamic_partition.end' = '3', \ |
| 'dynamic_partition.prefix' = 'p', \ |
| 'dynamic_partition.buckets' = '1', \ |
| 'dynamic_partition.enable' = 'true', \ |
| 'replication_num' = '3' \ |
| ); |
| """ |
| cmd = 'mysql -h %s -P%s -uroot -p%s -e "%s"' % (env_config.master, env_config.fe_query_port, |
| env_config.fe_password, "create database doris_audit_db__") |
| os.system(cmd) |
| cmd = 'mysql -h %s -P%s -uroot -p%s -e "%s"' % (env_config.master, env_config.fe_query_port, |
| env_config.fe_password, sql) |
| os.system(cmd) |
| sql = "INSTALL PLUGIN FROM '%s/fe/plugin_auditloader'" % env_config.fe_path |
| cmd = 'mysql -h %s -P%s -uroot -p%s -e "%s"' % (env_config.master, env_config.fe_query_port, |
| env_config.fe_password, sql) |
| os.system(cmd) |
| |
| |
| def start_palo(init_state=False, deploy_audit=False): |
| """start palo |
| """ |
| start_master() |
| time.sleep(30) |
| if init_state: |
| add_follower() |
| add_observer() |
| add_be() |
| add_brokers() |
| add_password() |
| start_other_fe() |
| start_be() |
| time.sleep(5) |
| if deploy_audit: |
| add_auditload_plugin() |
| |
| |
| if __name__ == '__main__': |
| start_palo() |