| #!/bin/env python |
| # -*- coding: utf-8 -*- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| """ |
| 一个Palo的Task, |
| 用于测试两类任务的相互影响时,持续执行一类任务 |
| Date: 2015/01/22 10:49:31 |
| """ |
| import random |
| import time |
| import threading |
| from lib import palo_client |
| |
| |
| class PaloTask(object): |
| """ |
| 所有Task的父类 |
| """ |
| def __init__(self, client): |
| self.client = client |
| |
| def do_task(self): |
| """ |
| 执行task, 在子类中实现 |
| """ |
| pass |
| |
| def wait_task(self): |
| """ |
| 等待task执行结束,用于异步任务的状态监控, 需在子类中实现 |
| """ |
| pass |
| |
| def clean(self): |
| """清理task中的残留""" |
| pass |
| |
| |
| class TaskThread(threading.Thread): |
| """ |
| 启动一个线程循环去执行task |
| """ |
| def __init__(self, task): |
| self._exit_event = threading.Event() |
| self.task = task |
| threading.Thread.__init__(self) |
| self.setDaemon(True) |
| |
| def stop(self): |
| """ |
| 设置结束标记,会结束所有对象线程 |
| """ |
| self._exit_event.set() |
| |
| def run(self): |
| """ |
| 启动线程 |
| """ |
| while not self._exit_event.is_set(): |
| self.task.wait_task() |
| self.task.do_task() |
| |
| |
| class SelectTask(PaloTask): |
| """ |
| 查询任务 |
| """ |
| def __init__(self, host, port, sql, database_name=None, expected_file_path=None, |
| user="root", password="", charset="utf8", delay=None, interval=None): |
| self.client = palo_client.get_client(host, port, database_name=database_name, user=user, |
| password=password, charset=charset) |
| self.sql = sql |
| if database_name is not None: |
| self.client.use(database_name) |
| self.expected_file_path = expected_file_path |
| self.delay = delay |
| if interval is None: |
| self.interval = 1 |
| else: |
| self.interval = interval |
| |
| def do_task(self): |
| """ |
| 发送查询 |
| """ |
| result = None |
| if self.delay is not None: |
| try_time = 0 |
| while try_time < self.delay: |
| try: |
| result = self.client.execute(self.sql) |
| except: |
| try_time += self.interval |
| time.sleep(self.interval) |
| else: |
| break |
| else: |
| result = self.client.execute(self.sql) |
| |
| if self.expected_file_path: |
| pass |
| return result |
| |
| |
| class BatchLoadTask(PaloTask): |
| """ |
| 连续提交导入任务 |
| """ |
| def __init__(self, host, port, database_name, load_label, load_data_list, |
| max_filter_ratio=None, timeout=None, is_wait=False, interval=None, |
| user="root", password="", charset="utf8", broker=None): |
| self.client = palo_client.get_client(host, port, database_name=database_name, user=user, |
| password=password, charset=charset) |
| self.client.use(database_name) |
| self.load_label = load_label |
| self.load_num = 0 |
| self.load_data_list = load_data_list |
| self.max_filter_ratio = max_filter_ratio |
| self.timeout = timeout |
| self.is_wait = is_wait |
| self.broker = broker |
| if interval is None: |
| self.interval = 0 |
| else: |
| self.interval = interval |
| |
| def do_task(self): |
| """ |
| 做导入任务 |
| """ |
| load_label = "%s_%d" % (self.load_label, self.load_num) |
| ret = self.client.batch_load(load_label, self.load_data_list, |
| max_filter_ratio=self.max_filter_ratio, |
| timeout=self.timeout, is_wait=self.is_wait, |
| broker=self.broker) |
| assert ret |
| self.load_num += 1 |
| time.sleep(self.interval) |
| |
| |
| class BulkLoadTask(PaloTask): |
| """ |
| 连续提交小批量导入任务 |
| """ |
| def __init__(self, host, port, be_host, webserver_port, database_name, table_family_name, |
| load_label, data_file, max_filter_ratio=None, timeout=None, is_wait=False, |
| user="root", password="", be_user="root", be_password="", charset="utf8", interval=0): |
| self.client = palo_client.get_client(host, port, database_name=database_name, user=user, |
| password=password, charset=charset) |
| self.be_host = be_host |
| self.webserver_port = webserver_port |
| self.database_name = database_name |
| self.table_family_name = table_family_name |
| self.load_label = load_label |
| self.load_num = 1 |
| self.data_file = data_file |
| self.max_filter_ratio = max_filter_ratio |
| self.timeout = timeout |
| self.is_wait = is_wait |
| self.be_user = be_user |
| self.be_password = be_password |
| self.interval = interval |
| |
| def do_task(self): |
| """ |
| 做小批量导入任务 |
| """ |
| load_label = "%s_%d" % (self.load_label, self.load_num) |
| ret = self.client.bulk_load(self.table_family_name, load_label, self.data_file, |
| self.max_filter_ratio, self.timeout, self.database_name, self.be_host, |
| self.webserver_port, self.is_wait, user=self.be_user, password=self.be_password) |
| assert ret |
| self.load_num += 1 |
| time.sleep(self.interval) |
| |
| |
| class RollupTask(PaloTask): |
| """ |
| 连续提交 |
| """ |
| def __init__(self, host, port, database_name, table_family_name, rollup_table_name, |
| rollup_column_name_list, user="root", password="", charset="utf8", **kwargs): |
| self.client = palo_client.get_client(host, port, database_name=database_name, user=user, |
| password=password, charset=charset) |
| self.client.use(database_name) |
| self.table_family_name = table_family_name |
| self.rollup_table_name = rollup_table_name |
| self.rollup_num = 1 |
| self.rollup_column_name_list = rollup_column_name_list |
| self.kwargs = kwargs |
| |
| def do_task(self): |
| """ |
| 做rollup |
| """ |
| rollup_table_name = "%s_%d" % (self.rollup_table_name, self.rollup_num) |
| self.rollup_num += 1 |
| self.client.create_rollup_table(self.table_family_name, rollup_table_name, |
| self.rollup_column_name_list, is_wait=True) |
| |
| |
| class DeleteTask(PaloTask): |
| """ |
| 数据删除任务, 循环使用delete_conditions_list中的删除条件,向palo发送数据删除命令 |
| """ |
| def __init__(self, host, port, database_name, table_family_name, delete_conditions_list, |
| user="root", password="", charset="utf8", **kwargs): |
| """ |
| Parameters: |
| delete_conditions_list:由delete_condition_list(见PaloClient.delete)组成的list |
| """ |
| self.client = palo_client.get_client(host, port, database_name=database_name, user=user, |
| password=password, charset=charset) |
| self.client.use(database_name) |
| self.table_family_name = table_family_name |
| self.delete_conditions_list = delete_conditions_list |
| self.delete_conditions_index = 0 |
| self.kwargs = kwargs |
| |
| def do_task(self): |
| """ |
| 执行一次数据删除,delete_conditions_index递增 |
| """ |
| delete_condition_list = self.delete_conditions_list[self.delete_conditions_index % \ |
| len(self.delete_conditions_list)] |
| self.delete_conditions_index += 1 |
| try: |
| ret = self.client.delete(self.table_family_name, delete_condition_list, **self.kwargs) |
| except palo_client.PaloClientException as error: |
| print(str(error)) |
| #TODO |
| pass |
| |
| |
| class SyncTask(PaloTask): |
| """ |
| 执行同步任务,如查询,同步的insert、delete、update等 |
| todo: 每个任务的参数相同,每个任务的结果不校验 |
| """ |
| def __init__(self, func, *args, **kwargs): |
| self.func = func |
| self.args = args |
| self.kwargs = kwargs |
| self.succ_count = 0 |
| self.error_count = 0 |
| self.interval = None |
| |
| def do_task(self): |
| """ |
| 执行一次任务 |
| """ |
| try: |
| self.func(*self.args, **self.kwargs) |
| self.succ_count += 1 |
| except Exception as e: |
| self.error_count += 1 |
| print(str(e)) |
| if self.interval is None: |
| time.sleep(random.randint(0, 10) / 10.0) |
| else: |
| time.sleep(self.interval) |
| |
| |
| |