| #!/bin/env python |
| # -*- coding: utf-8 -*- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| ############################################################################ |
| # |
| # @file test_sys_binlog.py |
| # @date 2021/11/02 14:37:00 |
| # @brief This file is a test file for doris binlog load. |
| # |
| ############################################################################# |
| |
| """ |
| 测试binlog load |
| MySQL需开启binlog功能 |
| """ |
| |
| import os |
| import sys |
| import time |
| import pymysql |
| sys.path.append("../") |
| from lib import palo_config |
| from lib import palo_client |
| from lib import palo_job |
| from lib import util |
| from data import binlog as DATA |
# Handles that setup_module() fills in.  They are bound to None here so the
# names always exist at module level, matching how ``client`` is handled.
client = None
connect = None
destination = None

# Shortcuts to the shared test configuration and logging helpers.
config = palo_config.config
LOG = palo_client.LOG
L = palo_client.L

# FE query port as a string; it is appended to MySQL database names below.
port = str(config.fe_query_port)
canal_ip = config.canal_ip
# Seconds passed to time.sleep() after a sync job is created or data is written.
WAIT_TIME = 15
| |
def setup_module():
    """
    Prepare the shared handles used by every helper and test in this file.

    Sets three module globals:
      * ``client``      - the handle returned by palo_client.get_client()
      * ``connect``     - a pymysql connection opened with the canal account
      * ``destination`` - 'example_' followed by the FE query port; it is
        passed as ``destination`` when sync jobs are created
    """
    global client, connect, destination
    client = palo_client.get_client(
        config.fe_host,
        config.fe_query_port,
        user=config.fe_user,
        password=config.fe_password,
        http_port=config.fe_http_port
    )
    mysql_settings = {
        'host': config.mysql_host,
        'port': config.mysql_port,
        'user': config.canal_user,
        'passwd': config.canal_password,
    }
    connect = pymysql.connect(**mysql_settings)
    # client.set_frontend_config('enable_create_sync_job', 'true')
    destination = 'example_' + str(config.fe_query_port)
| |
| |
def mysql_execute(sql):
    """
    Run a statement on the shared MySQL connection and commit it.

    Args:
        sql: the statement text to execute.

    Returns:
        Whatever ``cursor.fetchall()`` yields for the executed statement.

    Raises:
        AssertionError: when executing, closing or committing fails.  The
            message is "execute error. <original error text>", which callers
            match against (see the ``util.assert_return`` usage in the tests).
    """
    cursor = connect.cursor()
    try:
        LOG.info(L('mysql check sql', sql=sql))
        cursor.execute(sql)
        cursor.close()
        connect.commit()
    except Exception as error:
        # Log before raising: previously the log call sat after the assert
        # and could never run.
        LOG.error(L("mysql execute error", error=str(error)))
        # Raise explicitly instead of ``assert False`` so the failure is not
        # stripped when Python runs with -O.
        raise AssertionError("execute error. %s" % str(error))
    time.sleep(1)
    # NOTE(review): fetchall() is called on an already closed cursor, exactly
    # as before; presumably the driver keeps the buffered rows - confirm.
    return cursor.fetchall()
| |
| |
def create_mysql_table(mysql_table_name, mysql_database_name, columns, key='PRIMARY KEY (k1)', new_database=True):
    """
    Create a MySQL table (and, by default, a fresh database) for a test.

    Args:
        mysql_table_name: table to (re)create; an existing one is dropped.
        mysql_database_name: database that holds the table.
        columns: sequence of column descriptors.  Items 0 and 1 are always
            emitted, item 2 is appended when present and truthy, item 3 is
            emitted as a quoted DEFAULT value when present.
        key: trailing key/index clause of the CREATE statement.
        new_database: when true, drop and recreate the database first.

    Returns:
        True when the CREATE TABLE statement succeeded, False otherwise.
    """
    cursor = connect.cursor()
    if new_database:
        mysql_clean(mysql_database_name)
        create_db_sql = "CREATE DATABASE %s" % mysql_database_name
        try:
            cursor.execute(create_db_sql)
            LOG.info(L('CREATE MYSQL db succ', database=mysql_database_name))
        except Exception as error:
            # A failed CREATE DATABASE is only logged; table creation proceeds.
            LOG.error(L("CREATE MYSQL db error", host=config.mysql_host,
                        database_name=mysql_database_name, error=error))
    connect.select_db(mysql_database_name)
    cursor.execute("DROP TABLE IF EXISTS %s" % mysql_table_name)

    # Each definition is rendered as " <definition>," so that joining them
    # yields the same text the CREATE statement has always used.
    rendered_columns = []
    for column in columns:
        definition = '%s %s' % (column[0], column[1])
        if len(column) > 2 and column[2]:
            definition = '%s %s' % (definition, column[2])
        if len(column) > 3:
            definition = '%s DEFAULT "%s"' % (definition, column[3])
        rendered_columns.append(' %s,' % definition)

    create_table_sql = "CREATE table %s (%s %s)" % (mysql_table_name, ''.join(rendered_columns), key)
    LOG.info(L('mysql table sql', sql=create_table_sql))
    try:
        cursor.execute(create_table_sql)
    except Exception as error:
        LOG.error(L("CREATE TABLE error", host=config.mysql_host, table_name=mysql_table_name, error=error))
        return False
    return True
| |
| |
def mysql_clean(mysql_database_name):
    """
    Drop the given MySQL database if it exists, then log the drop.
    """
    drop_statement = "DROP DATABASE IF EXISTS %s" % mysql_database_name
    connect.cursor().execute(drop_statement)
    LOG.info(L("DROP MYSQL db", db=mysql_database_name))
| |
| |
def check(table_name, mysql_table_name, retries=60, interval=2):
    """
    Wait until the Doris table holds the same rows as the MySQL table.

    The MySQL rows are read once; the Doris table is then polled until both
    result sets (ordered by k1) compare equal or the attempts run out.

    Args:
        table_name: Doris table to poll.
        mysql_table_name: MySQL table providing the expected rows.
        retries: maximum number of Doris queries (default 60, as before).
        interval: seconds slept after each mismatch (default 2, as before).

    Returns:
        True as soon as the rows match, False if they never did.
    """
    doris_sql = "select * from %s order by k1" % table_name
    mysql_sql = "select * from %s order by k1" % mysql_table_name
    cursor = connect.cursor()
    try:
        cursor.execute(mysql_sql)
        expected_rows = cursor.fetchall()
    finally:
        # Release the cursor once the expected rows are captured.
        cursor.close()
    for _ in range(retries):
        if client.execute(doris_sql) == expected_rows:
            return True
        time.sleep(interval)
    return False
| |
| |
def assertStop(ret, client, stop_job=None, info=''):
    """
    Assert ``ret``; when it is falsy, stop the given sync job first.

    Args:
        ret: the condition that must hold.
        client: object providing show_sync_job() and stop_sync_job().
        stop_job: name of the sync job to stop on failure; None skips the stop.
        info: message attached to the assertion failure.

    Raises:
        AssertionError: when ``ret`` is falsy.
    """
    if not ret and stop_job is not None:
        # Diagnostics and cleanup are guarded separately so that a failure
        # while collecting job info can no longer skip stopping the job.
        try:
            show_ret = client.show_sync_job()
            LOG.info(L('binlog load job info', ret=show_ret))
        except Exception as e:
            print(str(e))
        try:
            client.stop_sync_job(stop_job)
        except Exception as e:
            print(str(e))
    assert ret, info
| |
| |
def check_data(data, table_name, job_name, fail_info, client=None):
    """
    Poll Doris until the table content matches the expected data.

    ``client.verify(data, table_name)`` is called up to 90 times with a
    2 second pause after each mismatch.  If it never succeeds, or raises,
    the sync job is stopped through assertStop() and the assertion fails
    with ``fail_info``.

    Args:
        data: expected data, passed through to ``client.verify``.
        table_name: Doris table to verify.
        job_name: sync job to stop when verification fails.
        fail_info: assertion message used on failure.
        client: Doris client; defaults to the module-level ``client``.

    Raises:
        AssertionError: when the data never matched.
    """
    if client is None:
        # The parameter shadows the module global, so look it up at call
        # time; a definition-time default would always be None.
        client = globals().get('client')
    # Bound up front so an exception on the first poll cannot leave it undefined.
    ret = False
    try:
        for _ in range(90):
            ret = client.verify(data, table_name)
            if ret:
                break
            time.sleep(2)
    except Exception as e:
        LOG.info(L('get an error when check verify', msg=str(e)))
        ret = False
    # assertStop() stops the job on failure, so no separate stop is needed here.
    assertStop(ret, client, job_name, fail_info)
| |
| |
def test_binlog():
    """
    {
    "title": "test_sys_binlog:test_binlog",
    "describe": "Create a binlog load job, run insert, delete and update in MySQL, and verify the data is loaded into Doris",
    "tag": "p0,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    time.sleep(WAIT_TIME)
    # Each step: SQL template file, expected Doris content, failure message.
    steps = [
        (DATA.binlog_sql_1, DATA.expected_file_1, 'insert data failed'),
        (DATA.binlog_sql_2, DATA.expected_file_2, 'delete data failed'),
        (DATA.binlog_sql_3, DATA.expected_file_3, 'update data failed'),
    ]
    for sql_path, expected_file, fail_info in steps:
        # Use a context manager so the template file handle is closed.
        with open(sql_path, 'r') as sql_file:
            sql = sql_file.read().format(mysql_table_name)
        mysql_execute(sql)
        check_data(expected_file, table_name, job_name, fail_info, client)
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_binlog_function():
    """
    {
    "title": "test_sys_binlog:test_binlog_function",
    "describe": "Create a binlog load job, run MySQL load statements that contain functions, and verify the data is loaded into Doris",
    "tag": "p0,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    time.sleep(WAIT_TIME)
    # Use a context manager so the template file handle is closed.
    with open(DATA.binlog_sql_5, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_binlog_select():
    """
    {
    "title": "test_sys_binlog:test_binlog_select",
    "describe": "Run queries while binlog load is in progress and verify the query results are correct",
    "tag": "p0,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    time.sleep(WAIT_TIME)
    # Nothing has been written to MySQL yet, so the Doris table must be empty.
    sql = "select * from %s order by k1" % table_name
    ret = (client.execute(sql) == ())
    assertStop(ret, client, job_name, 'query data error')
    # Use a context manager so the template file handle is closed.
    with open(DATA.binlog_sql_4, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    ret = check(table_name, mysql_table_name)
    assertStop(ret, client, job_name, 'select data error')
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_binlog_special():
    """
    {
    "title": "test_sys_binlog:test_binlog_special",
    "describe": "Create a binlog load job, load special values in MySQL (NULL, random numbers with decimal precision, date/time), and verify the data is loaded into Doris",
    "tag": "p0,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_2, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_2)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    time.sleep(WAIT_TIME)

    # A NULL key must be rejected by MySQL itself.
    null_key_sql = 'insert into %s values (NULL, NULL, NULL, NULL, NULL)' % mysql_table_name
    util.assert_return(False, "Column 'k1' cannot be null", mysql_execute, null_key_sql)

    # Every remaining statement is executed in turn and must show up in Doris.
    insert_templates = [
        'insert into %s values (1, NULL, NULL, NULL, NULL)',
        'insert into %s values (2, FLOOR(RAND() * 10), "2021-08-18", "2021-08-18 00:05:10", 3.22), \
        (3, FLOOR(RAND() * 100), "2021-08-18", "2021-08-18 00:05:10", 3.22), \
        (4, FLOOR(RAND() * 1000), "2021-08-18", "2021-08-18 00:05:10", 3.22)',
        'insert into %s values (5, 20, "2021-08-18", "2021-08-18 00:05:10", RAND()), \
        (6, 20, "2021-08-18", "2021-08-18 00:05:10", RAND() * 10), \
        (7, 20, "2021-08-18", "2021-08-18 00:05:10", RAND() * 100)',
        'insert into %s values (8, 20, current_date, "2021-08-18 00:05:10", 3.22)',
        'insert into %s values (9, 20, "2021-08-18", now(), 3.22)',
    ]
    for template in insert_templates:
        mysql_execute(template % mysql_table_name)
        ret = check(table_name, mysql_table_name)
        assertStop(ret, client, job_name, 'data load fail')

    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_partition():
    """
    {
    "title": "test_sys_binlog:test_partition",
    "describe": "With binlog load running, add and drop partitions in Doris, run load statements in MySQL, and verify data is loaded into the existing partitions",
    "tag": "p0,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
    partition_value_list = ['20', '40', '60', '80']
    partition_info = palo_client.PartitionInfo('k1', partition_name_list, partition_value_list)
    client.create_table(table_name, DATA.column_1, partition_info, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    time.sleep(WAIT_TIME)
    # Use context managers so the template file handles are closed.
    with open(DATA.binlog_sql_4, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
    # rows that have no matching partition in Doris cannot be loaded
    with open(DATA.binlog_sql_12, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
    # add partition
    ret = client.add_partition(table_name, 'partition_e', '100')
    assertStop(ret, client, job_name, 'add partition failed')
    ret = client.verify(DATA.expected_file_3, table_name)
    assertStop(ret, client, job_name, 'data load fail')
    # drop partition without data
    ret = client.drop_partition(table_name, 'partition_d')
    assertStop(ret, client, job_name, 'Drop partition failed')
    check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
    # drop partition with data: the table content must no longer match
    ret = client.drop_partition(table_name, 'partition_a')
    assertStop(ret, client, job_name, 'drop partition failed')
    ret = not client.verify(DATA.expected_file_3, table_name)
    assertStop(ret, client, job_name, 'data load fail')
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_mysql_transaction():
    """
    {
    "title": "test_sys_binlog:test_mysql_transaction",
    "describe": "With binlog load running, commit a transaction in MySQL and verify the data is loaded into Doris",
    "tag": "p1,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql' + table_name
    transaction_table = mysql_table_name + '_t'
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    create_mysql_table(transaction_table, mysql_database_name, DATA.column_1, new_database=False)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    time.sleep(WAIT_TIME)

    def load_sql(path, target_table):
        """Read a SQL template file, closing it, and fill in the table name."""
        with open(path, 'r') as sql_file:
            return sql_file.read().format(target_table)

    # The transaction mixes statements on the synced table with statements
    # on a second table that is not part of the sync job.
    sql_1 = load_sql(DATA.binlog_sql_1, mysql_table_name)
    sql_2 = load_sql(DATA.binlog_sql_1, transaction_table)
    sql_3 = load_sql(DATA.binlog_sql_2, mysql_table_name)
    sql_4 = load_sql(DATA.binlog_sql_3, mysql_table_name)
    sql_5 = load_sql(DATA.binlog_sql_2, transaction_table)
    sql = "begin; %s; %s; %s; %s; %s; commit;" % (sql_1, sql_2, sql_3, sql_4, sql_5)
    try:
        mysql_execute(sql)
        ret = True
    except Exception:
        # Narrowed from a bare except so KeyboardInterrupt/SystemExit still
        # propagate; mysql_execute reports failures as AssertionError.
        ret = False
    assertStop(ret, client, job_name, 'mysql transaction failed')
    check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_diff_schema_1():
    """
    {
    "title": "test_sys_binlog:test_diff_schema_1",
    "describe": "MySQL and Doris table schemas differ: the Doris table has a column that the MySQL table lacks; the job keeps running but no data is loaded into Doris",
    "tag": "p1,system,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_3, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    time.sleep(WAIT_TIME)
    # Use a context manager so the template file handle is closed.
    with open(DATA.binlog_sql_1, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    time.sleep(WAIT_TIME)
    # The Doris table must stay empty while the job itself keeps running.
    sql = 'select * from %s' % table_name
    ret = (client.execute(sql) == ())
    assertStop(ret, client, job_name, 'query data error')
    ret = client.get_sync_job_state(job_name) == 'RUNNING'
    assertStop(ret, client, job_name, 'sync job is not running')
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_diff_schema_2():
    """
    {
    "title": "test_sys_binlog:test_diff_schema_2",
    "describe": "MySQL and Doris table schemas differ: the MySQL table has a column that the Doris table lacks; the job keeps running but no data is loaded into Doris",
    "tag": "p1,system,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_4, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    time.sleep(WAIT_TIME)
    # Use a context manager so the template file handle is closed.
    with open(DATA.binlog_sql_1, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    time.sleep(WAIT_TIME)
    # The Doris table must stay empty while the job itself keeps running.
    sql = 'select * from %s' % table_name
    ret = (client.execute(sql) == ())
    assertStop(ret, client, job_name, 'query data error')
    ret = client.get_sync_job_state(job_name) == 'RUNNING'
    assertStop(ret, client, job_name, 'sync job is not running')
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_alter_mysql_schema():
    """
    {
    "title": "test_sys_binlog:test_alter_mysql_schema",
    "describe": "Create a binlog load job, alter the MySQL table schema, run load statements in MySQL, and verify data is loaded according to the Doris columns",
    "tag": "p1,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name + str(int(time.time()))
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    time.sleep(WAIT_TIME)

    def load_sql(path):
        """Read a SQL template file, closing it, and fill in the table name."""
        with open(path, 'r') as sql_file:
            return sql_file.read().format(mysql_table_name)

    mysql_execute(load_sql(DATA.binlog_sql_6))
    check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
    # add column: Doris content is expected to stay at expected_file_5
    mysql_execute("alter table %s add column k8 int(10)" % mysql_table_name)
    mysql_execute(load_sql(DATA.binlog_sql_9))
    time.sleep(WAIT_TIME)
    ret = client.verify(DATA.expected_file_5, table_name)
    assertStop(ret, client, job_name, 'data load fail')
    # drop new column
    mysql_execute("alter table %s drop column k8" % mysql_table_name)
    mysql_execute(load_sql(DATA.binlog_sql_8))
    check_data([DATA.expected_file_5, DATA.expected_file_7], table_name, job_name, 'data load fail', client)
    # drop column: Doris content is expected to stay unchanged
    mysql_execute("alter table %s drop column k7" % mysql_table_name)
    mysql_execute(load_sql(DATA.binlog_sql_10))
    time.sleep(WAIT_TIME)
    ret = client.verify([DATA.expected_file_5, DATA.expected_file_7], table_name)
    assertStop(ret, client, job_name, 'data load fail')
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_alter_doris_schema():
    """
    {
    "title": "test_sys_binlog:test_alter_doris_schema",
    "describe": "Create a binlog load job, alter the Doris table schema, run load statements in MySQL, and verify data is loaded according to the Doris columns",
    "tag": "p1,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    time.sleep(WAIT_TIME)

    def load_sql(path):
        """Read a SQL template file, closing it, and fill in the table name."""
        with open(path, 'r') as sql_file:
            return sql_file.read().format(mysql_table_name)

    mysql_execute(load_sql(DATA.binlog_sql_6))
    check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
    # add column: the job is paused around the schema change
    client.pause_sync_job(job_name)
    client.schema_change(table_name, add_column_list=["k8 int"], is_wait=True)
    client.resume_sync_job(job_name)
    mysql_execute(load_sql(DATA.binlog_sql_7))
    check_data(DATA.expected_file_8, table_name, job_name, 'data load fail', client)
    # drop new column
    client.schema_change(table_name, drop_column_list=["k8"], is_wait=True)
    mysql_execute(load_sql(DATA.binlog_sql_8))
    check_data([DATA.expected_file_5, DATA.expected_file_6, DATA.expected_file_7], table_name, job_name,
               'data load fail', client)
    # drop column
    client.schema_change(table_name, drop_column_list=["k7"], is_wait=True)
    mysql_execute(load_sql(DATA.binlog_sql_11))
    check_data(DATA.expected_file_9, table_name, job_name, 'data load fail', client)
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_mysql_primary_key():
    """
    {
    "title": "test_sys_binlog:test_mysql_primary_key",
    "describe": "Create a binlog load job, load data into a MySQL table with a PRIMARY key, and verify binlog load into Doris is supported",
    "tag": "p1,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    # create_mysql_table() defaults to 'PRIMARY KEY (k1)'.
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    time.sleep(WAIT_TIME)
    # Use a context manager so the template file handle is closed.
    with open(DATA.binlog_sql_4, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_mysql_unique_key():
    """
    {
    "title": "test_sys_binlog:test_mysql_unique_key",
    "describe": "Create a binlog load job, load data into a MySQL table with a UNIQUE index, and verify binlog load into Doris is supported",
    "tag": "p1,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1, 'UNIQUE (k1)')
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    time.sleep(WAIT_TIME)
    # Use a context manager so the template file handle is closed.
    with open(DATA.binlog_sql_4, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_mysql_index_key():
    """
    {
    "title": "test_sys_binlog:test_mysql_index_key",
    "describe": "Create a binlog load job, load data into a MySQL table with a plain INDEX, and verify binlog load into Doris is supported",
    "tag": "p1,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1, 'INDEX (k1)')
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    time.sleep(WAIT_TIME)
    # Use a context manager so the template file handle is closed.
    with open(DATA.binlog_sql_4, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_pause_resume():
    """
    {
    "title": "test_sys_binlog:test_pause_resume",
    "describe": "Create a binlog load job and verify it can be resumed successfully after being paused",
    "tag": "p0,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    time.sleep(WAIT_TIME)

    def load_sql(path):
        """Read a SQL template file, closing it, and fill in the table name."""
        with open(path, 'r') as sql_file:
            return sql_file.read().format(mysql_table_name)

    mysql_execute(load_sql(DATA.binlog_sql_6))
    check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
    # pause sync job: data written while paused must not change Doris
    ret = client.pause_sync_job(job_name)
    assertStop(ret, client, job_name, 'pause sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'PAUSED')
    assertStop(ret, client, job_name, 'sync job state error')
    mysql_execute(load_sql(DATA.binlog_sql_7))
    check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
    # resume sync job
    ret = client.resume_sync_job(job_name)
    assertStop(ret, client, job_name, 'resume sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    mysql_execute(load_sql(DATA.binlog_sql_8))
    expected_file_list = [DATA.expected_file_5, DATA.expected_file_6, DATA.expected_file_7]
    check_data(expected_file_list, table_name, job_name, 'data load fail', client)
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_create_same_job():
    """
    {
    "title": "test_sys_binlog:test_create_same_job",
    "describe": "Verify that creating an identical binlog load job fails",
    "tag": "p0,system,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    time.sleep(WAIT_TIME)
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    # Creating the same job again must fail with an 'already exists' error ...
    util.assert_return(False, 'already exists', client.create_sync_job, table_name, database_name,
                       mysql_table_name, mysql_database_name, job_name, canal_ip, destination=destination)
    # ... and must leave the first job running and functional.
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    # Use a context manager so the template file handle is closed.
    with open(DATA.binlog_sql_4, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_resume_after_create():
    """
    {
    "title": "test_sys_binlog:test_resume_after_create",
    "describe": "After creating a binlog load job, verify that resuming it fails",
    "tag": "p1,system,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    # Start from a fresh Doris database with a single UNIQUE KEY table.
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    # MySQL source side; the FE query port is appended to the database name.
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    time.sleep(WAIT_TIME)
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    # Resuming a job that was never paused must be rejected and leave it RUNNING.
    util.assert_return(False, 'There is no paused job', client.resume_sync_job, job_name)
    ret = client.get_sync_job_state(job_name) == 'RUNNING'
    assertStop(ret, client, job_name, 'sync job state error')
    # The job must still replicate MySQL changes after the rejected resume.
    # Read the SQL template inside a context manager so the file handle is closed.
    with open(DATA.binlog_sql_4, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
    # Tear down the job and both databases.
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_pause_same_job():
    """
    {
    "title": "test_sys_binlog:test_pause_same_job",
    "describe": "Create a binlog load job and verify that pausing the same job again fails",
    "tag": "p1,system,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    # Start from a fresh Doris database with a single UNIQUE KEY table.
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    # MySQL source side; the FE query port is appended to the database name.
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    # First pause succeeds and moves the job to PAUSED.
    ret = client.pause_sync_job(job_name)
    assertStop(ret, client, job_name, 'pause sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'PAUSED')
    assertStop(ret, client, job_name, 'sync job state error')
    # Second pause must be rejected and leave the job PAUSED.
    util.assert_return(False, "There is no running job", client.pause_sync_job, job_name)
    ret = (client.get_sync_job_state(job_name) == 'PAUSED')
    assertStop(ret, client, job_name, 'sync job state error')
    # Changes made in MySQL while the job is paused must not show up in Doris.
    # Read the SQL template inside a context manager so the file handle is closed.
    with open(DATA.binlog_sql_6, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    time.sleep(WAIT_TIME)
    sql = "select * from %s" % table_name
    # The Doris table is expected to still be empty.
    ret = (client.execute(sql) == ())
    assertStop(ret, client, job_name, 'query data error')
    # Tear down the job and both databases.
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_create_after_pause():
    """
    {
    "title": "test_sys_binlog:test_create_after_pause",
    "describe": "Create a binlog load job, pause it, and verify that creating the same job again fails",
    "tag": "p1,system,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    # Start from a fresh Doris database with a single UNIQUE KEY table.
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    # MySQL source side; the FE query port is appended to the database name.
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    # Pause the job before attempting the duplicate create.
    ret = client.pause_sync_job(job_name)
    assertStop(ret, client, job_name, 'pause sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'PAUSED')
    assertStop(ret, client, job_name, 'sync job state error')
    # Creating a job with the same name must be rejected while the first one is paused.
    # NOTE(review): unlike the sibling duplicate-create tests, this call does not pass
    # destination=destination -- confirm whether that omission is intentional.
    util.assert_return(False, "already exists", client.create_sync_job, table_name, database_name,
                       mysql_table_name, mysql_database_name, job_name, canal_ip)
    # The rejected create must leave the original job PAUSED.
    ret = (client.get_sync_job_state(job_name) == 'PAUSED')
    assertStop(ret, client, job_name, 'sync job state error')
    # Changes made in MySQL while the job is paused must not show up in Doris.
    # Read the SQL template inside a context manager so the file handle is closed.
    with open(DATA.binlog_sql_6, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    time.sleep(WAIT_TIME)
    sql = "select * from %s" % table_name
    # The Doris table is expected to still be empty.
    ret = (client.execute(sql) == ())
    assertStop(ret, client, job_name, 'query data error')
    # Tear down the job and both databases.
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_resume_same_job():
    """
    {
    "title": "test_sys_binlog:test_resume_same_job",
    "describe": "Create a binlog load job, pause it, resume it, and verify that resuming it again fails",
    "tag": "p1,system,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    # Start from a fresh Doris database with a single UNIQUE KEY table.
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    # MySQL source side; the FE query port is appended to the database name.
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    # RUNNING -> PAUSED
    ret = client.pause_sync_job(job_name)
    assertStop(ret, client, job_name, 'pause sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'PAUSED')
    assertStop(ret, client, job_name, 'sync job state error')
    # PAUSED -> RUNNING
    ret = client.resume_sync_job(job_name)
    assertStop(ret, client, job_name, 'resume sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    # A second resume must be rejected and leave the job RUNNING.
    util.assert_return(False, "There is no paused job", client.resume_sync_job, job_name)
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    time.sleep(WAIT_TIME)
    # The job must still replicate MySQL changes after the rejected resume.
    # Read the SQL template inside a context manager so the file handle is closed.
    with open(DATA.binlog_sql_4, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
    # Tear down the job and both databases.
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_create_after_resume():
    """
    {
    "title": "test_sys_binlog:test_create_after_resume",
    "describe": "Create a binlog load job, pause it, resume it, and verify that creating the same job again fails",
    "tag": "p1,system,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    # Start from a fresh Doris database with a single UNIQUE KEY table.
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    # MySQL source side; the FE query port is appended to the database name.
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    # RUNNING -> PAUSED
    ret = client.pause_sync_job(job_name)
    assertStop(ret, client, job_name, 'pause sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'PAUSED')
    assertStop(ret, client, job_name, 'sync job state error')
    # PAUSED -> RUNNING
    ret = client.resume_sync_job(job_name)
    assertStop(ret, client, job_name, 'resume sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    # Creating the same job again must be rejected and leave the resumed job RUNNING.
    util.assert_return(False, "already exists", client.create_sync_job, table_name, database_name,
                       mysql_table_name, mysql_database_name, job_name, canal_ip, destination=destination)
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    time.sleep(WAIT_TIME)
    # The resumed job must still replicate MySQL changes.
    # Read the SQL template inside a context manager so the file handle is closed.
    with open(DATA.binlog_sql_4, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
    # Tear down the job and both databases.
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_pause_and_resume():
    """
    {
    "title": "test_sys_binlog:test_pause_and_resume",
    "describe": "Create a binlog load job and verify that pausing and resuming it multiple times succeeds",
    "tag": "p1,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    # Start from a fresh Doris database with a single UNIQUE KEY table.
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    # MySQL source side; the FE query port is appended to the database name.
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    # Run ten pause/resume cycles, checking the reported state after every transition.
    for _ in range(10):
        ret = client.pause_sync_job(job_name)
        assertStop(ret, client, job_name, 'pause sync job failed')
        ret = (client.get_sync_job_state(job_name) == 'PAUSED')
        assertStop(ret, client, job_name, 'sync job state error')
        ret = client.resume_sync_job(job_name)
        assertStop(ret, client, job_name, 'resume sync job failed')
        ret = (client.get_sync_job_state(job_name) == 'RUNNING')
        assertStop(ret, client, job_name, 'sync job state error')
        time.sleep(1)
    time.sleep(WAIT_TIME)
    # After all the cycles the job must still replicate MySQL changes.
    # Read the SQL template inside a context manager so the file handle is closed.
    with open(DATA.binlog_sql_4, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client)
    # Tear down the job and both databases.
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_sync_job_info():
    """
    {
    "title": "test_sys_binlog:test_sync_job_info",
    "describe": "Create a binlog load job and verify the show sync job info",
    "tag": "p1,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    # Start from a fresh Doris database with a single UNIQUE KEY table.
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    # MySQL source side; the FE query port is appended to the database name.
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    # Port expected in the reported job config.
    # NOTE(review): presumably the canal server port used by create_sync_job -- confirm.
    canal_port = '11111'
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    sync_job_list = client.show_sync_job()
    # Expected channel and config strings for this job.
    sync_channel = '%s.%s->%s' % (mysql_database_name, mysql_table_name, table_name)
    sync_job_config = 'address:%s:%s,destination:%s,batchSize:8192' % (canal_ip, canal_port, destination)
    # Track whether the job appeared at all; without this the test would pass
    # vacuously when show_sync_job() does not list the job.
    job_found = False
    for sync_job in sync_job_list:
        sync_job_info = palo_job.SyncJobInfo(sync_job)
        if sync_job_info.get_job_name() != job_name:
            continue
        job_found = True
        ret = (sync_job_info.get_state() == 'RUNNING')
        assertStop(ret, client, job_name, 'sync job state error')
        ret = (sync_job_info.get_channel() == sync_channel)
        assertStop(ret, client, job_name, 'sync channel error')
        ret = (sync_job_info.get_job_config() == sync_job_config)
        assertStop(ret, client, job_name, 'sync job config error')
    assertStop(job_found, client, job_name, 'sync job not found in show sync job result')
    # Tear down the job and both databases.
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_tables_in_one_job():
    """
    {
    "title": "test_sys_binlog:test_tables_in_one_job",
    "describe": "Create a single binlog load job that contains several table mappings, execute load statements in MySQL, and verify the data is loaded into Doris",
    "tag": "p1,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    # MySQL source database; the FE query port is appended to the name.
    mysql_database_name = 'm_' + database_name + port
    table_names = []
    mysql_table_names = []
    # All three source tables live in the same MySQL database.
    mysql_database_names = [mysql_database_name] * 3
    # Create three Doris tables and their MySQL counterparts.
    for i in ['_1', '_2', '_3']:
        table_name_s = table_name + i
        client.create_table(table_name_s, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
        table_names.append(table_name_s)
        mysql_table_name = 'mysql_' + table_name_s
        if i == '_1':
            create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
        else:
            # Later tables are created with new_database=False.
            create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1, new_database=False)
        mysql_table_names.append(mysql_table_name)
    # One sync job covering all three table mappings.
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_names, database_name, mysql_table_names, mysql_database_names, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    time.sleep(WAIT_TIME)
    # Apply changes to each MySQL table; the first two share one script, the third uses another.
    # Each SQL template is read inside a context manager so the file handle is closed.
    with open(DATA.binlog_sql_4, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_names[0])
    mysql_execute(sql)
    with open(DATA.binlog_sql_4, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_names[1])
    mysql_execute(sql)
    with open(DATA.binlog_sql_1, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_names[2])
    mysql_execute(sql)
    time.sleep(WAIT_TIME)
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    # Every Doris table must match the expected file for the script applied to its source.
    check_data(DATA.expected_file_3, table_names[0], job_name, 'data load fail', client)
    check_data(DATA.expected_file_3, table_names[1], job_name, 'data load fail', client)
    check_data(DATA.expected_file_1, table_names[2], job_name, 'data load fail', client)
    # Tear down the job and both databases.
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_tables_in_jobs():
    """
    {
    "title": "test_sys_binlog:test_tables_in_jobs",
    "describe": "Create several binlog load jobs covering several table mappings, execute load statements in MySQL, and verify the data is loaded into Doris",
    "tag": "p1,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    # MySQL source database; the FE query port is appended to the name.
    mysql_database_name = 'm_' + database_name + port
    suffixes = ['_1', '_2', '_3']
    # Each job gets its own destination, derived from the shared one by the same suffix.
    destination_list = [destination + suffix for suffix in suffixes]
    job_name_list = []
    mysql_table_name_list = []
    table_name_list = []
    # Create one Doris table, one MySQL table and one sync job per suffix.
    for suffix, job_destination in zip(suffixes, destination_list):
        table_name_s = table_name + suffix
        table_name_list.append(table_name_s)
        client.create_table(table_name_s, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
        mysql_table_name = 'mysql_' + table_name_s
        if suffix == '_1':
            create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
        else:
            # Later tables are created with new_database=False.
            create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1, new_database=False)
        mysql_table_name_list.append(mysql_table_name)
        job_name = 'job_' + table_name_s
        job_name_list.append(job_name)
        ret = client.create_sync_job(table_name_s, database_name, mysql_table_name, mysql_database_name,
                                     job_name, canal_ip, destination=job_destination)
        assertStop(ret, client, job_name, 'create sync job failed')
        ret = (client.get_sync_job_state(job_name) == 'RUNNING')
        assertStop(ret, client, job_name, 'sync job state error')
    time.sleep(WAIT_TIME)
    # Apply changes to each MySQL table; the first two share one script, the third uses another.
    # Each SQL template is read inside a context manager so the file handle is closed.
    with open(DATA.binlog_sql_4, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name_list[0])
    mysql_execute(sql)
    with open(DATA.binlog_sql_4, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name_list[1])
    mysql_execute(sql)
    with open(DATA.binlog_sql_1, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name_list[2])
    mysql_execute(sql)
    # All jobs must still be RUNNING after the upstream changes.
    for job_name in job_name_list:
        ret = (client.get_sync_job_state(job_name) == 'RUNNING')
        assertStop(ret, client, job_name, 'sync job state error')
    # Every Doris table must match the expected file for the script applied to its source.
    check_data(DATA.expected_file_3, table_name_list[0], job_name_list[0], 'data load fail', client)
    check_data(DATA.expected_file_3, table_name_list[1], job_name_list[1], 'data load fail', client)
    check_data(DATA.expected_file_1, table_name_list[2], job_name_list[2], 'data load fail', client)
    # Tear down all jobs and both databases.
    for job_name in job_name_list:
        client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_stop_after_create():
    """
    {
    "title": "test_sys_binlog:test_stop_after_create",
    "describe": "Create a binlog load job and verify that stopping it succeeds",
    "tag": "p0,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    # Start from a fresh Doris database with a single UNIQUE KEY table.
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    # MySQL source side; the FE query port is appended to the database name.
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    time.sleep(WAIT_TIME)
    # While the job is running, MySQL changes must reach Doris.
    # Each SQL template is read inside a context manager so the file handle is closed.
    with open(DATA.binlog_sql_6, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
    # Stop the sync job; its state must become CANCELLED.
    ret = client.stop_sync_job(job_name)
    assertStop(ret, client, job_name, 'stop sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
    assertStop(ret, client, job_name, 'sync job state error')
    # Changes made after the stop must not reach Doris: data still equals expected_file_5.
    with open(DATA.binlog_sql_7, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    time.sleep(WAIT_TIME)
    ret = client.verify(DATA.expected_file_5, table_name)
    assertStop(ret, client, job_name, 'data load error')
    # Tear down both databases (the job is already stopped).
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_stop_after_pause():
    """
    {
    "title": "test_sys_binlog:test_stop_after_pause",
    "describe": "Create a binlog load job and verify that stopping it after pausing succeeds",
    "tag": "p0,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    # Start from a fresh Doris database with a single UNIQUE KEY table.
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    # MySQL source side; the FE query port is appended to the database name.
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    time.sleep(WAIT_TIME)
    # While the job is running, MySQL changes must reach Doris.
    # Each SQL template is read inside a context manager so the file handle is closed.
    with open(DATA.binlog_sql_6, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
    # Pause the sync job.
    ret = client.pause_sync_job(job_name)
    assertStop(ret, client, job_name, 'pause sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'PAUSED')
    assertStop(ret, client, job_name, 'sync job state error')
    # Changes made while paused must not reach Doris: data still equals expected_file_5.
    with open(DATA.binlog_sql_7, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    time.sleep(WAIT_TIME)
    ret = client.verify(DATA.expected_file_5, table_name)
    assertStop(ret, client, job_name, 'data load error')
    # Stop the paused sync job; its state must become CANCELLED.
    ret = client.stop_sync_job(job_name)
    assertStop(ret, client, job_name, 'stop sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
    assertStop(ret, client, job_name, 'sync job state error')
    # Changes made after the stop must not reach Doris either.
    with open(DATA.binlog_sql_8, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    time.sleep(WAIT_TIME)
    ret = client.verify(DATA.expected_file_5, table_name)
    assertStop(ret, client, job_name, 'data load error')
    # Tear down both databases (the job is already stopped).
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_stop_after_resume():
    """
    {
    "title": "test_sys_binlog:test_stop_after_resume",
    "describe": "Create a binlog load job, pause it, resume it, and verify that stopping it succeeds",
    "tag": "p0,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    # Start from a fresh Doris database with a single UNIQUE KEY table.
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    # MySQL source side; the FE query port is appended to the database name.
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    time.sleep(WAIT_TIME)
    # While the job is running, MySQL changes must reach Doris.
    # Each SQL template is read inside a context manager so the file handle is closed.
    with open(DATA.binlog_sql_6, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
    # Pause the sync job.
    ret = client.pause_sync_job(job_name)
    assertStop(ret, client, job_name, 'pause sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'PAUSED')
    assertStop(ret, client, job_name, 'sync job state error')
    # Changes made while paused must not reach Doris: data still equals expected_file_5.
    with open(DATA.binlog_sql_7, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    time.sleep(WAIT_TIME)
    ret = client.verify(DATA.expected_file_5, table_name)
    assertStop(ret, client, job_name, 'data load error')
    # Resume the sync job.
    ret = client.resume_sync_job(job_name)
    assertStop(ret, client, job_name, 'resume sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    # After resuming, the Doris data is checked against the combined expected files.
    with open(DATA.binlog_sql_8, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    expected_file_list = [DATA.expected_file_5, DATA.expected_file_6, DATA.expected_file_7]
    check_data(expected_file_list, table_name, job_name, 'data load fail', client)
    # Stop the sync job; its state must become CANCELLED.
    ret = client.stop_sync_job(job_name)
    assertStop(ret, client, job_name, 'stop sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
    assertStop(ret, client, job_name, 'sync job state error')
    # Changes made after the stop must not reach Doris: data is unchanged.
    with open(DATA.binlog_sql_11, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    time.sleep(WAIT_TIME)
    ret = client.verify(expected_file_list, table_name)
    assertStop(ret, client, job_name, 'data load error')
    # Tear down both databases (the job is already stopped).
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_stop_after_stop():
    """
    {
    "title": "test_sys_binlog:test_stop_after_stop",
    "describe": "Create a binlog load job, stop it, and verify that stopping it again fails",
    "tag": "p1,system,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    # Start from a fresh Doris database with a single UNIQUE KEY table.
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    # MySQL source side; the FE query port is appended to the database name.
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    time.sleep(WAIT_TIME)
    # While the job is running, MySQL changes must reach Doris.
    # Each SQL template is read inside a context manager so the file handle is closed.
    with open(DATA.binlog_sql_6, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
    # Stop the sync job; its state must become CANCELLED.
    ret = client.stop_sync_job(job_name)
    assertStop(ret, client, job_name, 'stop sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
    assertStop(ret, client, job_name, 'sync job state error')
    # Changes made after the stop must not reach Doris: data still equals expected_file_5.
    with open(DATA.binlog_sql_7, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    time.sleep(WAIT_TIME)
    ret = client.verify(DATA.expected_file_5, table_name)
    assertStop(ret, client, job_name, 'data load error')
    # Stopping the job a second time must be rejected and leave it CANCELLED.
    util.assert_return(False, 'There is no uncompleted job', client.stop_sync_job, job_name)
    ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
    assertStop(ret, client, job_name, 'sync job state error')
    # Doris data must remain unchanged after further MySQL changes.
    with open(DATA.binlog_sql_8, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    time.sleep(WAIT_TIME)
    ret = client.verify(DATA.expected_file_5, table_name)
    assertStop(ret, client, job_name, 'data load error')
    # Tear down both databases (the job is already stopped).
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_create_same_job_after_stop():
    """
    {
    "title": "test_sys_binlog:test_create_same_job_after_stop",
    "describe": "Create a binlog load job, stop it, and verify that a job with a different name but the same table mapping can be created and data keeps loading",
    "tag": "p1,system"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    # Start from a fresh Doris database with a single UNIQUE KEY table.
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    # MySQL source side; the FE query port is appended to the database name.
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    time.sleep(WAIT_TIME)
    # While the first job is running, MySQL changes must reach Doris.
    # Each SQL template is read inside a context manager so the file handle is closed.
    with open(DATA.binlog_sql_6, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
    # Stop the first sync job; its state must become CANCELLED.
    ret = client.stop_sync_job(job_name)
    assertStop(ret, client, job_name, 'stop sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
    assertStop(ret, client, job_name, 'sync job state error')
    # Changes made after the stop must not reach Doris: data still equals expected_file_5.
    with open(DATA.binlog_sql_7, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    time.sleep(WAIT_TIME)
    ret = client.verify(DATA.expected_file_5, table_name)
    assertStop(ret, client, job_name, 'data load error')
    # Create a second job with a new name but the same table mapping and destination.
    job_name_2 = job_name + '_2'
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name, job_name_2,
                                 canal_ip, destination=destination)
    assertStop(ret, client, job_name_2, 'create sync job failed')
    ret = (client.get_sync_job_state(job_name_2) == 'RUNNING')
    assertStop(ret, client, job_name_2, 'sync job state error')
    time.sleep(WAIT_TIME)
    # With the new job running, the Doris data is checked against the combined expected files.
    with open(DATA.binlog_sql_8, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    expected_file_list = [DATA.expected_file_5, DATA.expected_file_6, DATA.expected_file_7]
    check_data(expected_file_list, table_name, job_name_2, 'data load fail', client)
    # Tear down the second job and both databases.
    client.stop_sync_job(job_name_2)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_create_same_name_job_after_stop():
    """
    {
    "title": "test_sys_binlog:test_create_same_name_job_after_stop",
    "describe": "Create a binlog load job, stop it, verify that a job with the same name can be created again, run the MySQL load statements, and verify the data is loaded into Doris correctly",
    "tag": "p1,system"
    }
    """
    # NOTE: the docstring above is JSON case metadata; keep it valid JSON.
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    # Doris side: clean(), then create the database and one UNIQUE KEY table.
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    # MySQL side: source table built from the same column definition.
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name,
                                 job_name, canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    # Fixed WAIT_TIME pause before writing to MySQL
    # (presumably lets the job start consuming the binlog -- confirm).
    time.sleep(WAIT_TIME)
    # Read the SQL template through a context manager so the file is closed.
    with open(DATA.binlog_sql_6, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
    # stop sync job
    ret = client.stop_sync_job(job_name)
    assertStop(ret, client, job_name, 'stop sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
    assertStop(ret, client, job_name, 'sync job state error')
    # Statements executed while the job is cancelled: the Doris table is
    # still expected to match expected_file_5 afterwards.
    with open(DATA.binlog_sql_7, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    time.sleep(WAIT_TIME)
    ret = client.verify(DATA.expected_file_5, table_name)
    assertStop(ret, client, job_name, 'data load error')
    # Create a sync job again under the very same name. is_wait=False is
    # passed and, unlike the first creation, the RUNNING state is not asserted.
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name,
                                 job_name, canal_ip, destination=destination, is_wait=False)
    assertStop(ret, client, job_name, 'create sync job failed')
    time.sleep(WAIT_TIME)
    with open(DATA.binlog_sql_8, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    # check_data receives a list of expected files here; how the list is
    # matched is defined by check_data elsewhere in this file.
    expected_file_list = [DATA.expected_file_5, DATA.expected_file_6, DATA.expected_file_7]
    check_data(expected_file_list, table_name, job_name, 'data load fail', client)
    # Cleanup: stop the job and drop both the Doris and the MySQL databases.
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_pause_after_stop():
    """
    {
    "title": "test_sys_binlog:test_pause_after_stop",
    "describe": "Create a binlog load job, stop it, and verify that pausing the job fails",
    "tag": "p1,system,fuzz"
    }
    """
    # NOTE: the docstring above is JSON case metadata; keep it valid JSON.
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    # Doris side: clean(), then create the database and one UNIQUE KEY table.
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    # MySQL side: source table built from the same column definition.
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name,
                                 job_name, canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    time.sleep(WAIT_TIME)
    # Read the SQL template through a context manager so the file is closed.
    with open(DATA.binlog_sql_6, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
    # stop sync job
    ret = client.stop_sync_job(job_name)
    assertStop(ret, client, job_name, 'stop sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
    assertStop(ret, client, job_name, 'sync job state error')
    # Statements executed while the job is cancelled: the Doris table is
    # still expected to match expected_file_5 afterwards.
    with open(DATA.binlog_sql_7, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    time.sleep(WAIT_TIME)
    ret = client.verify(DATA.expected_file_5, table_name)
    assertStop(ret, client, job_name, 'data load error')
    # pause sync job: expected to be rejected with 'There is no running job'
    # (assert_return(False, msg, ...) presumably checks for a failure
    # carrying msg -- see util.assert_return).
    util.assert_return(False, 'There is no running job', client.pause_sync_job, job_name)
    # The failed pause must leave the job in the CANCELLED state.
    ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
    assertStop(ret, client, job_name, 'sync job state error')
    # Further MySQL changes: the Doris table is again expected to be unchanged.
    with open(DATA.binlog_sql_8, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    time.sleep(WAIT_TIME)
    ret = client.verify(DATA.expected_file_5, table_name)
    assertStop(ret, client, job_name, 'data load error')
    # Cleanup: drop both the Doris and the MySQL databases.
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_resume_after_stop():
    """
    {
    "title": "test_sys_binlog:test_resume_after_stop",
    "describe": "Create a binlog load job, stop it, and verify that resuming the job fails",
    "tag": "p1,system,fuzz"
    }
    """
    # NOTE: the docstring above is JSON case metadata; keep it valid JSON.
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    # Doris side: clean(), then create the database and one UNIQUE KEY table.
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    # MySQL side: source table built from the same column definition.
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name,
                                 job_name, canal_ip, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    time.sleep(WAIT_TIME)
    # Read the SQL template through a context manager so the file is closed.
    with open(DATA.binlog_sql_6, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_5, table_name, job_name, 'data load fail', client)
    # stop sync job
    ret = client.stop_sync_job(job_name)
    assertStop(ret, client, job_name, 'stop sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
    assertStop(ret, client, job_name, 'sync job state error')
    # Statements executed while the job is cancelled: the Doris table is
    # still expected to match expected_file_5 afterwards.
    with open(DATA.binlog_sql_7, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    time.sleep(WAIT_TIME)
    ret = client.verify(DATA.expected_file_5, table_name)
    assertStop(ret, client, job_name, 'data load error')
    # resume sync job: expected to be rejected with 'There is no paused job'
    # (assert_return(False, msg, ...) presumably checks for a failure
    # carrying msg -- see util.assert_return).
    util.assert_return(False, 'There is no paused job', client.resume_sync_job, job_name)
    # The failed resume must leave the job in the CANCELLED state.
    ret = (client.get_sync_job_state(job_name) == 'CANCELLED')
    assertStop(ret, client, job_name, 'sync job state error')
    # Further MySQL changes: the Doris table is again expected to be unchanged.
    with open(DATA.binlog_sql_8, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    time.sleep(WAIT_TIME)
    ret = client.verify(DATA.expected_file_5, table_name)
    assertStop(ret, client, job_name, 'data load error')
    # Cleanup: drop both the Doris and the MySQL databases.
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_column():
    """
    {
    "title": "test_sys_binlog:test_column",
    "describe": "Create a binlog load job with a column mapping, run the load statements in MySQL, and verify the data loaded into Doris",
    "tag": "p0,system"
    }
    """
    # NOTE: the docstring above is JSON case metadata; keep it valid JSON.
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    # The Doris table uses DATA.column_5 while the MySQL table below uses
    # DATA.column_1; the explicit `columns` list maps between the two.
    client.create_table(table_name, DATA.column_5, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    columns = ['k1', 'k6', 'k3', 'k4', 'k2', 'k7', 'k5']
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name,
                                 job_name, canal_ip, columns=columns, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    time.sleep(WAIT_TIME)
    # Read the SQL template through a context manager so the file is closed.
    with open(DATA.binlog_sql_4, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_10, table_name, job_name, 'data load fail', client)
    # Cleanup: stop the job and drop both the Doris and the MySQL databases.
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_add_nullable_column():
    """
    {
    "title": "test_sys_binlog:test_add_nullable_column",
    "describe": "Create a binlog load job with a column mapping, add a nullable column in Doris, run the load statements in MySQL, and verify the data is loaded into Doris",
    "tag": "p1,system"
    }
    """
    # NOTE: the docstring above is JSON case metadata; keep it valid JSON.
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    # The Doris table uses DATA.column_5 while the MySQL table below uses
    # DATA.column_1; the explicit `columns` list maps between the two.
    client.create_table(table_name, DATA.column_5, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    columns = ['k1', 'k6', 'k3', 'k4', 'k2', 'k7', 'k5']
    ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name,
                                 job_name, canal_ip, columns=columns, destination=destination)
    assertStop(ret, client, job_name, 'create sync job failed')
    ret = (client.get_sync_job_state(job_name) == 'RUNNING')
    assertStop(ret, client, job_name, 'sync job state error')
    time.sleep(WAIT_TIME)
    # Add column k8 to the Doris table while the sync job exists; the return
    # value of schema_change is not checked here.
    client.schema_change(table_name, add_column_list=["k8 int after k1"], is_wait=True)
    # Read the SQL template through a context manager so the file is closed.
    with open(DATA.binlog_sql_4, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_11, table_name, job_name, 'data load fail', client)
    # Cleanup: stop the job and drop both the Doris and the MySQL databases.
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_same_destination():
    """
    {
    "title": "test_sys_binlog:test_same_destination",
    "describe": "Each canal destination maps to only one binlog load job; reuse a destination and verify that creating the binlog load job fails",
    "tag": "p1,system,fuzz"
    }
    """
    # NOTE: the docstring above is JSON case metadata; keep it valid JSON.
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))

    # Derived names: first (table, job) pair plus a second pair suffixed '_s'.
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    job_name = 'job_' + table_name
    second_table = table_name + '_s'
    second_mysql_table = mysql_table_name + '_s'
    second_job = job_name + '_s'
    table_options = dict(set_null=True, keys_desc='UNIQUE KEY(k1)')

    # First job: expected to be created and to reach RUNNING.
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    client.create_table(table_name, DATA.column_1, **table_options)
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    created = client.create_sync_job(
        table_name, database_name, mysql_table_name, mysql_database_name,
        job_name, canal_ip, destination=destination)
    assertStop(created, client, job_name, 'create sync job failed')
    state = client.get_sync_job_state(job_name)
    assertStop(state == 'RUNNING', client, job_name, 'sync job state error')

    # Creating a binlog load job with an already-used destination must fail
    # with 'conflict destination'.
    client.create_table(second_table, DATA.column_1, **table_options)
    create_mysql_table(second_mysql_table, mysql_database_name, DATA.column_1, new_database=False)
    util.assert_return(
        False, 'conflict destination', client.create_sync_job,
        second_table, database_name, second_mysql_table, mysql_database_name,
        second_job, canal_ip, destination=destination)

    # Cleanup: stop the first job and drop both databases.
    client.stop_sync_job(job_name)
    client.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_observer_fe():
    """
    {
    "title": "test_sys_binlog:test_observer_fe",
    "describe": "Connect to an observer FE, create a binlog load job, run the load statements in MySQL, and verify the data loaded into Doris",
    "tag": "p1,system"
    }
    """
    # NOTE: the docstring above is JSON case metadata; keep it valid JSON.
    # Dedicated client bound to the first host in config.fe_observer_list;
    # every Doris operation in this test goes through it.
    client_observer = palo_client.get_client(config.fe_observer_list[0], config.fe_query_port,
                                             user=config.fe_user, password=config.fe_password,
                                             http_port=config.fe_http_port)
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client_observer.clean(database_name)
    client_observer.create_database(database_name)
    client_observer.use(database_name)
    client_observer.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client_observer.create_sync_job(table_name, database_name, mysql_table_name,
                                          mysql_database_name, job_name, canal_ip,
                                          destination=destination)
    assertStop(ret, client_observer, job_name, 'create sync job failed')
    time.sleep(WAIT_TIME)
    # Read the SQL template through a context manager so the file is closed.
    with open(DATA.binlog_sql_4, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client_observer)
    # Cleanup: stop the job and drop both the Doris and the MySQL databases.
    client_observer.stop_sync_job(job_name)
    client_observer.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def test_follower_fe():
    """
    {
    "title": "test_sys_binlog:test_follower_fe",
    "describe": "Connect to a follower FE, create a binlog load job, run the load statements in MySQL, and verify the data loaded into Doris",
    "tag": "p1,system"
    }
    """
    # NOTE: the docstring above is JSON case metadata; keep it valid JSON.
    # Dedicated client bound to the first host in config.fe_follower_list;
    # every Doris operation in this test goes through it.
    client_follower = palo_client.get_client(config.fe_follower_list[0], config.fe_query_port,
                                             user=config.fe_user, password=config.fe_password,
                                             http_port=config.fe_http_port)
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client_follower.clean(database_name)
    client_follower.create_database(database_name)
    client_follower.use(database_name)
    client_follower.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
    mysql_database_name = 'm_' + database_name + port
    mysql_table_name = 'mysql_' + table_name
    create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
    job_name = 'job_' + table_name
    ret = client_follower.create_sync_job(table_name, database_name, mysql_table_name,
                                          mysql_database_name, job_name, canal_ip,
                                          destination=destination)
    assertStop(ret, client_follower, job_name, 'create sync job failed')
    time.sleep(WAIT_TIME)
    # Read the SQL template through a context manager so the file is closed.
    with open(DATA.binlog_sql_4, 'r') as sql_file:
        sql = sql_file.read().format(mysql_table_name)
    mysql_execute(sql)
    check_data(DATA.expected_file_3, table_name, job_name, 'data load fail', client_follower)
    # Cleanup: stop the job and drop both the Doris and the MySQL databases.
    client_follower.stop_sync_job(job_name)
    client_follower.clean(database_name)
    mysql_clean(mysql_database_name)
| |
| |
def teardown_module():
    """
    Module-level teardown: close the MySQL connection stored in the global
    `connect`, which setup_module opens with pymysql.connect.
    """
    connect.close()