blob: 23196965b12cff4e6b6dad484974b778ef72005f [file] [log] [blame]
#!/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
############################################################################
#
# @file test_sys_binlog_restart.py
# @date 2021/11/02 14:37:00
# @brief This file is a test file for doris binlog load.
#
#############################################################################
"""
binlog load be/fe 异常测试
MySQL需开启binlog功能
"""
import os
import sys
import time
import pymysql
sys.path.append("../")
from lib import node_op
from lib import palo_config
from lib import palo_client
from lib import palo_job
from lib import util
from data import binlog as DATA
client = None
config = palo_config.config
LOG = palo_client.LOG
L = palo_client.L
canal_ip = config.canal_ip
WAIT_TIME = 20
def setup_module():
"""
setUp
"""
global node_operator
node_operator = node_op.Node()
global client
master = node_operator.get_master()
client = palo_client.get_client(master, config.fe_query_port, user=config.fe_user,
password=config.fe_password, http_port=config.fe_http_port)
global connect
connect = pymysql.connect(host=config.mysql_host, port=config.mysql_port, user=config.canal_user, \
passwd=config.canal_password)
# client.set_frontend_config('enable_create_sync_job', 'true')
global destination
destination = 'example_' + str(config.fe_query_port)
def check_fe_be():
"""检查fe和be,是否有false,如果有则拉起来"""
ret = client.get_backend_list()
be_list = util.get_attr_condition_list(ret, palo_job.BackendProcInfo.Alive,
'false', palo_job.BackendProcInfo.Host)
if be_list is not None:
for be_host in be_list:
node_operator.start_be(be_host)
assert node_operator.is_be_alive(be_host)
ret = client.get_fe_list()
fe_list = util.get_attr_condition_list(ret, palo_job.FrontendInfo.Alive,
'false', palo_job.FrontendInfo.Host)
if fe_list is not None:
for fe_host in fe_list:
node_operator.start_fe(fe_host)
assert node_operator.is_fe_alive(fe_host)
def mysql_execute(sql):
"""
连接mysql执行语句
"""
cursor = connect.cursor()
try:
LOG.info(L('mysql check sql', sql=sql))
cursor.execute(sql)
cursor.close()
connect.commit()
except Exception as error:
assert False, "execute error. %s" % str(error)
LOG.error(L("mysql execute error", error=str(error)))
time.sleep(1)
return cursor.fetchall()
def create_mysql_table(mysql_table_name, mysql_database_name, columns, key='PRIMARY KEY (k1)', new_database=True):
"""
创建MySQL数据库和表
"""
if new_database:
mysql_clean(mysql_database_name)
sql = "CREATE DATABASE %s" % mysql_database_name
try:
mysql_execute(sql)
except Exception as error:
LOG.error(L("CREATE database error", host=config.canal_host, database_name=mysql_database_name, \
error=error))
connect.select_db(mysql_database_name)
sql = "DROP TABLE IF EXISTS %s" % mysql_table_name
mysql_execute(sql)
sql = ''
for column in columns:
column_sql = '%s %s' % (column[0], column[1])
if len(column) > 2:
if column[2]:
column_sql = '%s %s' % (column_sql, column[2])
if len(column) > 3:
column_sql = '%s DEFAULT "%s"' % (column_sql, column[3])
sql = '%s %s,' % (sql, column_sql)
sql = "CREATE table %s (%s %s)" % (mysql_table_name, sql, key)
LOG.info(L('mysql check sql', sql=sql))
try:
mysql_execute(sql)
except Exception as error:
LOG.error(L("CREATE TABLE error", host=config.canal_host, table_name=mysql_table_name, error=error))
return False
return True
def mysql_clean(mysql_database_name):
"""
mysql drop database
"""
sql = "DROP DATABASE IF EXISTS %s" % mysql_database_name
mysql_execute(sql)
def test_restart_be():
"""
{
"title": "test_sys_binlog_restart:test_restart_be",
"describe": "创建binlog load任务,be宕机,mysql执行导入语句,验证数据导入到doris",
"tag": "p0,system,fuzz"
}
"""
check_fe_be()
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + '_' + str(config.fe_query_port)
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name,
job_name, canal_ip, destination=destination)
assert ret, 'create sync job failed'
assert client.get_sync_job_state(job_name) == 'RUNNING', 'sync job state error'
be_id = client.get_backend_id_list()[0]
be_host = client.get_be_hostname_by_id(be_id)
ret = node_operator.stop_be(be_host)
assert ret, 'stop be failed'
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
assert client.get_sync_job_state(job_name) == 'RUNNING', 'sync job state error'
assert client.verify(DATA.expected_file_3, table_name)
ret = node_operator.start_be(be_host)
assert ret, 'start be failed'
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
check_fe_be()
def test_restart_fe():
"""
{
"title": "test_sys_binlog_restart:test_restart_fe",
"describe": "创建binlog load任务,fe master宕机,mysql执行导入语句,验证数据导入到doris",
"tag": "p0,system,fuzz"
}
"""
check_fe_be()
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + '_' + str(config.fe_query_port)
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name,
job_name, canal_ip, destination=destination)
assert ret, 'create sync job failed'
assert client.get_sync_job_state(job_name) == 'RUNNING', 'sync job state error'
master_port = client.get_master()
master = master_port.split(':')[0]
ret = node_operator.stop_fe(master)
assert ret, 'stop fe failed'
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
ret = node_operator.start_fe(master)
assert ret, 'start fe failed'
time.sleep(WAIT_TIME)
client.connect()
client.use(database_name)
assert client.get_sync_job_state(job_name) == 'RUNNING', 'sync job state error'
assert client.verify(DATA.expected_file_3, table_name)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
check_fe_be()