blob: a0207f9707bd5b9fca812b37e66f175921c85288 [file] [log] [blame]
#!/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
############################################################################
#
# @file test_sys_binlog.py
# @date 2021/11/02 14:37:00
# @brief This file is a test file for doris binlog load.
#
#############################################################################
"""
测试binlog load
MySQL需开启binlog功能
"""
import os
import sys
import time
import pymysql
sys.path.append("../")
from lib import palo_config
from lib import palo_client
from lib import palo_job
from lib import util
from data import binlog as DATA
client = None
config = palo_config.config
LOG = palo_client.LOG
L = palo_client.L
port = str(config.fe_query_port)
canal_ip = config.canal_ip
WAIT_TIME = 15
def setup_module():
"""
setUp
"""
global client
global connect
client = palo_client.get_client(config.fe_host, config.fe_query_port, user=config.fe_user,
password=config.fe_password, http_port=config.fe_http_port)
connect = pymysql.connect(host=config.mysql_host, port=config.mysql_port, user=config.canal_user, \
passwd=config.canal_password)
# client.set_frontend_config('enable_create_sync_job', 'true')
global destination
destination = 'example_' + str(config.fe_query_port)
def mysql_execute(sql):
"""
连接mysql执行导入语句
"""
cursor = connect.cursor()
try:
LOG.info(L('mysql check sql', sql=sql))
cursor.execute(sql)
cursor.close()
connect.commit()
except Exception as error:
assert False, "execute error. %s" % str(error)
LOG.error(L("mysql execute error", error=str(error)))
time.sleep(1)
return cursor.fetchall()
def create_mysql_table(mysql_table_name, mysql_database_name, columns, key='PRIMARY KEY (k1)', new_database=True):
"""
创建MySQL数据库和表
"""
cursor = connect.cursor()
if new_database:
mysql_clean(mysql_database_name)
sql = "CREATE DATABASE %s" % mysql_database_name
try:
cursor.execute(sql)
LOG.info(L('CREATE MYSQL db succ', database=mysql_database_name))
except Exception as error:
LOG.error(L("CREATE MYSQL db error", host=config.mysql_host, database_name=mysql_database_name, \
error=error))
connect.select_db(mysql_database_name)
sql = "DROP TABLE IF EXISTS %s" % mysql_table_name
cursor.execute(sql)
sql = ''
for column in columns:
column_sql = '%s %s' % (column[0], column[1])
if len(column) > 2:
if column[2]:
column_sql = '%s %s' % (column_sql, column[2])
if len(column) > 3:
column_sql = '%s DEFAULT "%s"' % (column_sql, column[3])
sql = '%s %s,' % (sql, column_sql)
sql = "CREATE table %s (%s %s)" % (mysql_table_name, sql, key)
LOG.info(L('mysql table sql', sql=sql))
try:
cursor.execute(sql)
except Exception as error:
LOG.error(L("CREATE TABLE error", host=config.mysql_host, table_name=mysql_table_name, error=error))
return False
return True
def mysql_clean(mysql_database_name):
"""
mysql drop database
"""
sql = "DROP DATABASE IF EXISTS %s" % mysql_database_name
cursor = connect.cursor()
cursor.execute(sql)
LOG.info(L("DROP MYSQL db", db=mysql_database_name))
def check(table_name, mysql_table_name):
"""验证doris与mysql数据同步"""
sql_1 = "select * from %s order by k1" % table_name
sql_2 = "select * from %s order by k1" % mysql_table_name
cursor = connect.cursor()
cursor.execute(sql_2)
ret_2 = cursor.fetchall()
time_limit = 0
while time_limit < 60:
ret_1 = client.execute(sql_1)
if ret_1 == ret_2:
return True
break
else:
time_limit += 1
time.sleep(2)
return ret_1 == ret_2
def assertStop(ret, client, stop_job=None, info=''):
"""assert, if not stop binlog load"""
if not ret and stop_job is not None:
try:
show_ret = client.show_sync_job()
LOG.info(L('binlog load job info', ret=show_ret))
client.stop_sync_job(stop_job)
except Exception as e:
print(str(e))
assert ret, info
def check_data(data, table_name, job_name, fail_info, client=client):
"""验证binlog导入数据"""
time_limit = 0
try:
while time_limit < 90:
ret = client.verify(data, table_name)
if ret:
break
else:
time.sleep(2)
time_limit += 1
except Exception as e:
LOG.info(L('get an error when check verify', msg=str(e)))
client.stop_sync_job(job_name)
assertStop(ret, client, job_name, fail_info)
def test_binlog():
"""
{
"title": "test_sys_binlog:test_binlog",
"describe": "创建binlog load任务,mysql执行insert, delete, update操作,验证doris中导入数据成功",
"tag": "p0,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
time.sleep(WAIT_TIME)
#insert
sql = open(DATA.binlog_sql_1, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_1, table_name, job_name, 'insert data failed', client)
#delete
sql = open(DATA.binlog_sql_2, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_2, table_name, job_name, 'delete data failed', client)
#update
sql = open(DATA.binlog_sql_3, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_3, table_name, job_name, 'update data failed', client)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_binlog_function():
"""
{
"title": "test_sys_binlog:test_binlog_function",
"describe": "创建binlog load任务,mysql执行导入语句中包含函数,验证doris中导入数据成功",
"tag": "p0,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_5, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_binlog_select():
"""
{
"title": "test_sys_binlog:test_binlog_select",
"describe": "binlog load 过程中执行查询操作,验证查询结果正确",
"tag": "p0,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
time.sleep(WAIT_TIME)
sql = "select * from %s order by k1" % table_name
ret = (client.execute(sql) == ())
assertStop(ret, client, job_name, 'query data error')
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name)
mysql_execute(sql)
ret = check(table_name, mysql_table_name)
assertStop(ret, client, job_name, 'select data error')
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_binlog_special():
"""
{
"title": "test_sys_binlog:test_binlog_special",
"describe": "创建binlog load任务,mysql导入特殊值:NULL,随机数(支持decimal精度),时间,验证doris导入数据成功",
"tag": "p0,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_2, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_2)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
time.sleep(WAIT_TIME)
sql = 'insert into %s values (NULL, NULL, NULL, NULL, NULL)' % mysql_table_name
util.assert_return(False, "Column 'k1' cannot be null", mysql_execute, sql)
sql = 'insert into %s values (1, NULL, NULL, NULL, NULL)' % mysql_table_name
mysql_execute(sql)
ret = check(table_name, mysql_table_name)
assertStop(ret, client, job_name, 'data load fail')
sql = 'insert into %s values (2, FLOOR(RAND() * 10), "2021-08-18", "2021-08-18 00:05:10", 3.22), \
(3, FLOOR(RAND() * 100), "2021-08-18", "2021-08-18 00:05:10", 3.22), \
(4, FLOOR(RAND() * 1000), "2021-08-18", "2021-08-18 00:05:10", 3.22)' % mysql_table_name
mysql_execute(sql)
ret = check(table_name, mysql_table_name)
assertStop(ret, client, job_name, 'data load fail')
sql = 'insert into %s values (5, 20, "2021-08-18", "2021-08-18 00:05:10", RAND()), \
(6, 20, "2021-08-18", "2021-08-18 00:05:10", RAND() * 10), \
(7, 20, "2021-08-18", "2021-08-18 00:05:10", RAND() * 100)' % mysql_table_name
mysql_execute(sql)
ret = check(table_name, mysql_table_name)
assertStop(ret, client, job_name, 'data load fail')
sql = 'insert into %s values (8, 20, current_date, "2021-08-18 00:05:10", 3.22)' % mysql_table_name
mysql_execute(sql)
ret = check(table_name, mysql_table_name)
assertStop(ret, client, job_name, 'data load fail')
sql = 'insert into %s values (9, 20, "2021-08-18", now(), 3.22)' % mysql_table_name
mysql_execute(sql)
ret = check(table_name, mysql_table_name)
assertStop(ret, client, job_name, 'data load fail')
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_partition():
"""
{
"title": "test_sys_binlog:test_partition",
"describe": "开启binlog load,doris增加、删除分区,mysql执行导入语句,验证数据可导入到已有分区",
"tag": "p0,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
partition_value_list = ['20', '40', '60', '80']
partition_info = palo_client.PartitionInfo('k1', partition_name_list, partition_value_list)
client.create_table(table_name, DATA.column_1, partition_info, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
#导入数据在doris中无分区,无法导入
sql = open(DATA.binlog_sql_12, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
#add partition
ret = client.add_partition(table_name, 'partition_e', '100')
assertStop(ret, client, job_name, 'add partition failed')
ret = client.verify(DATA.expected_file_3, table_name)
assertStop(ret, client, job_name, 'data load fail')
#drop partition without data
ret = client.drop_partition(table_name, 'partition_d')
assertStop(ret, client, job_name, 'Drop partition failed')
check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
#drop partition with data
ret = client.drop_partition(table_name, 'partition_a')
assertStop(ret, client, job_name, 'drop partition failed')
ret = not client.verify(DATA.expected_file_3, table_name)
assertStop(ret, client, job_name, 'data load fail')
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_mysql_transaction():
"""
{
"title": "test_sys_binlog:test_mysql_transaction",
"describe": "开启binlog load,mysql提交事务,验证doris中数据导入成功",
"tag": "p1,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql' + table_name
transaction_table = mysql_table_name + '_t'
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
create_mysql_table(transaction_table, mysql_database_name, DATA.column_1, new_database=False)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
time.sleep(WAIT_TIME)
sql_1 = open(DATA.binlog_sql_1, 'r').read().format(mysql_table_name)
sql_2 = open(DATA.binlog_sql_1, 'r').read().format(transaction_table)
sql_3 = open(DATA.binlog_sql_2, 'r').read().format(mysql_table_name)
sql_4 = open(DATA.binlog_sql_3, 'r').read().format(mysql_table_name)
sql_5 = open(DATA.binlog_sql_2, 'r').read().format(transaction_table)
sql = "begin; %s; %s; %s; %s; %s; commit;" % (sql_1, sql_2, sql_3, sql_4, sql_5)
try:
mysql_execute(sql)
ret = True
except:
ret = False
assertStop(ret, client, job_name, 'mysql transaction failed')
check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_diff_schema_1():
"""
{
"title": "test_sys_binlog:test_diff_schema_1",
"describe": "mysql与doris表结构不同,doris表有mysql表不存在的列,任务进行,但数据无法导入到doris",
"tag": "p1,system,fuzz"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_3, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_1, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
sql = 'select * from %s' % table_name
ret = (client.execute(sql) == ())
assertStop(ret, client, job_name, 'query data error')
ret = client.get_sync_job_state(job_name) == 'RUNNING'
assertStop(ret, client, job_name, 'sync job is not running')
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_diff_schema_2():
"""
{
"title": "test_sys_binlog:test_diff_schema_2",
"describe": "mysql与doris表结构不同,mysql表有doris表不存在的列,任务进行,但数据无法导入到doris",
"tag": "p1,system,fuzz"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_4, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_1, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
sql = 'select * from %s' % table_name
ret = (client.execute(sql) == ())
assertStop(ret, client, job_name, 'query data error')
ret = client.get_sync_job_state(job_name) == 'RUNNING'
assertStop(ret, client, job_name, 'sync job is not running')
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_alter_mysql_schema():
"""
{
"title": "test_sys_binlog:test_alter_mysql_schema",
"describe": "创建binlog load任务,修改mysql表结构,mysql执行导入语句,验证数据按doris列导入",
"tag": "p1,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name + str(int(time.time()))
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_6, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
#add column
sql = "alter table %s add column k8 int(10)" % mysql_table_name
mysql_execute(sql)
sql = open(DATA.binlog_sql_9, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
ret = client.verify(DATA.expected_file_5, table_name)
assertStop(ret, client, job_name, 'data load fail')
#drop new column
sql = "alter table %s drop column k8" % mysql_table_name
mysql_execute(sql)
sql = open(DATA.binlog_sql_8, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data([DATA.expected_file_5, DATA.expected_file_7], table_name, job_name, 'data load fail', client)
#drop column
sql = "alter table %s drop column k7" % mysql_table_name
mysql_execute(sql)
sql = open(DATA.binlog_sql_10, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
ret = client.verify([DATA.expected_file_5, DATA.expected_file_7], table_name)
assertStop(ret, client, job_name, 'data load fail')
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_alter_doris_schema():
"""
{
"title": "test_sys_binlog:test_alter_doris_schema",
"describe": "创建binlog load任务,修改doris表结构,mysql执行导入语句,验证数据按doris列导入",
"tag": "p1,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_6, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
#add column
client.pause_sync_job(job_name)
client.schema_change(table_name, add_column_list=["k8 int"], is_wait=True)
client.resume_sync_job(job_name)
sql = open(DATA.binlog_sql_7, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_8, table_name, job_name, 'data load fail', client)
#drop new column
client.schema_change(table_name, drop_column_list=["k8"], is_wait=True)
sql = open(DATA.binlog_sql_8, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data([DATA.expected_file_5, DATA.expected_file_6, DATA.expected_file_7], table_name, job_name, \
'data load fail', client)
#drop column
client.schema_change(table_name, drop_column_list=["k7"], is_wait=True)
sql = open(DATA.binlog_sql_11, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_9, table_name, job_name, 'data load fail', client)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_mysql_primary_key():
"""
{
"title": "test_sys_binlog:test_mysql_promary_key",
"describe": "创建binlog load任务,mysql操作primary索引表导入数据,验证支持binlog导入数据到doris",
"tag": "p1,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_mysql_unique_key():
"""
{
"title": "test_sys_binlog:test_mysql_unique_key",
"describe": "创建binlog load任务,mysql操作unique索引表导入数据,验证支持binlog导入数据到doris",
"tag": "p1,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1, 'UNIQUE (k1)')
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_mysql_index_key():
"""
{
"title": "test_sys_binlog:test_mysql_index_key",
"describe": "创建binlog load任务,mysql操作index索引表导入数据,验证支持binlog导入数据到doris",
"tag": "p1,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1, 'INDEX (k1)')
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_pause_resume():
"""
{
"title": "test_sys_binlog:test_pause_resume",
"describe": "创建binlog load任务,验证暂停任务后重启任务成功",
"tag": "p0,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_6, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
#pause sync job
ret = client.pause_sync_job(job_name)
assertStop(ret, client, job_name, 'pause sync job failed')
ret = (client.get_sync_job_state(job_name) == 'PAUSED')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_7, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
#resume sync job
ret = client.resume_sync_job(job_name)
assertStop(ret, client, job_name, 'resume sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_8, 'r').read().format(mysql_table_name)
mysql_execute(sql)
expected_file_list = [DATA.expected_file_5, DATA.expected_file_6, DATA.expected_file_7]
check_data(expected_file_list, table_name, job_name, 'data load fail', client)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_create_same_job():
"""
{
"title": "test_sys_binlog:test_create_same_job",
"describe": "验证创建相同的binlog load任务失败",
"tag": "p0,system,fuzz"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
time.sleep(WAIT_TIME)
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
util.assert_return(False, 'already exists', client.create_sync_job, table_name, database_name, mysql_table_name, \
mysql_database_name, job_name, canal_ip, destination=destination)
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_resume_after_create():
"""
{
"title": "test_sys_binlog:test_resume_after_create",
"describe": "创建binlog load任务后,重启任务失败",
"tag": "p1,system,fuzz"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
time.sleep(WAIT_TIME)
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
util.assert_return(False, 'There is no paused job', client.resume_sync_job, job_name)
ret = client.get_sync_job_state(job_name) == 'RUNNING'
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_pause_same_job():
"""
{
"title": "test_sys_binlog:test_pause_same_job",
"describe": "创建binlog load任务,验证暂停同一个任务失败",
"tag": "p1,system,fuzz"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
ret = client.pause_sync_job(job_name)
assertStop(ret, client, job_name, 'pause sync job failed')
ret = (client.get_sync_job_state(job_name) == 'PAUSED')
assertStop(ret, client, job_name, 'sync job state error')
util.assert_return(False, "There is no running job", client.pause_sync_job, job_name)
ret = (client.get_sync_job_state(job_name) == 'PAUSED')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_6, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
sql = "select * from %s" % table_name
ret = (client.execute(sql) == ())
assertStop(ret, client, job_name, 'query data error')
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_create_after_pause():
"""
{
"title": "test_sys_binlog:test_create_after_pause",
"describe": "创建binlog load任务,暂停任务后再次创建该任务失败",
"tag": "p1,system,fuzz"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
ret = client.pause_sync_job(job_name)
assertStop(ret, client, job_name, 'pause sync job failed')
ret = (client.get_sync_job_state(job_name) == 'PAUSED')
assertStop(ret, client, job_name, 'sync job state error')
util.assert_return(False, "already exists", client.create_sync_job, table_name, database_name, mysql_table_name, \
mysql_database_name, job_name, canal_ip)
ret = (client.get_sync_job_state(job_name) == 'PAUSED')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_6, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
sql = "select * from %s" % table_name
ret = (client.execute(sql) == ())
assertStop(ret, client, job_name, 'query data error')
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_resume_same_job():
"""
{
"title": "test_sys_binlog:test_resume_same_job",
"describe": "创建binlog load任务,暂停任务,重启任务,验证再次重启失败",
"tag": "p1,system,fuzz"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
ret = client.pause_sync_job(job_name)
assertStop(ret, client, job_name, 'pause sync job failed')
ret = (client.get_sync_job_state(job_name) == 'PAUSED')
assertStop(ret, client, job_name, 'sync job state error')
ret = client.resume_sync_job(job_name)
assertStop(ret, client, job_name, 'resume sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
util.assert_return(False, "There is no paused job", client.resume_sync_job, job_name)
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_create_after_resume():
"""
{
"title": "test_sys_binlog:test_create_after_resume",
"describe": "创建binlog load任务,暂停任务,重启任务,验证再次创建该任务失败",
"tag": "p1,system,fuzz"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
ret = client.pause_sync_job(job_name)
assertStop(ret, client, job_name, 'pause sync job failed')
ret = (client.get_sync_job_state(job_name) == 'PAUSED')
assertStop(ret, client, job_name, 'sync job state error')
ret = client.resume_sync_job(job_name)
assertStop(ret, client, job_name, 'resume sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
util.assert_return(False, "already exists", client.create_sync_job, table_name, database_name, mysql_table_name, \
mysql_database_name, job_name, canal_ip, destination=destination)
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_pause_and_resume():
"""
{
"title": "test_sys_binlog:test_pause_and_resume",
"describe": "创建binlog load任务,验证多次暂停、重启任务成功",
"tag": "p1,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
count = 10
while count > 0:
ret = client.pause_sync_job(job_name)
assertStop(ret, client, job_name, 'pause sync job failed')
ret = (client.get_sync_job_state(job_name) == 'PAUSED')
assertStop(ret, client, job_name, 'sync job state error')
ret = client.resume_sync_job(job_name)
assertStop(ret, client, job_name, 'resume sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
time.sleep(1)
count -= 1
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_sync_job_info():
"""
{
"title": "test_sys_binlog:test_sync_job_info",
"describe": "创建binlog load任务,验证show sync job info",
"tag": "p1,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
canal_port = '11111'
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
sync_job_list = client.show_sync_job()
sync_channel = '%s.%s->%s' % (mysql_database_name, mysql_table_name, table_name)
sync_job_config = 'address:%s:%s,destination:%s,batchSize:8192' % (canal_ip, canal_port, destination)
for sync_job in sync_job_list:
sync_job_info = palo_job.SyncJobInfo(sync_job)
if sync_job_info.get_job_name() == job_name:
ret = (sync_job_info.get_state() == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
ret = (sync_job_info.get_channel() == sync_channel)
assertStop(ret, client, job_name, 'sync channel error')
ret = (sync_job_info.get_job_config() == sync_job_config)
assertStop(ret, client, job_name, 'sync job config error')
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_tables_in_one_job():
"""
{
"title": "test_sys_binlog:test_tables_in_one_job",
"describe": "创建单个binlog load任务,任务中包括多对表的对应关系,mysql执行导入语句,验证数据导入doris成功",
"tag": "p1,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
mysql_database_name = 'm_' + database_name + port
table_names = []
mysql_table_names = []
mysql_database_names = [mysql_database_name for i in range(3)]
for i in ['_1', '_2', '_3']:
table_name_s = table_name + i
client.create_table(table_name_s, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
table_names.append(table_name_s)
mysql_table_name = 'mysql_' + table_name_s
if i == '_1':
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
else:
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1, new_database=False)
mysql_table_names.append(mysql_table_name)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_names, database_name, mysql_table_names, mysql_database_names, job_name, \
canal_ip, destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_names[0])
mysql_execute(sql)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_names[1])
mysql_execute(sql)
sql = open(DATA.binlog_sql_1, 'r').read().format(mysql_table_names[2])
mysql_execute(sql)
time.sleep(WAIT_TIME)
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
check_data(DATA.expected_file_3, table_names[0], job_name, 'data load fail', client)
check_data(DATA.expected_file_3, table_names[1], job_name, 'data load fail', client)
check_data(DATA.expected_file_1, table_names[2], job_name, 'data load fail', client)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_tables_in_jobs():
"""
{
"title": "test_sys_binlog:test_tables_in_jobs",
"describe": "创建多个binlog load任务,任务中包含多对表的对应关系,mysql执行导入语句,验证数据导入doris成功",
"tag": "p1,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
mysql_database_name = 'm_' + database_name + port
destination_list = [destination + '_1', destination + '_2', destination + '_3']
count = 0
job_name_list = []
mysql_table_name_list = []
table_name_list = []
for i in ['_1', '_2', '_3']:
table_name_s = table_name + i
table_name_list.append(table_name_s)
client.create_table(table_name_s, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_table_name = 'mysql_' + table_name_s
if i == '_1':
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
else:
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1, new_database=False)
mysql_table_name_list.append(mysql_table_name)
job_name = 'job_' + table_name_s
job_name_list.append(job_name)
ret = client.create_sync_job(table_name_s, database_name, mysql_table_name, mysql_database_name, job_name, \
canal_ip, destination=destination_list[count])
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
count += 1
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name_list[0])
mysql_execute(sql)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name_list[1])
mysql_execute(sql)
sql = open(DATA.binlog_sql_1, 'r').read().format(mysql_table_name_list[2])
mysql_execute(sql)
for i in range(3):
ret = (client.get_sync_job_state(job_name_list[i]) == 'RUNNING')
assertStop(ret, client, job_name_list[i], 'sync job state error')
check_data(DATA.expected_file_3, table_name_list[0], job_name_list[0], 'data load fail', client)
check_data(DATA.expected_file_3, table_name_list[1], job_name_list[1], 'data load fail', client)
check_data(DATA.expected_file_1, table_name_list[2], job_name_list[2], 'data load fail', client)
for i in range(3):
client.stop_sync_job(job_name_list[i])
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_stop_after_create():
"""
{
"title": "test_sys_binlog:test_stop_after_create",
"describe": "创建binlog load任务,验证终止任务成功",
"tag": "p0,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_6, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
#stop sync job
ret = client.stop_sync_job(job_name)
assertStop(ret, client, job_name, 'stop sync job failed')
ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_7, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
ret = client.verify(DATA.expected_file_5, table_name)
assertStop(ret, client, job_name, 'data load error')
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_stop_after_pause():
"""
{
"title": "test_sys_binlog:test_stop_after_pause",
"describe": "创建binlog load任务,验证暂停任务后终止任务成功",
"tag": "p0,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_6, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
#pause sync job
ret = client.pause_sync_job(job_name)
assertStop(ret, client, job_name, 'pause sync job failed')
ret = (client.get_sync_job_state(job_name) == 'PAUSED')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_7, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
ret = client.verify(DATA.expected_file_5, table_name)
assertStop(ret, client, job_name, 'data load error')
#stop sync job
ret = client.stop_sync_job(job_name)
assertStop(ret, client, job_name, 'stop sync job failed')
ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_8, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
ret = client.verify(DATA.expected_file_5, table_name)
assertStop(ret, client, job_name, 'data load error')
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_stop_after_resume():
"""
{
"title": "test_sys_binlog:test_stop_after_resume",
"describe": "创建binlog load任务,暂停任务,重启任务,验证终止任务成功",
"tag": "p0,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_6, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
#pause sync job
ret = client.pause_sync_job(job_name)
assertStop(ret, client, job_name, 'pause sync job failed')
ret = (client.get_sync_job_state(job_name) == 'PAUSED')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_7, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
ret = client.verify(DATA.expected_file_5, table_name)
assertStop(ret, client, job_name, 'data load error')
#resume sync job
ret = client.resume_sync_job(job_name)
assertStop(ret, client, job_name, 'resume sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_8, 'r').read().format(mysql_table_name)
mysql_execute(sql)
expected_file_list = [DATA.expected_file_5, DATA.expected_file_6, DATA.expected_file_7]
check_data(expected_file_list, table_name, job_name, 'data load fail', client)
#stop sync job
ret = client.stop_sync_job(job_name)
assertStop(ret, client, job_name, 'stop sync job failed')
ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_11, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
ret = client.verify(expected_file_list, table_name)
assertStop(ret, client, job_name, 'data load error')
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_stop_after_stop():
"""
{
"title": "test_sys_binlog:test_stop_after_stop",
"describe": "创建binlog load任务,终止任务,验证再次终止任务失败",
"tag": "p1,system,fuzz"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_6, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
#stop sync job
ret = client.stop_sync_job(job_name)
assertStop(ret, client, job_name, 'stop sync job failed')
ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_7, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
ret = client.verify(DATA.expected_file_5, table_name)
assertStop(ret, client, job_name, 'data load error')
#stop sync job again
util.assert_return(False, 'There is no uncompleted job', client.stop_sync_job, job_name)
ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_8, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
ret = client.verify(DATA.expected_file_5, table_name)
assertStop(ret, client, job_name, 'data load error')
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_create_same_job_after_stop():
"""
{
"title": "test_sys_binlog:test_create_same_job_after_stop",
"describe": "创建binlog load任务,终止任务,验证创建不同名但表对应与之前相同的任务成功,数据能够继续导入",
"tag": "p1,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_6, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
#stop sync job
ret = client.stop_sync_job(job_name)
assertStop(ret, client, job_name, 'stop sync job failed')
ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_7, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
ret = client.verify(DATA.expected_file_5, table_name)
assertStop(ret, client, job_name, 'data load error')
#create same sync
job_name_2 = job_name + '_2'
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name_2, \
canal_ip, destination=destination)
assertStop(ret, client, job_name_2, 'create sync job failed')
ret = (client.get_sync_job_state(job_name_2) == 'RUNNING')
assertStop(ret, client, job_name_2, 'sync job state error')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_8, 'r').read().format(mysql_table_name)
mysql_execute(sql)
expected_file_list = [DATA.expected_file_5, DATA.expected_file_6, DATA.expected_file_7]
check_data(expected_file_list, table_name, job_name_2, 'data load fail', client)
client.stop_sync_job(job_name_2)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_create_same_name_job_after_stop():
"""
{
"title": "test_sys_binlog:test_create_same_name_job_after_stop",
"describe": "创建binlog load任务,终止任务,验证创建同名任务成功,执行mysql导入语句,验证doris数据导入正确",
"tag": "p1,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_6, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
#stop sync job
ret = client.stop_sync_job(job_name)
assertStop(ret, client, job_name, 'stop sync job failed')
ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_7, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
ret = client.verify(DATA.expected_file_5, table_name)
assertStop(ret, client, job_name, 'data load error')
#create same sync job
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, \
canal_ip, destination=destination, is_wait=False)
assertStop(ret, client, job_name, 'create sync job failed')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_8, 'r').read().format(mysql_table_name)
mysql_execute(sql)
expected_file_list = [DATA.expected_file_5, DATA.expected_file_6, DATA.expected_file_7]
check_data(expected_file_list, table_name, job_name, 'data load fail', client)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_pause_after_stop():
"""
{
"title": "test_sys_binlog:test_pause_after_stop",
"describe": "创建binlog load任务,终止任务,验证暂停任务失败",
"tag": "p1,system,fuzz"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_6, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
#stop sync job
ret = client.stop_sync_job(job_name)
assertStop(ret, client, job_name, 'stop sync job failed')
ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_7, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
ret = client.verify(DATA.expected_file_5, table_name)
assertStop(ret, client, job_name, 'data load error')
#pause sync job
util.assert_return(False, 'There is no running job', client.pause_sync_job, job_name)
ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_8, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
ret = client.verify(DATA.expected_file_5, table_name)
assertStop(ret, client, job_name, 'data load error')
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_resume_after_stop():
"""
{
"title": "test_sys_binlog:test_resume_after_stop",
"describe": "创建binlog load任务,终止任务,验证重启任务失败",
"tag": "p1,system,fuzz"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_6, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
#stop sync job
ret = client.stop_sync_job(job_name)
assertStop(ret, client, job_name, 'stop sync job failed')
ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_7, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
ret = client.verify(DATA.expected_file_5, table_name)
assertStop(ret, client, job_name, 'data load error')
#resume sync job
util.assert_return(False, 'There is no paused job', client.resume_sync_job, job_name)
ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
assertStop(ret, client, job_name, 'sync job state error')
sql = open(DATA.binlog_sql_8, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
ret = client.verify(DATA.expected_file_5, table_name)
assertStop(ret, client, job_name, 'data load error')
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_column():
"""
{
"title": "test_sys_binlog:test_column",
"describe": "创建binlog load任务,指定列映射,mysql执行导入命令,验证doris数据导入",
"tag": "p0,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_5, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
columns = ['k1', 'k6', 'k3', 'k4', 'k2', 'k7', 'k5']
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
columns=columns, destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_10, table_name, job_name, 'data load fail', client)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_add_nullable_column():
"""
{
"title": "test_sys_binlog:test_add_nullable_column",
"describe": "创建binlog load任务,指定列映射,doris增加nullable列,mysql执行导入命令,验证数据导入到doris",
"tag": "p1,system"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_5, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
columns = ['k1', 'k6', 'k3', 'k4', 'k2', 'k7', 'k5']
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
columns=columns, destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
time.sleep(WAIT_TIME)
client.schema_change(table_name, add_column_list=["k8 int after k1"], is_wait=True)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_11, table_name, job_name, 'data load fail', client)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_same_destination():
"""
{
"title": "test_sys_binlog:test_same_destination",
"describe": "canal中每一个的destination仅对应一个binlog load任务,重复使用destination,验证创建binlog load任务失败",
"tag": "p1,system,fuzz"
}
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, \
destination=destination)
assertStop(ret, client, job_name, 'create sync job failed')
ret = (client.get_sync_job_state(job_name) == 'RUNNING')
assertStop(ret, client, job_name, 'sync job state error')
#使用重复的destination创建binlog load任务失败
table_name_s = table_name + '_s'
mysql_table_name_s = mysql_table_name + '_s'
client.create_table(table_name_s, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
create_mysql_table(mysql_table_name_s, mysql_database_name, DATA.column_1, new_database=False)
job_name_s = job_name + '_s'
util.assert_return(False, 'conflict destination', client.create_sync_job, table_name_s, database_name, \
mysql_table_name_s, mysql_database_name, job_name_s, canal_ip, destination=destination)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
def test_observer_fe():
"""
{
"title": "test_sys_binlog:test_observer_fe",
"describe": "连接observer fe,创建binlog load 任务,mysql执行导入语句,验证doris导入数据",
"tag": "p1,system"
}
"""
client_observer = palo_client.get_client(config.fe_observer_list[0], config.fe_query_port, user=config.fe_user, \
password=config.fe_password, http_port=config.fe_http_port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client_observer.clean(database_name)
client_observer.create_database(database_name)
client_observer.use(database_name)
client_observer.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client_observer.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, \
canal_ip, destination=destination)
assertStop(ret, client_observer, job_name, 'create sync job failed')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client_observer)
client_observer.stop_sync_job(job_name)
client_observer.clean(database_name)
mysql_clean(mysql_database_name)
def test_follower_fe():
"""
{
"title": "test_sys_binlog:test_follower_fe",
"describe": "连接follower fe,创建binlog load 任务,mysql执行导入语句,验证doris导入数据",
"tag": "p1,system"
}
"""
client_follower = palo_client.get_client(config.fe_follower_list[0], config.fe_query_port, user=config.fe_user, \
password=config.fe_password, http_port=config.fe_http_port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client_follower.clean(database_name)
client_follower.create_database(database_name)
client_follower.use(database_name)
client_follower.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + port
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client_follower.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name, \
canal_ip, destination=destination)
assertStop(ret, client_follower, job_name, 'create sync job failed')
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name)
mysql_execute(sql)
check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client_follower)
client_follower.stop_sync_job(job_name)
client_follower.clean(database_name)
mysql_clean(mysql_database_name)
def teardown_module():
"""
tearDown
"""
connect.close()