| #!/usr/bin/env python |
| # -*- coding: utf-8 -*- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| ########################################################################### |
| # |
| # @file palo_client.py |
| # @date 2015/02/04 15:26:21 |
| # @brief Palo client |
| # |
| ############################################################################ |
| |
| """ |
| Palo client for Palo2 testing. |
| """ |
| # path setup |
| import sys |
| import os |
| |
| # standard and third-party libraries |
| import time |
| import logging |
| import socket |
| import pycurl |
| from io import BytesIO |
| import pytest |
| import json |
| import pymysql |
| |
| # local libraries |
| import util |
| import palo_logger |
| import palo_job |
| from palo_config import BrokerInfo |
| from palo_exception import PaloException |
| from palo_exception import PaloClientException |
| from palo_sql import * |
| from palo_verify import verify, verify_by_sql |
| |
| |
| # logging and exception objects |
| LOG = palo_logger.Logger.getLogger() |
| L = palo_logger.StructedLogMessage |
| |
| |
| def get_client(host_name, port, database_name=None, user='root', password='', |
| charset='utf8', retry=True, http_port=None): |
| """get client |
| """ |
| client = PaloClient(host_name, port, database_name, user, password, charset, http_port) |
| if client.init(retry): |
| return client |
| else: |
| raise PaloClientException('get client error: host:%s port:%s database:%s ' |
| 'user:%s password:%s' % (host_name, port, database_name, user, password)) |
| |
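| # Illustrative usage sketch; the FE address, port and database name below are placeholders: |
| # client = get_client('127.0.0.1', 9030, database_name='test_db', user='root', password='') |
| # client.execute('SHOW TABLES') |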
| |
| class PaloClient(object): |
| """Palo client. |
| Member variables: host, port, user, password, database_name, charset, connection |
| """ |
| |
| def __init__(self, host, port, database_name=None, user='root', password='', charset='utf8', |
| http_port=None): |
| """ |
| Connect to Palo FE. |
| """ |
| # TODO: make member variables private |
| self.host = host |
| self.port = port |
| self.user = user |
| self.password = password |
| self.database_name = database_name |
| self.charset = charset |
| self.connection = None |
| self.cluster_name = 'default_cluster' |
| if http_port is None: |
| self.http_port = port - 1000 |
| else: |
| self.http_port = http_port |
| self.palo_job_map = {} |
| |
| def init(self, retry=True): |
| """init |
| """ |
| assert self.host |
| retry_times = 0 |
| while retry_times < 10: |
| try: |
| self.connection = pymysql.connect( |
| host=self.host, |
| port=self.port, |
| user=self.user, |
| passwd=self.password, |
| charset=self.charset) |
| break |
| except Exception as e: |
| LOG.error(L('Connect to Palo error', fe=str(self), error=e)) |
| if not retry: |
| return False |
| retry_times += 1 |
| time.sleep(1) |
| else: |
| LOG.error(L('Connect to Palo error', retry_times=retry_times)) |
| return False |
| |
| LOG.info(L("Connected to Palo", host=self.host, port=self.port, user=self.user)) |
| database_name = self.database_name |
| if database_name is None: |
| return True |
| |
| database_name_list = self.get_database_list() |
| if database_name not in database_name_list: |
| if not self.create_database(database_name): |
| return False |
| self.use(database_name) |
| |
| return True |
| |
| def __str__(self): |
| return "[host: %s, port: %d, database: %s]" % (self.host, self.port, self.database_name) |
| |
| def __del__(self): |
| if self.connection: |
| self.connection.close() |
| |
| def __execute_and_rebuild_meta_class(self, sql, palo_job_cls): |
| if palo_job_cls.__name__ in self.palo_job_map.keys(): |
| result = self.execute(sql) |
| else: |
| cursor, result = self.execute(sql, return_cursor=True) |
| description = cursor.description |
| idx = 0 |
| for col_info in description: |
| setattr(palo_job_cls, col_info[0], idx) |
| idx += 1 |
| self.palo_job_map[palo_job_cls.__name__] = True |
| return result |
| |
| def use(self, database_name): |
| """ |
| specify default database. |
| """ |
| try: |
| self.database_name = database_name |
| self.connection.select_db(database_name) |
| except Exception as error: |
| LOG.error(L("USE database error", fe=str(self), database_name=database_name, \ |
| error=error)) |
| return False |
| return True |
| |
| def connect(self): |
| """ |
| Establish (re-establish) the connection to the FE. |
| """ |
| try: |
| self.connection = pymysql.connect( |
| host=self.host, |
| port=self.port, |
| user=self.user, |
| passwd=self.password, |
| charset=self.charset) |
| except Exception: |
| LOG.info(L("Re-connected to Palo FE fail.", |
| host=self.host, port=self.port, user=self.user)) |
| return False |
| LOG.info(L("Re-connected to Palo FE success.", |
| host=self.host, port=self.port, user=self.user)) |
| return True |
| |
| def execute(self, sql, return_rows=False, return_cursor=False): |
| """ |
| Execute a SQL statement. |
| """ |
| if 'PALO_CLIENT_LOG_SQL' in os.environ.keys(): |
| LOG.info(L("execute SQL", sql=sql)) |
| if 'PALO_CLIENT_STDOUT' in os.environ.keys(): |
| # if sql.upper().startswith('SHOW') or sql.upper().startswith('SELECT'): |
| if sql.upper().startswith('SHOW PROC'): |
| pass |
| else: |
| print(sql.strip(';') + ';') |
| # avoid: table state is not normal, do not allow doing ALTER ops. |
| if sql.upper().startswith('ALTER'): |
| LOG.info(L("before alter sleep 0.5s")) |
| time.sleep(0.5) |
| cursor = self.connection.cursor() |
| try: |
| rows = cursor.execute(sql) |
| except pymysql.err.InternalError: |
| self.connection.ping(reconnect=True) |
| rows = cursor.execute(sql) |
| if not return_rows and not return_cursor: |
| return cursor.fetchall() |
| elif return_rows: |
| return rows, cursor.fetchall() |
| elif return_cursor: |
| return cursor, cursor.fetchall() |
| else: |
| return cursor.fetchall() |
| |
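| # Raw SQL sketch (table and query are illustrative); return_rows/return_cursor change the return shape: |
| # data = client.execute('SELECT * FROM test_db.my_table') |
| # affected_rows, data = client.execute('SELECT * FROM test_db.my_table', return_rows=True) |
| |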
| def verify(self, expected_file_list, table_name, database_name=None, |
| save_file_list=None, encoding='utf8', cluster_name=None, key_desc='aggregate'): |
| """ |
| verify |
| """ |
| LOG.info(L("check file:", file=expected_file_list)) |
| database_name = self.database_name if database_name is None else database_name |
| # fetch the table schema via DESC |
| schema = self.desc_table(table_name, database_name) |
| data = self.select_all(table_name, database_name) |
| return verify(expected_file_list, data, schema, table_name, database_name, encoding, save_file_list) |
| |
| def verify_by_sql(self, expected_file_list, sql, schema, save_file_list=None): |
| """verify by sql""" |
| LOG.info(L("check file:", file=expected_file_list)) |
| # run the query to get the data to verify |
| data = self.execute(sql) |
| return verify_by_sql(expected_file_list, data, schema, 'check_by_sql', self.database_name, None, save_file_list) |
| |
| def create_database(self, database_name, cluster_name=None): |
| """ |
| Create a database. |
| """ |
| sql = "CREATE DATABASE %s" % database_name |
| self.execute(sql) |
| if not cluster_name: |
| cluster_name = self.cluster_name |
| database_name_list = self.get_database_list(cluster_name) |
| self.use(database_name) |
| if database_name in database_name_list: |
| self.database_name = database_name |
| LOG.info(L("CREATE DATABASE succ", database_name=database_name, fe=str(self))) |
| return True |
| else: |
| LOG.warning(L("CREATE DATABASE fail", database_name=database_name, fe=str(self))) |
| return False |
| |
| def create_table(self, table_name, column_list, |
| partition_info=None, distribution_info=None, storage_type=None, |
| storage_medium=None, storage_cooldown_time=None, bloom_filter_column_list=None, |
| replication_num=None, database_name=None, set_null=False, keys_desc=None, |
| bitmap_index_list=None, dynamic_partition_info=None, replication_allocation=None, |
| light_schema_change=None, enable_unique_key_merge_on_write=None): |
| """ |
| Create table |
| Attributes: |
| column_list: list of 4-tuples (name, type, agg_type, default_value); the last two items may be omitted |
| Note: to give a key column a default value, set agg_type to None |
| Example: [("k1", "int"), ("k2", "char", None, ""), ("v", "date", "replace")] |
| bitmap_index_list: list of 3-tuples (index_name, column_name, index_type) |
| Example:[("k1_index", "k1", "BITMAP")] |
| partition_info: a PartitionInfo object |
| keys_desc: "AGGREGATE KEYS(k1)" |
| dynamic_partition_info: parameters used to create dynamic partitions |
| replication_allocation: resource group (tag) for replicas |
| enable_unique_key_merge_on_write: whether a unique table enables the merge-on-write update mode, enabled by default |
| deprecated: |
| storage_type: |
| distribution: |
| added: |
| three data models: aggregate, duplicate, unique |
| """ |
| database_name = database_name if database_name is not None else self.database_name |
| # table name |
| sql = 'CREATE TABLE %s.%s (' % (database_name, table_name) |
| # columns |
| key_columns = list() |
| aggregate_flag = False |
| for column in column_list: |
| sql = '%s %s,' % (sql, util.column_to_sql(column, set_null)) |
| if len(column) == 2 or column[2] is None: |
| key_columns.append(column[0]) |
| if len(column) > 2 and column[2] is not None and column[2].upper() in \ |
| ['MAX', 'MIN', 'SUM', 'REPLACE', 'HLL_UNION', 'REPLACE_IF_NOT_NULL']: |
| aggregate_flag = True |
| if bitmap_index_list is not None: |
| for bitmap_index in bitmap_index_list: |
| sql = '%s %s,' % (sql, util.bitmap_index_to_sql(bitmap_index)) |
| sql = '%s )' % sql.rstrip(',') |
| if keys_desc is None: |
| if aggregate_flag: |
| keys_desc = 'AGGREGATE KEY(%s)' % ','.join(key_columns) |
| else: |
| keys_desc = '' |
| sql = '%s %s' % (sql, keys_desc) |
| # partition |
| if partition_info: |
| sql = '%s %s' % (sql, str(partition_info)) |
| # distribution |
| if distribution_info is None: |
| distribution_info = DistributionInfo('HASH(%s)' % column_list[0][0], 5) |
| elif isinstance(distribution_info, DistributionInfo) and \ |
| distribution_info.distribution_type.upper() == 'RANDOM': |
| distribution_info.distribution_type = 'HASH(%s)' % column_list[0][0] |
| elif isinstance(distribution_info, str): |
| # str.replace returns a new string, so the result must be assigned back |
| distribution_info = distribution_info.replace(' RANDOM', 'HASH(%s)' % column_list[0][0]) |
| distribution_info = distribution_info.replace(' random', 'HASH(%s)' % column_list[0][0]) |
| else: |
| pass |
| sql = '%s %s' % (sql, str(distribution_info)) |
| # properties |
| sql = '%s PROPERTIES (' % (sql) |
| if storage_medium is not None: |
| sql = '%s "storage_medium"="%s",' % (sql, storage_medium) |
| if storage_cooldown_time is not None: |
| sql = '%s "storage_cooldown_time"="%s",' % (sql, storage_cooldown_time) |
| if bloom_filter_column_list is not None: |
| sql = '%s "bloom_filter_columns"="%s",' % (sql, ",".join(bloom_filter_column_list)) |
| if replication_num is not None: |
| sql = '%s "replication_num"="%s",' % (sql, replication_num) |
| if dynamic_partition_info is not None: |
| sql = '%s %s' % (sql, str(dynamic_partition_info)) |
| if replication_allocation is not None: |
| sql = '%s "replication_allocation"="%s"' % (sql, replication_allocation) |
| if light_schema_change is not None: |
| sql = '%s "light_schema_change"="%s"' % (sql, light_schema_change) |
| if enable_unique_key_merge_on_write is not None: |
| sql = '%s "enable_unique_key_merge_on_write"="%s"' % (sql, enable_unique_key_merge_on_write) |
| if sql.endswith(' PROPERTIES ('): |
| sql = sql.rstrip(' PROPERTIES (') |
| else: |
| sql = '%s )' % (sql.rstrip(',')) |
| try: |
| ret = self.execute(sql) |
| except Exception as e: |
| LOG.warning(L('CREATE TABLE fail.', table_name=table_name, msg=str(e))) |
| if "Failed to find" in str(e) or \ |
| "replication num should be less than the number of available backends" in str(e): |
| alive_be = self.get_alive_backend_list() |
| if len(alive_be) < 3: |
| LOG.warning(L('SKIP: some backends are dead, create table failed, skip test case ')) |
| raise pytest.skip("some backends are dead, create table failed, skip test case") |
| else: |
| raise e |
| else: |
| raise e |
| if ret == (): |
| LOG.info(L('CREATE TABLE succ.', database_name=database_name, |
| table_name=table_name)) |
| return True |
| else: |
| LOG.info(L('CREATE TABLE fail.', database_name=database_name, |
| table_name=table_name)) |
| return False |
| |
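| # Minimal create_table sketch following the docstring above; table, column and bucket values are illustrative: |
| # ok = client.create_table('my_table', |
| #                          [('k1', 'int'), ('k2', 'varchar(32)', None, ''), ('v1', 'bigint', 'sum')], |
| #                          distribution_info=DistributionInfo('HASH(k1)', 5), |
| #                          keys_desc='AGGREGATE KEY(k1, k2)') |
| |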
| def create_table_like(self, table_name, source_table_name, database_name=None, |
| source_database_name=None, rollup_list=None, external=False, |
| if_not_exists=False): |
| """help create table like see more info""" |
| sql = 'CREATE {external}TABLE {if_not_exists}{database_name}{table_name} LIKE ' \ |
| '{source_database_name}{source_table_name}{with_rollup}' |
| external = 'EXTERNAL' if external else '' |
| if_not_exists = 'IF NOT EXISTS ' if if_not_exists else '' |
| database_name = database_name + '.' if database_name else '' |
| with_rollup = ' ROLLUP (%s)' % ','.join(rollup_list) if rollup_list else '' |
| source_database_name = source_database_name + '.' if source_database_name else '' |
| sql = sql.format(external=external, if_not_exists=if_not_exists, database_name=database_name, |
| table_name=table_name, source_database_name=source_database_name, |
| source_table_name=source_table_name, with_rollup=with_rollup) |
| ret = self.execute(sql) |
| return ret == () |
| |
| def create_rollup_table(self, table_name, rollup_table_name, column_name_list, |
| storage_type=None, database_name=None, base_index_name=None, |
| is_wait=False, force_alter=False, cluster_name=None, after_column_name_list=''): |
| """ |
| Create a rollup table |
| todo:storage_type |
| """ |
| database_name = database_name if database_name is not None else self.database_name |
| sql = 'ALTER TABLE %s.%s ADD ROLLUP %s (%s) %s' % (database_name, table_name, |
| rollup_table_name, ','.join(column_name_list), |
| after_column_name_list) |
| if base_index_name: |
| sql = "%s FROM %s" % (sql, base_index_name) |
| if storage_type or force_alter: |
| sql = '%s PROPERTIES(' % sql |
| if storage_type: |
| sql = '%s "storage_type"="column",' % sql |
| if force_alter: |
| sql = '%s "force_alter"="true",' % sql |
| sql = '%s)' % sql.rstrip(',') |
| |
| ret = self.execute(sql) |
| |
| if ret != (): |
| LOG.info(L("CREATE ROLLUP TABLE fail.", database_name=database_name, |
| table_name=table_name, |
| rollup_table_name=rollup_table_name)) |
| return False |
| ret = True |
| if is_wait: |
| ret = self.wait_table_rollup_job(table_name, cluster_name=cluster_name, |
| database_name=database_name) |
| return ret |
| LOG.info(L("CREATE ROLLUP TABLE succ.", database_name=database_name, |
| table_name=table_name, |
| rollup_table_name=rollup_table_name)) |
| return ret |
| |
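| # Rollup sketch (rollup name and columns are illustrative); is_wait blocks until the rollup job finishes: |
| # client.create_rollup_table('my_table', 'my_rollup', ['k1', 'v1'], is_wait=True) |
| |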
| def cancel_rollup(self, table_name, database_name=None): |
| """ |
| Cancel rollup jobs of a table. |
| """ |
| database_name = database_name if database_name is not None else self.database_name |
| sql = 'CANCEL ALTER TABLE ROLLUP FROM %s.%s' % (database_name, table_name) |
| ret = self.execute(sql) |
| if ret == (): |
| LOG.info(L("CANCEL ALTER ROLLUP succ.", database_name=database_name, |
| table_name=table_name)) |
| return True |
| else: |
| LOG.info(L("CANCEL ALTER ROLLUP fail.", database_name=database_name, |
| table_name=table_name)) |
| return False |
| |
| def create_materialized_view(self, table_name, materialized_view_name, view_sql, |
| database_name=None, is_wait=False): |
| """ |
| create_materialized_view |
| |
| :param table_name: |
| :param materialized_view_name: |
| :param view_sql: |
| :param database_name: |
| :param is_wait: |
| :return: |
| """ |
| database_name = database_name if database_name is not None else self.database_name |
| sql = 'CREATE MATERIALIZED VIEW %s AS %s' % (materialized_view_name, view_sql) |
| # sleep 1s before issuing the statement |
| time.sleep(1) |
| ret = self.execute(sql) |
| if ret != (): |
| LOG.info(L("CREATE MATERIALIZED VIEW fail.", database_name=database_name, |
| materialized_view_name=materialized_view_name)) |
| return False |
| ret = True |
| if is_wait: |
| ret = self.wait_table_rollup_job(table_name, database_name=database_name) |
| return ret |
| LOG.info(L("CREATE MATERIALIZED VIEW succ.", database_name=database_name, |
| table_name=table_name, materialized_view_name=materialized_view_name, )) |
| return ret |
| |
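| # Materialized view sketch; the view name and view_sql below are assumed examples: |
| # client.create_materialized_view('my_table', 'mv_sum_v1', |
| #                                 'SELECT k1, SUM(v1) FROM my_table GROUP BY k1', is_wait=True) |
| |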
| def drop_materialized_view(self, database_name, table_name, view_name): |
| """ |
| drop_materialized_view |
| drops the view via DROP MATERIALIZED VIEW IF EXISTS |
| :param database_name: |
| :param table_name: |
| :param view_name: |
| :return: |
| """ |
| sql = 'DROP MATERIALIZED VIEW IF EXISTS %s ON %s.%s' % (view_name, database_name, table_name) |
| ret = self.execute(sql) |
| return ret |
| |
| def get_index_list(self, table_name, database_name=None): |
| """ |
| Get all index names of a table. |
| """ |
| if not table_name: |
| return None |
| ret = self.desc_table(table_name, database_name, is_all=True) |
| idx_list = util.get_attr(ret, palo_job.DescInfoAll.IndexName) |
| while '' in idx_list: |
| idx_list.remove('') |
| return idx_list |
| |
| def get_index(self, table_name, index_name=None, database_name=None): |
| """ |
| Get the index with the given name; if no name is given, return the default index (named after the table). |
| """ |
| if not table_name: |
| return None |
| if index_name is None: |
| index_name = table_name |
| idx_list = self.get_index_list(table_name, database_name) |
| if index_name in idx_list: |
| return index_name |
| else: |
| LOG.info(L('can not get index from table', index_name=index_name, table_name=table_name)) |
| return None |
| |
| def get_index_schema(self, table_name, index_name=None, database_name=None): |
| """ |
| Get table schema |
| """ |
| if index_name is None: |
| return self.desc_table(table_name, database_name, is_all=False) |
| ret = self.desc_table(table_name, database_name, is_all=True) |
| record = False |
| return_ret = list() |
| for item in ret: |
| desc_column = palo_job.DescInfoAll(item) |
| if record is False and desc_column.get_index_name() != index_name: |
| continue |
| elif desc_column.get_index_name() == index_name: |
| record = True |
| return_ret.append((desc_column.get_field(), desc_column.get_type(), desc_column.get_null(), |
| desc_column.get_key(), desc_column.get_default(), desc_column.get_extra())) |
| elif record is True: |
| return_ret.append((desc_column.get_field(), desc_column.get_type(), desc_column.get_null(), |
| desc_column.get_key(), desc_column.get_default(), desc_column.get_extra())) |
| else: |
| LOG.info(L('get index schema error')) |
| return tuple(return_ret) |
| |
| def set_time_zone(self, zone, global_var=False): |
| """设置系统的时区 |
| zone:要设置的时区,是否要global参数 |
| """ |
| if global_var: |
| sql = "set global time_zone = '%s'" % zone |
| else: |
| sql = "set time_zone = '%s'" % zone |
| LOG.info(L('palo sql', sql=sql)) |
| palo_res = self.execute(sql) |
| |
| def set_sql_mode(self, sql_mode=None): |
| """set_sql_mode""" |
| sql_mode = "PIPES_AS_CONCAT" if sql_mode is None else sql_mode |
| sql = "set sql_mode = %s" % sql_mode |
| LOG.info(L("palo sql", sql=sql)) |
| res = self.execute(sql) |
| return res |
| |
| def get_sql_mode(self): |
| """get sql mode""" |
| sql = "select @@sql_mode" |
| LOG.info(L("palo sql", sql=sql)) |
| p_res = self.execute(sql) |
| return p_res |
| |
| def wait_load_job(self, load_label, database_name=None, cluster_name=None, timeout=1800): |
| """ |
| wait load job |
| """ |
| database_name = database_name if database_name is not None else self.database_name |
| LOG.debug(L("wait load job.", load_label=load_label, database_name=database_name)) |
| retry_times = 0 |
| sql = 'SHOW LOAD FROM %s WHERE LABEL="%s"' % (database_name, load_label) |
| while retry_times < 10 and timeout > 0: |
| time.sleep(1) |
| timeout -= 1 |
| load_job = self.execute(sql) |
| if len(load_job) == 0: |
| retry_times += 1 |
| continue |
| state = palo_job.LoadJob(load_job[-1]).get_state() |
| if state.upper() != "FINISHED" and state.upper() != "CANCELLED": |
| continue |
| elif state.upper() == "FINISHED": |
| return True |
| else: |
| LOG.info(L("LOAD FAILED.", database_name=database_name, |
| msg=palo_job.LoadJob(load_job[-1]).get_errormsg(), |
| url=palo_job.LoadJob(load_job[-1]).get_url())) |
| return False |
| else: |
| LOG.info(L("LOAD LABEL NOT EXIST", load_label=load_label, database_name=database_name)) |
| return False |
| |
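| # Usage sketch: block until the load identified by a label finishes; the label is a placeholder: |
| # assert client.wait_load_job('label_1', database_name='test_db', timeout=600) |
| |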
| def bulk_load(self, table_name, load_label, data_file, max_filter_ratio=None, |
| column_name_list=None, timeout=None, database_name=None, host=None, port=None, |
| user=None, password=None, is_wait=False, cluster_name=None, |
| hll_column_list=None, column_separator=None, backend_id=None): |
| """ |
| |
| Args: |
| table_name: string, table name |
| load_label: string, load label |
| data_file: string, the file to load |
| max_filter_ratio: float, max_filter_ratio |
| column_name_list: list, column name list |
| timeout: int, timeout |
| database_name: string, database name |
| host: string, fe host |
| port: int, fe http port |
| user: string, fe user |
| password: string, fe password |
| is_wait: True/False, is wait |
| cluster_name: string, cluster name |
| hll_column_list: list, like ['hll_column1,k1', 'hll_column,k2'] |
| backend_id: kept for backward compatibility with older code |
| Returns: |
| True: mini load success |
| False: mini load fail |
| |
| """ |
| database_name = database_name if database_name is not None else self.database_name |
| host = self.host if host is None else host |
| port = self.http_port if port is None else port |
| user = self.user if user is None else user |
| password = self.password if password is None else password |
| url = 'http://%s:%s/api/%s/%s/_load?label=%s' % (host, port, database_name, |
| table_name, load_label) |
| if max_filter_ratio: |
| url = '%s&max_filter_ratio=%s' % (url, max_filter_ratio) |
| if column_name_list: |
| url = '%s&columns=%s' % (url, ','.join(column_name_list)) |
| if hll_column_list: |
| url = '%s&hll=%s' % (url, ':'.join(hll_column_list)) |
| if timeout: |
| url = '%s&timeout=%s' % (url, timeout) |
| if column_separator: |
| url = '%s&column_separator=%s' % (url, column_separator) |
| cmd = 'curl --location-trusted -u %s:%s -T %s %s' % (user, password, data_file, url) |
| print(cmd) |
| LOG.info(L('bulk multi load', cmd=cmd)) |
| file = open(data_file) |
| c = pycurl.Curl() |
| buf = BytesIO() |
| c.setopt(c.URL, url) |
| c.setopt(pycurl.WRITEFUNCTION, buf.write) |
| # HTTP basic auth |
| c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC) |
| c.setopt(pycurl.USERNAME, user) |
| c.setopt(pycurl.PASSWORD, password) |
| # upload the file |
| c.setopt(pycurl.UPLOAD, 1) |
| c.setopt(pycurl.READDATA, file) |
| # follow redirects, equivalent to --location-trusted |
| c.setopt(pycurl.UNRESTRICTED_AUTH, 1) |
| c.setopt(pycurl.FOLLOWLOCATION, True) |
| LOG.info(L("BULK LOAD.", url=url, file_name=data_file)) |
| msg = None |
| try: |
| c.perform() |
| msg = buf.getvalue() |
| LOG.info(L("BULK LOAD ret.", ret=msg)) |
| print(c.getinfo(c.HTTP_CODE)) |
| # if c.getinfo(c.HTTP_CODE) == 200: |
| if c.getinfo(c.HTTP_CODE): |
| ret = json.loads(msg) |
| status = ret.get('status') |
| if status == 'Success': |
| if is_wait: |
| r = self.wait_load_job(load_label, database_name, cluster_name=cluster_name) |
| return r |
| else: |
| return True |
| elif status == 'Fail': |
| return False |
| else: |
| return False |
| except Exception as e: |
| LOG.info(L("BULK LOAD failed.", err=str(e), msg=msg)) |
| return False |
| |
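| # Mini (HTTP bulk) load sketch; the label and local file path are placeholders: |
| # client.bulk_load('my_table', 'label_bulk_1', '/tmp/data.csv', |
| #                  max_filter_ratio=0.1, column_separator=',', is_wait=True) |
| |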
| def batch_load(self, load_label, load_data_list, max_filter_ratio=None, timeout=None, |
| hadoop_info=None, by_load_cluster=None, property_list=None, |
| database_name=None, is_wait=False, cluster_name=None, broker=None, |
| strict_mode=None, timezone=None, temp_partition=None): |
| """ |
| Load data |
| Attributes: |
| load_data_list: a LoadDataInfo object or a list of LoadDataInfo |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| data_list = list() |
| if isinstance(load_data_list, LoadDataInfo): |
| data_list.append(str(load_data_list)) |
| elif isinstance(load_data_list, list): |
| for data in load_data_list: |
| data_list.append(str(data)) |
| else: |
| raise PaloClientException('Load data list should be list or LoadDataInfo', |
| load_data_list=load_data_list) |
| sql = 'LOAD LABEL %s.%s (%s)' % (database_name, load_label, |
| ', '.join(data_list)) |
| if by_load_cluster is not None: |
| sql = '%s BY "%s"' % (sql, by_load_cluster) |
| if broker is not None: |
| sql = '%s %s' % (sql, str(broker)) |
| sql = '%s %s' % (sql, 'PROPERTIES(') |
| if max_filter_ratio is not None: |
| sql = '%s "max_filter_ratio"="%s",' % (sql, max_filter_ratio) |
| if timeout is not None: |
| sql = '%s "timeout"="%d",' % (sql, timeout) |
| if strict_mode is not None: |
| sql = '%s "strict_mode"="%s",' % (sql, strict_mode) |
| if timezone is not None: |
| sql = '%s "timezone"="%s",' % (sql, timezone) |
| |
| if hadoop_info is not None: |
| sql = '%s %s' % (sql, str(hadoop_info)) |
| |
| if property_list: |
| sql = sql + ', '.join(property_list) + ',' |
| |
| if sql.endswith(','): |
| sql = sql.rstrip(',') |
| sql = '%s %s' % (sql, ')') |
| elif sql.endswith('PROPERTIES('): |
| # no properties were set, drop the dangling PROPERTIES clause |
| sql = sql[:-len(' PROPERTIES(')] |
| else: |
| sql = '%s )' % sql |
| |
| ret = self.execute(sql) |
| if ret != (): |
| LOG.info(L('LOAD fail.', database_name=database_name, load_label=load_label)) |
| return False |
| if is_wait: |
| ret = self.wait_load_job(load_label, database_name, cluster_name=cluster_name) |
| if not ret: |
| LOG.info(L('LOAD fail.', database_name=database_name, load_label=load_label)) |
| return False |
| LOG.info(L('LOAD succ.', database_name=database_name, load_label=load_label)) |
| return True |
| |
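| # Broker load sketch; LoadDataInfo/BrokerInfo construction is not shown here (see palo_sql/palo_config): |
| # load_data = LoadDataInfo(...)  # constructor arguments depend on palo_sql |
| # client.batch_load('label_batch_1', load_data, max_filter_ratio=0.05, broker=broker_info, is_wait=True) |
| |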
| def cancel_load(self, load_label, database_name=None): |
| """ |
| Cancel a load job. |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = 'CANCEL LOAD FROM %s WHERE LABEL = "%s"' % (database_name, load_label) |
| ret = self.execute(sql) |
| if ret == (): |
| LOG.info(L("CANCEL LOAD succ.", database_name=database_name, label=load_label)) |
| return True |
| else: |
| LOG.info(L("CANCEL LOAD fail.", database_name=database_name, label=load_label)) |
| return False |
| |
| def get_quota(self, database_name, cluster_name=None): |
| """ |
| get quota |
| """ |
| if cluster_name: |
| database_name = '%s:%s' % (cluster_name, database_name) |
| sql = "SHOW PROC '/dbs'" |
| result = self.execute(sql) |
| for info in result: |
| if info[1] == database_name: |
| return info[3] |
| LOG.warning(L("Get quota fail.", database_name=database_name)) |
| return None |
| |
| def alter_database(self, database_name, quota): |
| """ |
| alter database |
| """ |
| sql = "ALTER DATABASE %s SET DATA QUOTA %d" % (database_name, quota) |
| result = self.execute(sql) |
| if result != (): |
| LOG.error(L("ALTER DATABASE fail.", database_name=database_name, quota=quota)) |
| return False |
| return True |
| |
| def drop_database(self, database_name=None, force=None): |
| """ |
| Drop a database. |
| Parameters: |
| database_name: if None, drop the default database. Type: str. |
| Returns: |
| False: drop database failed |
| True: drop database succeeded |
| Raises: |
| PaloClientException: error while dropping the database |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| if self.database_name == database_name: |
| self.database_name = None |
| sql = "DROP DATABASE %s" % database_name |
| if force is True: |
| sql = "DROP DATABASE %s FORCE" % database_name |
| self.execute(sql) |
| database_name_list = self.get_database_list() |
| if database_name in database_name_list: |
| LOG.warning(L("DROP DATABASE fail.", database_name=database_name)) |
| return False |
| else: |
| LOG.info(L("DROP DATABASE succ.", database_name=database_name)) |
| return True |
| |
| def clean(self, database_name=None): |
| """ |
| Clean up all data. |
| Parameters: |
| database_name: if None, drop all databases. Type: str. |
| Returns: |
| None |
| """ |
| if database_name is None: |
| database_name_list = self.get_database_list() |
| for database_name in database_name_list: |
| if database_name.find("information_schema") == -1: |
| self.clean(database_name) |
| else: |
| database_name_list = self.get_database_list() |
| while database_name in database_name_list: |
| try: |
| self.drop_database(database_name, force=True) |
| except PaloClientException: |
| pass |
| database_name_list = self.get_database_list() |
| |
| def drop_table(self, table_name, database_name=None, cluster_name=None, if_exist=False): |
| """删除table family |
| Parameters: |
| database_name:如果为None,将删除默认的database. [Type str] |
| table_family_name: table family name. [Type str] |
| Returns: |
| 目前无返回值 |
| Raises: |
| 没有捕获,可能抛出 |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| if not if_exist: |
| sql = 'DROP TABLE' |
| else: |
| sql = 'DROP TABLE IF EXISTS' |
| sql = "%s %s.%s" % (sql, database_name, table_name) |
| self.execute(sql) |
| |
| return True |
| |
| def drop_rollup_table(self, table_name, rollup_table_name, database_name=None): |
| """删除rollup table |
| Parameters: |
| database_name:如果为None,将删除默认的database. [Type str] |
| table_family_name: table family name. [Type str] |
| Returns: |
| 目前无返回值 |
| Raises: |
| 没有捕获,可能抛出 |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = "ALTER TABLE %s.%s DROP ROLLUP %s" % \ |
| (database_name, table_name, rollup_table_name) |
| ret = self.execute(sql) |
| return ret == () |
| |
| def select_all(self, table_name, database_name=None): |
| """ |
| select all |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = "SELECT * FROM %s.%s" % (database_name, table_name) |
| result = self.execute(sql) |
| return result |
| |
| def query(self, sql): |
| """ |
| query |
| """ |
| result = self.execute(sql) |
| return result |
| |
| def get_load_job_state(self, label, database_name=None, cluster_name=None): |
| """ |
| get load job state |
| """ |
| load_job_list = self.get_load_job_list(database_name=database_name, \ |
| cluster_name=cluster_name) |
| for load_job in load_job_list: |
| job = palo_job.LoadJob(load_job) |
| if job.get_label() == label: |
| return job.get_state() |
| return None |
| |
| def get_unfinish_load_job_list(self, database_name=None, cluster_name=None): |
| """ |
| Get all unfinished load jobs, i.e. jobs in PENDING, ETL or LOADING state. |
| """ |
| load_job_list = self.get_load_job_list( \ |
| database_name=database_name, cluster_name=cluster_name) |
| result = list() |
| for load_job in load_job_list: |
| job = palo_job.LoadJob(load_job) |
| if job.get_state() != "FINISHED" and job.get_state() != "CANCELLED": |
| result.append(load_job) |
| |
| return result |
| |
| def get_load_job_list(self, state=None, database_name=None, cluster_name=None): |
| """ |
| Get load jobs, optionally filtered by job state. |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = 'SHOW LOAD FROM %s' % database_name |
| job_list = self.__execute_and_rebuild_meta_class(sql, palo_job.LoadJob) |
| result = list() |
| if state: |
| state = state.upper() |
| for job in job_list: |
| if palo_job.LoadJob(job).get_state() == state: |
| result.append(job) |
| return result |
| else: |
| return job_list |
| |
| def get_load_job(self, label, database_name=None, cluster_name=None): |
| """ |
| Get the load job with the given label. |
| """ |
| load_job_list = self.get_load_job_list(database_name=database_name, \ |
| cluster_name=cluster_name) |
| for load_job in load_job_list: |
| if palo_job.LoadJob(load_job).get_label() == label: |
| return load_job |
| return None |
| |
| def get_delete_job_list(self, state=None, database_name=None, cluster_name=None): |
| """ |
| Get delete jobs, optionally filtered by state. |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| if not cluster_name: |
| cluster_name = self.cluster_name |
| if cluster_name: |
| database_name = '%s:%s' % (cluster_name, database_name) |
| sql = 'SHOW DELETE FROM %s' % database_name |
| job_list = self.execute(sql) |
| result = list() |
| if state: |
| state = state.upper() |
| for job in job_list: |
| if palo_job.DeleteJob(job).get_state() == state: |
| result.append(job) |
| return result |
| else: |
| return job_list |
| |
| def wait_table_rollup_job(self, table_name, database_name=None, cluster_name=None, |
| state='FINISHED', timeout=2000): |
| """ |
| Wait for the latest rollup job of the table to reach the given state. |
| """ |
| time.sleep(5) |
| database_name = self.database_name if database_name is None else database_name |
| retry_times = 10 |
| while timeout > 0: |
| job_list = self.get_table_rollup_job_list(table_name, |
| database_name, cluster_name=cluster_name) |
| if not job_list: |
| retry_times -= 1 |
| if retry_times == 0: |
| LOG.info(L("CANNOT GET ROLLUP JOB.", database_name=database_name)) |
| break |
| time.sleep(1) |
| continue |
| last_job_state = palo_job.RollupJob(job_list[-1]).get_state() |
| if last_job_state == state: |
| LOG.info(L("GET ROLLUP JOB STATE.", database_name=database_name, |
| state=palo_job.RollupJob(job_list[-1]).get_state())) |
| return True |
| if last_job_state == 'CANCELLED': |
| LOG.info(L("ROLLUP JOB CANCELLED.", database_name=database_name, |
| msg=palo_job.RollupJob(job_list[-1]).get_msg())) |
| return False |
| if state != 'FINISHED' and last_job_state == 'FINISHED': |
| LOG.info(L("ROLLUP JOB FINISHED.", database_name=database_name)) |
| return False |
| time.sleep(3) |
| timeout -= 3 |
| LOG.warning(L("WAIT ROLLUP JOB TIMEOUT.", database_name=database_name)) |
| return False |
| |
| def wait_table_load_job(self, database_name=None, timeout=1800): |
| """等待db的所有load任务完成""" |
| database_name = self.database_name if database_name is None else database_name |
| flag = False |
| while timeout > 0: |
| flag = True |
| load_job_list = self.execute('SHOW LOAD FROM %s' % database_name) |
| for job in load_job_list: |
| LOG.info(L("LOAD JOB STATE.", database_name=database_name, |
| state=job[palo_job.LoadJob.State])) |
| if job[palo_job.LoadJob.State] != 'CANCELLED' and \ |
| job[palo_job.LoadJob.State] != 'FINISHED': |
| LOG.info(L("LOAD JOB RUNNING.", database_name=database_name, |
| state=job[palo_job.LoadJob.State])) |
| flag = False |
| break |
| time.sleep(3) |
| timeout -= 3 |
| if flag: |
| LOG.info(L("LOAD JOB FINISHED.", database_name=database_name)) |
| break |
| LOG.info(L("WAIT LOAD JOB FINISHED.", database_name=database_name)) |
| |
| def get_table_rollup_job_list(self, table_name, database_name=None, cluster_name=None): |
| """ |
| Get rollup jobs of the specified table. |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = 'SHOW ALTER TABLE ROLLUP FROM %s WHERE TableName = "%s"' % (database_name, table_name) |
| database_rollup_job_list = self.__execute_and_rebuild_meta_class(sql, palo_job.RollupJob) |
| table_rollup_job_list = [] |
| for rollup_job in database_rollup_job_list: |
| if palo_job.RollupJob(rollup_job).get_table_name() == table_name: |
| table_rollup_job_list.append(rollup_job) |
| return table_rollup_job_list |
| |
| def get_database_rollup_job_list(self, database_name=None, cluster_name=None): |
| """ |
| Get all rollup jobs of the database. |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| if cluster_name: |
| database_name = '%s:%s' % (cluster_name, database_name) |
| |
| sql = "SHOW PROC '/jobs/%s/rollup'" % self.get_database_id(database_name) |
| return self.execute(sql) |
| |
| def get_database_list(self, cluster_name=None): |
| """ |
| Return the list of all database names. |
| Parameters: |
| None |
| Returns: |
| database name list |
| Raises: |
| PaloClientException: error while fetching data |
| """ |
| sql = r"SHOW DATABASES" |
| result = self.execute(sql) |
| database_name_list = [name[0] for name in result] |
| return database_name_list |
| |
| def get_partition_list(self, table_name, database_name=None, cluster_name=None): |
| """ |
| Get the partition list of a table (SHOW PARTITIONS). |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| if cluster_name: |
| database_name = '%s:%s' % (cluster_name, database_name) |
| sql = "SHOW PARTITIONS FROM %s.%s" % (database_name, table_name) |
| result = self.__execute_and_rebuild_meta_class(sql, palo_job.PartitionInfo) |
| return result |
| |
| def get_partition(self, table_name, partition_name, database_name=None, cluster_name=None): |
| """ |
| Get the specified partition. |
| """ |
| partition_list = self.get_partition_list(table_name, database_name, \ |
| cluster_name=cluster_name) |
| for partition in partition_list: |
| if partition[palo_job.PartitionInfo.PartitionName] == partition_name: |
| return partition |
| return None |
| |
| def get_partition_id(self, table_name, partition_name, database_name=None): |
| """ |
| Get partition id. |
| Parameters: |
| database_name |
| table_name |
| partition_name |
| Returns: |
| None: if the partition does not exist |
| """ |
| partition = self.get_partition(table_name, \ |
| partition_name, database_name) |
| if partition: |
| return partition[palo_job.PartitionInfo.PartitionId] |
| else: |
| return None |
| |
| def get_partition_name_by_id(self, table_name, partition_id, \ |
| database_name=None, cluster_name=None): |
| """get partition name by id |
| """ |
| partition_list = self.get_partition_list(table_name, \ |
| database_name, cluster_name=cluster_name) |
| for partition in partition_list: |
| if partition[palo_job.PartitionInfo.PartitionId] == partition_id: |
| return partition[palo_job.PartitionInfo.PartitionName] |
| return None |
| |
| def get_partition_version(self, table_name, \ |
| partition_name, database_name=None, cluster_name=None): |
| """ |
| Get the visible version of a partition. |
| """ |
| partition = self.get_partition(table_name, \ |
| partition_name, database_name, cluster_name=cluster_name) |
| if partition: |
| return partition[palo_job.PartitionInfo.VisibleVersion] |
| else: |
| return None |
| |
| def get_partition_storage_medium(self, table_name, partition_name, \ |
| database_name=None, cluster_name=None): |
| """ |
| Get the storage medium of a partition. |
| Parameters: |
| database_name |
| table_name |
| partition_name |
| Returns: |
| None: if the partition does not exist |
| """ |
| partition = self.get_partition(table_name, \ |
| partition_name, database_name, cluster_name=cluster_name) |
| if partition: |
| return partition[palo_job.PartitionInfo.StorageMedium] |
| else: |
| return None |
| |
| def get_partition_cooldown_time(self, table_name, partition_name, \ |
| database_name=None, cluster_name=None): |
| """ |
| Get the storage cooldown time of a partition. |
| Parameters: |
| database_name |
| table_name |
| partition_name |
| Returns: |
| None: if the partition does not exist |
| """ |
| partition = self.get_partition(table_name, |
| partition_name, database_name, cluster_name=cluster_name) |
| if partition: |
| return partition[palo_job.PartitionInfo.CooldownTime] |
| else: |
| return None |
| |
| def get_partition_replication_num(self, table_name, partition_name, |
| database_name=None, cluster_name=None): |
| """ |
| Get the replication number of a partition. |
| Parameters: |
| database_name |
| table_name |
| partition_name |
| Returns: |
| None: if the partition does not exist |
| """ |
| partition = self.get_partition(table_name, |
| partition_name, database_name, cluster_name=cluster_name) |
| if partition: |
| return partition[palo_job.PartitionInfo.ReplicationNum] |
| else: |
| return None |
| |
| def get_partition_buckets(self, table_name, partition_name, |
| database_name=None, cluster_name=None): |
| """ |
| Get the bucket number of a partition. |
| Parameters: |
| database_name |
| table_name |
| partition_name |
| Returns: |
| None: if the partition does not exist |
| """ |
| partition = self.get_partition(table_name, partition_name, |
| database_name, cluster_name=cluster_name) |
| if partition: |
| return partition[palo_job.PartitionInfo.Buckets] |
| else: |
| return None |
| |
| def delete(self, table_name, delete_conditions, |
| partition_name=None, database_name=None, is_wait=False): |
| """ |
| Delete data from the specified table by condition. |
| Parameters: |
| delete_conditions: either a list of delete conditions, each a 3-tuple of |
| (column name, comparison operator, value), or a plain string. e.g.: |
| [("k1", "=", "0"), ("k2", "<=", "10")] |
| Returns: |
| True: execution succeeded |
| False: execution failed |
| Raises: |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| if not partition_name: |
| sql = "DELETE FROM %s.%s WHERE" % (database_name, table_name) |
| else: |
| sql = "DELETE FROM %s.%s PARTITION %s WHERE" % (database_name, |
| table_name, partition_name) |
| if isinstance(delete_conditions, list): |
| for where_condition in delete_conditions: |
| # quote the value to avoid errors when a datetime value contains spaces |
| where_condition = list(where_condition) |
| where_condition[2] = "\"%s\"" % where_condition[2] |
| sql = "%s %s AND" % (sql, " ".join(where_condition)) |
| sql = sql[:-4] |
| elif isinstance(delete_conditions, str): |
| sql = "%s %s" % (sql, delete_conditions) |
| # DELETE is synchronous: once it returns the data is gone; while it runs, other connections see the job as DELETING in SHOW DELETE |
| time.sleep(1) |
| ret = self.execute(sql) |
| LOG.info(L("DELETE EXECUTE SUCC", ret=ret)) |
| if is_wait is True: |
| if database_name is not None: |
| line = 'SHOW DELETE FROM %s' % database_name |
| else: |
| line = 'SHOW DELETE' |
| delete_job = self.execute(line) |
| for job in delete_job: |
| if job[4] == 'FINISHED': |
| pass |
| else: |
| time.sleep(1) |
| return ret == () |
| |
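| # Conditional delete sketch, mirroring the docstring example above; the partition name is illustrative: |
| # client.delete('my_table', [('k1', '=', '0'), ('k2', '<=', '10')], partition_name='p1') |
| |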
| def show_delete(self, database_name=None): |
| """show delete """ |
| if not database_name: |
| database_name = self.database_name |
| sql = 'SHOW DELETE FROM %s' % database_name |
| ret = self.execute(sql) |
| return ret |
| |
| def schema_change(self, table_family_name, add_column_list=None, |
| drop_column_list=None, modify_column_list=None, order_column_list=None, |
| bloom_filter_column_list=None, database_name=None, |
| colocate_with_list=None, distribution_type=None, |
| is_wait=False, cluster_name=None, comment=None, replication_allocation=None): |
| """schema change |
| Parameters: |
| table_family_name: |
| rollup_table_name: |
| add_column_list: columns to add, each element describes one new column: |
| "col_name type agg_type default 'default_value' after 'col_name' in 'rollup_table_name'" |
| e.g. ["addc1 int default '1'", \ |
| "addc2 float sum default '2.3'", \ |
| "addc3 int default '2' after k1", \ |
| "addc1 int default '1' in 'rollup_table'"] |
| drop_column_list: columns to drop, each element is a column name |
| e.g. ["k1", "v"] |
| modify_column_list: columns to modify, each element is a column spec, see add_column_list |
| order_column_list: new column order, each element is a column name |
| bloom_filter_column_list: |
| database_name: |
| colocate_with_list: |
| distribution_type: |
| is_wait: |
| cluster_name: |
| comment: table comment |
| replication_allocation: replica allocation tag |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = "ALTER TABLE %s.%s" % (database_name, table_family_name) |
| if add_column_list: |
| if len(add_column_list) == 1: |
| sql = "%s ADD COLUMN %s" % (sql, ", ".join(add_column_list)) |
| else: |
| sql = "%s ADD COLUMN (%s)" % (sql, ", ".join(add_column_list)) |
| if drop_column_list: |
| sql = "%s DROP COLUMN %s" % (sql, ", DROP COLUMN ".join(drop_column_list)) |
| if order_column_list: |
| if len(order_column_list) == 1: |
| sql = "%s ORDER BY %s" % (sql, ", ".join(order_column_list)) |
| else: |
| sql = "%s ORDER BY (%s)" % (sql, ", ".join(order_column_list)) |
| if modify_column_list: |
| sql = "%s MODIFY COLUMN %s" % (sql, ", MODIFY COLUMN ".join(modify_column_list)) |
| if bloom_filter_column_list: |
| if add_column_list is None and drop_column_list is None \ |
| and modify_column_list is None and order_column_list is None: |
| sql = '%s SET ("bloom_filter_columns"="%s")' % (sql, ",".join(bloom_filter_column_list)) |
| else: |
| sql = '%s PROPERTIES ("bloom_filter_columns"="%s")' % (sql, ",".join(bloom_filter_column_list)) |
| if colocate_with_list: |
| sql = '%s SET ("colocate_with"="%s")' % (sql, ",".join(colocate_with_list)) |
| if distribution_type: |
| sql = '%s SET ("distribution_type"="%s")' % (sql, ",".join(distribution_type)) |
| if comment: |
| sql = '%s MODIFY comment "%s"' % (sql, comment) |
| if replication_allocation: |
| sql = '%s SET ("replication_allocation"="%s")' % (sql, replication_allocation) |
| result = self.execute(sql) |
| if result != (): |
| LOG.info(L("SCHEMA CHANGE fail.", database_name=database_name, |
| table_family_name=table_family_name)) |
| return False |
| if is_wait: |
| LOG.info(L("wait for SCHEMA CHANGE.", database_name=database_name, |
| table_family_name=table_family_name)) |
| if not self.wait_table_schema_change_job(table_family_name, cluster_name=cluster_name): |
| LOG.info(L("SCHEMA CHANGE fail.", database_name=database_name, |
| table_family_name=table_family_name)) |
| return False |
| return True |
| |
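| # Schema change sketch using the column-spec formats from the docstring above (names are illustrative): |
| # client.schema_change('my_table', |
| #                      add_column_list=["addc1 int default '1'"], |
| #                      drop_column_list=['v2'], |
| #                      is_wait=True) |
| |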
| def schema_change_add_column(self, table_name, column_list, after_column_name=None, |
| to_table_name=None, force_alter=False, database_name=None, |
| is_wait_job=False, is_wait_delete_old_schema=False, |
| cluster_name=None, set_null=False): |
| """ |
| Add columns. |
| column_list: column_name, column_type, aggtype, default_value |
| same format as in create_table |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = 'ALTER TABLE %s.%s ADD COLUMN' % (database_name, table_name) |
| if len(column_list) > 1: |
| sql = '%s(' % sql |
| for column in column_list: |
| sql = '%s %s, ' % (sql, util.column_to_sql(column, set_null)) |
| sql = sql.rstrip(', ') |
| if len(column_list) > 1: |
| sql = '%s)' % sql |
| |
| if after_column_name: |
| if after_column_name == 'FIRST': |
| sql = '%s %s' % (sql, after_column_name) |
| else: |
| sql = '%s AFTER %s' % (sql, after_column_name) |
| if to_table_name: |
| sql = '%s TO %s' % (sql, to_table_name) |
| |
| if force_alter: |
| sql = '%s PROPERTIES("force_alter"="true")' % sql |
| |
| result = self.execute(sql) |
| |
| if result != (): |
| LOG.info(L("SCHEMA CHANGE fail.", database_name=database_name, |
| table_name=table_name)) |
| return False |
| |
| if is_wait_job: |
| if not self.wait_table_schema_change_job(table_name, cluster_name=cluster_name): |
| return False |
| |
| return True |
| |
| def schema_change_drop_column(self, table_name, column_name_list, |
| from_table_name=None, force_alter=False, database_name=None, |
| is_wait_job=False, is_wait_delete_old_schema=False, cluster_name=None): |
| """ |
| Drop columns. |
| column_name_list: ['k1', 'v1', 'v3'] |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = 'ALTER TABLE %s.%s' % (database_name, table_name) |
| |
| from_table_sql = '' |
| if from_table_name: |
| from_table_sql = 'FROM %s' % (from_table_name) |
| |
| force_alter_sql = '' |
| if force_alter: |
| force_alter_sql = 'PROPERTIES("force_alter"="true")' |
| |
| for column in column_name_list: |
| sql = '%s DROP COLUMN %s %s %s, ' % (sql, column, from_table_sql, force_alter_sql) |
| |
| sql = sql.rstrip(', ') |
| |
| result = self.execute(sql) |
| |
| if result != (): |
| LOG.info(L("SCHEMA CHANGE fail.", database_name=database_name, |
| table_name=table_name)) |
| return False |
| |
| if is_wait_job: |
| if not self.wait_table_schema_change_job(table_name, cluster_name=cluster_name): |
| return False |
| |
| return True |
| |
| def schema_change_order_column(self, table_name, column_name_list, |
| from_table_name=None, force_alter=False, database_name=None, |
| is_wait_job=False, is_wait_delete_old_schema=False, cluster_name=None): |
| """ |
| Reorder columns. |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = 'ALTER TABLE %s.%s' % (database_name, table_name) |
| |
| from_table_sql = '' |
| if from_table_name: |
| from_table_sql = 'FROM %s' % (from_table_name) |
| |
| force_alter_sql = '' |
| if force_alter: |
| force_alter_sql = 'PROPERTIES("force_alter"="true")' |
| |
| sql = '%s ORDER BY (%s)' % (sql, ', '.join(column_name_list)) |
| |
| sql = '%s %s %s' % (sql, from_table_sql, force_alter_sql) |
| |
| result = self.execute(sql) |
| |
| if result != (): |
| LOG.info(L("SCHEMA CHANGE fail.", database_name=database_name, |
| table_name=table_name)) |
| return False |
| |
| if is_wait_job: |
| if not self.wait_table_schema_change_job(table_name, cluster_name=cluster_name): |
| return False |
| |
| return True |
| |
| def schema_change_modify_column(self, table_name, column_name, column_type, |
| after_column_name=None, from_table_name=None, force_alter=False, |
| database_name=None, is_wait_job=False, |
| is_wait_delete_old_schema=False, cluster_name=None, aggtype='', column_info=""): |
| """ |
| Modify the type of a column. |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| schema = self.desc_table(table_name, database_name=database_name) |
| if not aggtype: |
| for field in schema: |
| if field[0] == column_name and field[3] == 'false': |
| aggtype = field[5].split(',')[0] |
| break |
| if aggtype == '-' or aggtype == 'NONE': |
| aggtype = '' |
| |
| sql = 'ALTER TABLE %s.%s MODIFY COLUMN %s %s %s %s' % (database_name, |
| table_name, column_name, column_type, aggtype, |
| column_info) |
| |
| if after_column_name: |
| sql = '%s AFTER %s' % (sql, after_column_name) |
| |
| if from_table_name: |
| sql = '%s FROM %s' % (sql, from_table_name) |
| |
| if force_alter: |
| sql = '%s PROPERTIES("force_alter"="true")' % sql |
| result = self.execute(sql) |
| |
| if result != (): |
| LOG.info(L("SCHEMA CHANGE fail.", database_name=database_name, |
| table_name=table_name)) |
| return False |
| |
| if is_wait_job: |
| if not self.wait_table_schema_change_job(table_name, cluster_name=cluster_name): |
| return False |
| |
| return True |
| |
| def wait_table_schema_change_job(self, table_name, database_name=None, cluster_name=None, |
| state="FINISHED", timeout=1200): |
| """ |
| Wait for the latest schema change job of the table to reach the given state. |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| try_times = 0 |
| while try_times < 120 and timeout > 0: |
| time.sleep(3) |
| timeout -= 3 |
| schema_change_job_list = self.get_table_schema_change_job_list( |
| table_name, database_name, cluster_name=cluster_name) |
| if not schema_change_job_list or len(schema_change_job_list) == 0: |
| try_times += 1 |
| continue |
| last_job_state = palo_job.SchemaChangeJob(schema_change_job_list[-1]).get_state() |
| LOG.info(L("GET LAST SCHEMA CHANGE JOB STATE", state=last_job_state)) |
| if last_job_state == state: |
| LOG.info(L("GET SCHEMA CHANGE JOB STATE.", database_name=database_name, |
| state=palo_job.SchemaChangeJob(schema_change_job_list[-1]).get_state())) |
| return True |
| if last_job_state == 'CANCELLED' and state != 'CANCELLED': |
| LOG.info(L("SCHEMA CHANGE fail.", state='CANCELLED', |
| msg=palo_job.SchemaChangeJob(schema_change_job_list[-1]).get_msg())) |
| return False |
| if state != 'FINISHED' and last_job_state == 'FINISHED': |
| LOG.info(L("SCHEMA CHANGE FINISHED.", state='FINISHED')) |
| return False |
| LOG.warning(L("WAIT SCHEMA CHANGE TIMEOUT.", database_name=database_name)) |
| return False |
| |
| def get_table_schema_change_job_list(self, table_name, database_name=None, cluster_name=None): |
| """ |
| Get all schema change jobs of the specified table. |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = 'SHOW ALTER TABLE COLUMN FROM %s' % database_name |
| database_schema_change_job_list = self.__execute_and_rebuild_meta_class(sql, palo_job.SchemaChangeJob) |
| table_schema_change_job_list = [] |
| for schema_change_job in database_schema_change_job_list: |
| if palo_job.SchemaChangeJob(schema_change_job).get_table_name() == table_name: |
| table_schema_change_job_list.append(schema_change_job) |
| return table_schema_change_job_list |
| |
| def cancel_schema_change(self, table_name, database_name=None): |
| """ |
| Cancel a schema change job. |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = 'CANCEL ALTER TABLE COLUMN FROM %s.%s' % (database_name, table_name) |
| ret = self.execute(sql) |
| if ret == (): |
| LOG.info(L("CANCEL SCHEMA CHANGE Succ.", database_name=database_name, |
| table_name=table_name)) |
| return True |
| else: |
| LOG.info(L("CANCEL SCHEMA CHANGE Fail.", database_name=database_name, |
| table_name=table_name)) |
| return False |
| |
| def add_partition(self, table_name, partition_name, value, |
| distribute_type=None, bucket_num=None, |
| storage_medium=None, storage_cooldown_time=None, |
| database_name=None, partition_type=None): |
| """ |
| Add a partition. |
| Only the table's default bucketing method is supported when adding a partition (no new bucketing method); the bucket number can be changed. |
| Args: |
| table_name: table name |
| partition_name: new partition name |
| value: str or tuple, like: 'k1' or ('k1', 'k2'), (('1','2'),('3','4')) |
| distribute_type: only hash, do not support random |
| bucket_num: num |
| storage_medium: |
| storage_cooldown_time: |
| database_name: |
| |
| Returns: |
| True/except |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| predicate = 'IN' if partition_type is not None and partition_type.upper() == 'LIST' else 'LESS THAN' |
| if value != 'MAXVALUE': |
| # single-column LIST or multi-column RANGE |
| if isinstance(value, tuple) and isinstance(value[0], str): |
| value = '(%s)' % ','.join('"{0}"'.format(v) for v in value) |
| value = value.replace('"MAXVALUE"', 'MAXVALUE') |
| # multi-column LIST |
| elif isinstance(value, tuple) and isinstance(value[0], tuple): |
| in_val_list = [] |
| for multi_col_value in value: |
| in_val_list.append('(%s)' % ','.join('"{0}"'.format(v) for v in multi_col_value)) |
| value = '(%s)' % ','.join(in_val_list) |
| # single-column RANGE |
| else: |
| value = '("%s")' % (value) |
| sql = 'ALTER TABLE %s.%s ADD PARTITION %s VALUES %s %s' % ( |
| database_name, table_name, partition_name, predicate, value) |
| sql = '%s (' % (sql) |
| if storage_medium is not None: |
| sql = '%s "storage_medium"="%s",' % (sql, storage_medium) |
| if storage_cooldown_time is not None: |
| sql = '%s "storage_cooldown_time"="%s",' % (sql, storage_cooldown_time) |
| if sql.endswith(' ('): |
| sql = sql.rstrip(' (') |
| else: |
| sql = '%s )' % (sql.rstrip(',')) |
| if distribute_type: |
| if distribute_type.upper() == 'RANDOM': |
| discol = self.execute('desc %s.%s' % (database_name, table_name))[0][0] |
| sql = '%s DISTRIBUTED BY HASH(%s)' % (sql, discol) |
| else: |
| sql = '%s DISTRIBUTED BY %s' % (sql, distribute_type) |
| if bucket_num: |
| sql = '%s BUCKETS %d' % (sql, bucket_num) |
| |
| result = self.execute(sql) |
| |
| if result != (): |
| LOG.info(L("ADD PARTITION fail.", database_name=database_name, |
| table_name=table_name)) |
| return False |
| |
| return True |
| |
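| # Range partition sketch: adds a partition with VALUES LESS THAN ("2023-01-01"); values are illustrative: |
| # client.add_partition('my_table', 'p20230101', '2023-01-01', storage_medium='HDD', bucket_num=10) |
| |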
| def modify_partition(self, table_name, partition_name=None, |
| storage_medium=None, storage_cooldown_time=None, replication_num=None, |
| database_name=None, **kwargs): |
| """ |
| Modify the storage_medium, storage_cooldown_time and replication_num properties of a partition. |
| For a single-partition table, partition_name is the same as the table name. |
| If partition_name is None, the properties are modified for the whole table. |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = 'ALTER TABLE %s.%s' % (database_name, table_name) |
| if partition_name is not None: |
| if isinstance(partition_name, list): |
| partition_name = '(' + ','.join(partition_name) + ')' |
| sql = '%s MODIFY PARTITION %s' % (sql, partition_name) |
| if storage_medium is not None: |
| kwargs['storage_medium'] = storage_medium |
| if storage_cooldown_time is not None: |
| kwargs['storage_cooldown_time'] = storage_cooldown_time |
| if replication_num is not None: |
| kwargs['replication_num'] = replication_num |
| property = '' |
| for k, v in kwargs.items(): |
| property += ' "%s" = "%s",' % (k, v) |
| property_str = property.strip(',') |
| sql = '%s SET(%s)' % (sql, property_str) |
| result = self.execute(sql) |
| |
| if result != (): |
| LOG.info(L("MODIFY PARTITION fail.", database_name=database_name, |
| table_name=table_name)) |
| return False |
| |
| return True |
| |
| def drop_partition(self, table_name, partition_name, database_name=None): |
| """ |
| Drop a partition. |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = 'ALTER TABLE %s.%s DROP PARTITION %s' % ( \ |
| database_name, table_name, partition_name) |
| |
| try: |
| self.execute(sql) |
| except PaloException as e: |
| LOG.error(L('', fe=str(self), error=e)) |
| return False |
| |
| return True |
| |
| def add_temp_partition(self, table_name, partition_name, value, distribute_type=None, bucket_num=None, |
| in_memory=None, replication_num=None, database_name=None, partition_type='RANGE'): |
| """ |
| Add a temporary partition. |
| :param table_name: |
| :param partition_name: |
| :param value: |
| :param distribute_type: |
| :param bucket_num: |
| :param in_memory: |
| :param replication_num: |
| :param database_name: |
| :return: |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| partitioninfo = PartitionInfo(partition_type=partition_type).get_partition_value(value) |
| print(partitioninfo) |
| sql = 'ALTER TABLE %s.%s ADD TEMPORARY PARTITION %s %s' % (database_name, table_name, |
| partition_name, partitioninfo) |
| sql = '%s (' % (sql) |
| if in_memory is not None: |
| sql = '%s "in_memory"="%s",' % (sql, in_memory) |
| if replication_num is not None: |
| sql = '%s "replication_num"="%s",' % (sql, replication_num) |
| if sql.endswith(' ('): |
| sql = sql.rstrip(' (') |
| else: |
| sql = '%s )' % (sql.rstrip(',')) |
| if distribute_type: |
| if distribute_type.upper() == 'RANDOM': |
| discol = self.execute('desc %s.%s' % (database_name, table_name))[0][0] |
| sql = '%s DISTRIBUTED BY HASH(%s)' % (sql, discol) |
| else: |
| sql = '%s DISTRIBUTED BY %s' % (sql, distribute_type) |
| if bucket_num: |
| sql = '%s BUCKETS %d' % (sql, bucket_num) |
| |
| result = self.execute(sql) |
| |
| if result != (): |
| LOG.info(L("ADD TEMP PARTITION fail.", database_name=database_name, |
| table_name=table_name)) |
| return False |
| |
| return True |
| |
| def drop_temp_partition(self, database_name, table_name, partition_name): |
| """ |
| drop a temporary partition |
| :param database_name: |
| :param table_name: |
| :param partition_name: |
| :return: |
| """ |
| self.use(database_name) |
| sql = 'ALTER TABLE %s DROP TEMPORARY PARTITION %s' % (table_name, partition_name) |
| |
| result = self.execute(sql) |
| |
| if result != (): |
| LOG.info(L("DROP TEMP PARTITION fail.", database_name=database_name, |
| table_name=table_name)) |
| return False |
| |
| return True |
| |
| def modify_temp_partition(self, database_name, table_name, target_partition_list, temp_partition_list, |
| strict_range=None, use_temp_partition_name=None): |
| """ |
| replace partitions with temporary partitions |
| :param database_name: |
| :param table_name: |
| :param target_partition_list: |
| :param temp_partition_list: |
| :param strict_range: |
| :param use_temp_partition_name: |
| :return: |
| """ |
| self.use(database_name) |
| target_partition = ','.join(target_partition_list) |
| temp_partition = ','.join(temp_partition_list) |
| sql = 'ALTER TABLE %s.%s REPLACE PARTITION (%s) WITH TEMPORARY PARTITION (%s)' % \ |
| (database_name, table_name, target_partition, temp_partition) |
| |
| if strict_range is not None or use_temp_partition_name is not None: |
| sql = "%s PROPERTIES (" % sql |
| value_list = [] |
| print(strict_range) |
| if strict_range: |
| value_list.append('"strict_range"="%s"' % strict_range) |
| if use_temp_partition_name: |
| value_list.append('"use_temp_partition_name"="%s"' % use_temp_partition_name) |
| print(value_list) |
| sql = "%s %s)" % (sql, ','.join(value_list)) |
| |
| result = self.execute(sql) |
| |
| if result != (): |
| LOG.info(L("ALTER TEMP PARTITION fail.", database_name=database_name, |
| table_name=table_name)) |
| return False |
| |
| return True |
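| # A temporary-partition swap sketch (hypothetical names; the `value` format is assumed to follow |
| # whatever PartitionInfo.get_partition_value expects): |
| # client.add_temp_partition('example_tbl', 'tp1', '2020-01-01', database_name='example_db') |
| # client.modify_temp_partition('example_db', 'example_tbl', ['p1'], ['tp1'], strict_range='false') |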
| |
| def create_user(self, user, password=None, is_superuser=False, default_role=None): |
| """ |
| create user |
| """ |
| sql = "CREATE USER '%s'" % user |
| if password is not None: |
| sql = "%s IDENTIFIED BY '%s'" % (sql, password) |
| if is_superuser: |
| sql = "%s SUPERUSER" % sql |
| if default_role: |
| sql = "%s DEFAULT ROLE '%s'" % (sql, default_role) |
| result = self.execute(sql) |
| if result != (): |
| LOG.error(L("CREATE USER fail.", user=user, password=password, \ |
| is_superuser=is_superuser)) |
| return False |
| return True |
| |
| def create_role(self, role_name): |
| """ |
| create role |
| """ |
| sql = 'CREATE ROLE %s' % role_name |
| result = self.execute(sql) |
| if result != (): |
| LOG.error(L("CREATE ROLE fail.", sql=sql, msg=str(result))) |
| return False |
| return True |
| |
| def drop_role(self, role): |
| """drop role""" |
| sql = "DROP ROLE %s" % role |
| result = self.execute(sql) |
| if result != (): |
| LOG.error(L("DROP ROLE fail", msg=str(result), role=role)) |
| return False |
| return True |
| |
| def drop_user(self, user, if_exists=True): |
| """ |
| drop user |
| """ |
| if if_exists: |
| sql = "DROP USER IF EXISTS '%s'" % user |
| else: |
| sql = "DROP USER '%s'" % user |
| result = self.execute(sql) |
| if result != (): |
| LOG.error(L("DROP USER fail.", user=user)) |
| return False |
| return True |
| |
| def clean_user(self, user): |
| """clean user |
| """ |
| try: |
| self.drop_user(user) |
| except: |
| pass |
| |
| def set_password(self, user=None, password=None): |
| """ |
| set password of user to password |
| """ |
| if user is None: |
| user = self.user |
| if password is None: |
| password = '' |
| sql = "SET PASSWORD FOR '%s' = PASSWORD('%s')" % (user, password) |
| |
| result = self.execute(sql) |
| if result != (): |
| LOG.error(L("SET PASSWORD fail.", user=user, password=password)) |
| return False |
| return True |
| |
| def alter_user(self, user, cpu_share=None, max_user_connections=None): |
| """ |
| alter user |
| """ |
| sql = "ALTER USER '%s'" % user |
| if cpu_share is not None: |
| sql = "%s MODIFY RESOURCE CPU_SHARE %d," % (sql, cpu_share) |
| if max_user_connections is not None: |
| sql = "%s MODIFY PROPERTY MAX_USER_CONNECTIONS %d," % (sql, max_user_connections) |
| sql = sql.rstrip(",") |
| result = self.execute(sql) |
| if result != (): |
| LOG.error(L("ALTER USER fail.", user=user, cpu_share=cpu_share, |
| max_user_connections=max_user_connections)) |
| return False |
| return True |
| |
| def get_cpu_share(self, user): |
| """ |
| get cpu share |
| """ |
| result = self.show_resource(user) |
| for info in result: |
| if info[0] == user and info[1] == "CPU_SHARE": |
| return int(info[2]) |
| return None |
| |
| def show_resource(self, user): |
| """ |
| show resource |
| """ |
| sql = "SHOW RESOURCE LIKE '%s'" % user |
| result = self.execute(sql) |
| if result == (): |
| LOG.warning(L("SHOW RESOURCE fail.", user=user)) |
| return result |
| |
| def grant(self, user, privilege_list, database='', table='', catalog='', |
| resource=None, is_role=False, identity='%'): |
| """ |
| grant |
| GRANT privilege_list ON grant_obj TO [user_identity] | [ROLE role_name] |
| GRANT privilege_list ON RESOURCE grant_obj TO [user_identity] | [ROLE role_name] |
| """ |
| sql = "GRANT {privilege_list} ON {grant_obj} TO {role_desc}{user}" |
| if isinstance(privilege_list, list): |
| privilege_list = ', '.join(privilege_list) |
| if is_role: |
| role_desc = "ROLE " |
| user = '"%s"' % user |
| else: |
| role_desc = '' |
| user = '"%s"@"%s"' % (user, identity) |
| if resource: |
| grant_obj = 'RESOURCE "%s"' % resource |
| else: |
| grant_obj = "%s.%s.%s" % (catalog, database, table) |
| grant_obj = grant_obj.strip(".") |
| result = self.execute(sql.format(privilege_list=privilege_list, grant_obj=grant_obj, |
| role_desc=role_desc, user=user)) |
| if result != (): |
| LOG.error(L("GRANT fail", user=user, privilege=privilege_list, |
| database_name=database)) |
| return False |
| return True |
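| # Usage sketch (hypothetical user, role and privilege names; assumes a connected client): |
| # client.grant('test_user', ['SELECT_PRIV', 'LOAD_PRIV'], database='example_db', table='example_tbl') |
| # client.grant('test_role', 'USAGE_PRIV', resource='example_resource', is_role=True) |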
| |
| def revoke(self, user, privilege_list, database='', table='', catalog='', |
| resource=None, is_role=False): |
| """ |
| revoke |
| REVOKE privilege_list ON db_name[.tbl_name] FROM [user_identity] | [ROLE role_name] |
| REVOKE privilege_list ON RESOURCE resource_name FROM [user_identity] | [ROLE role_name] |
| """ |
| if isinstance(privilege_list, list): |
| privilege_list = ', '.join(privilege_list) |
| sql = "REVOKE {privilege_list} ON {revoke_obj} FROM {role_desc}{user}" |
| if is_role: |
| role_desc = 'ROLE ' |
| user = '"%s"' % user |
| else: |
| role_desc = '' |
| if resource is None: |
| revoke_obj = "%s.%s.%s" % (catalog, database, table) |
| revoke_obj = revoke_obj.strip('.') |
| else: |
| revoke_obj = 'RESOURCE %s' % resource |
| result = self.execute(sql.format(privilege_list=privilege_list, revoke_obj=revoke_obj, |
| role_desc=role_desc, user=user)) |
| if result != (): |
| LOG.error(L("REVOKE fail", sql=sql, msg=str(result))) |
| return False |
| return True |
| |
| def get_grant(self, user=None, all=None): |
| """show grant""" |
| if user is None: |
| sql = 'SHOW GRANTS' |
| else: |
| sql = 'SHOW GRANTS FOR `%s`' % user |
| if all: |
| sql = 'SHOW ALL GRANTS' |
| result = self.__execute_and_rebuild_meta_class(sql, palo_job.GrantInfo) |
| return result |
| |
| def is_master(self): |
| """ |
| is master |
| deprecated |
| """ |
| sql = "SHOW FRONTENDS" |
| fe_list = self.execute(sql) |
| |
| for fe in fe_list: |
| if fe[palo_job.FrontendInfo.Host] == socket.gethostbyname(self.host): |
| return fe[palo_job.FrontendInfo.IsMaster] == "true" |
| return False |
| |
| def get_alive_backend_list(self): |
| """ |
| get alive backends |
| """ |
| backend_list = self.get_backend_list() |
| result = list() |
| for backend in backend_list: |
| if palo_job.BackendProcInfo(backend).get_alive() == "true": |
| result.append(backend) |
| LOG.info(L("GET ALIVE BACKEND", alive_be=result)) |
| return result |
| |
| def get_backend(self, backend_id): |
| """ |
| get backend by backend id |
| """ |
| backend_list = self.get_backend_list() |
| for backend in backend_list: |
| if palo_job.BackendProcInfo(backend).get_backend_id() == str(backend_id): |
| return backend |
| LOG.warning(L("Get no backend by backend id", backend_id=backend_id)) |
| return None |
| |
| def get_backend_heartbeat_port(self, value=None, idx=None): |
| """get be heartbeat port""" |
| be_list = self.get_backend_list() |
| if value is None: |
| port = palo_job.BackendProcInfo(be_list[0]).get_heartbeatport() |
| else: |
| port = util.get_attr_condition_value(be_list, idx, value, |
| palo_job.BackendProcInfo.HeartbeatPort) |
| return port |
| |
| def get_backend_list(self): |
| """ |
| get backend list |
| """ |
| sql = "SHOW BACKENDS" |
| result = self.__execute_and_rebuild_meta_class(sql, palo_job.BackendProcInfo) |
| return result |
| |
| def get_backend_id_list(self): |
| """get backend id list""" |
| backend_list = self.get_backend_list() |
| id_list = list() |
| for be in backend_list: |
| id_list.append(be[0]) |
| return id_list |
| |
| def get_be_hostname_by_id(self, be_id): |
| """ |
| get backend hostname by id |
| """ |
| be = self.get_backend(be_id) |
| if not be: |
| return None |
| return palo_job.BackendProcInfo(be).get_hostname() |
| |
| def get_backend_host_list(self): |
| """ |
| return the IPs of alive backends |
| """ |
| backend_list = self.get_alive_backend_list() |
| be_host_list = [palo_job.BackendProcInfo(backend).get_ip() for backend in backend_list] |
| return tuple(be_host_list) |
| |
| def get_backend_host_ip(self): |
| """get all be host ip""" |
| res = [] |
| backend_list = self.get_backend_list() |
| for backend in backend_list: |
| cur_ip = palo_job.BackendProcInfo(backend).get_ip() |
| res.append(cur_ip) |
| return res |
| |
| def get_backend_host_name(self): |
| """get all be host name""" |
| backend_list = self.get_backend_list() |
| return util.get_attr(backend_list, palo_job.BackendProcInfo.Host) |
| |
| def get_backend_host_port_list(self): |
| """ |
| return host:port of alive backends |
| metadata changed |
| backends[3] is be hostname, backends[4] is be heartbeat port |
| """ |
| backend_list = self.get_alive_backend_list() |
| backend_host_port_list = [] |
| for backend in backend_list: |
| backend_info = palo_job.BackendProcInfo(backend) |
| backend_host_port = '%s:%s' % (backend_info.get_hostname(), |
| backend_info.get_heartbeatport()) |
| backend_host_port_list.append(backend_host_port) |
| return tuple(backend_host_port_list) |
| |
| def add_backend_list(self, backend_list): |
| """ |
| add backends |
| disable |
| """ |
| if not isinstance(backend_list, list): |
| backend_list = [backend_list, ] |
| for backend in backend_list: |
| sql = 'ALTER SYSTEM ADD BACKEND "%s"' % (backend) |
| result = self.execute(sql) |
| if result != (): |
| LOG.info(L('ADD BACKEND FAIL', backend=backend)) |
| return False |
| return True |
| |
| def add_backend(self, host_name, port, tag_location=None): |
| """ |
| add a backend |
| """ |
| sql = 'ALTER SYSTEM ADD BACKEND "%s:%s"' % (host_name, port) |
| if tag_location is not None: |
| sql = '%s PROPERTIES("tag.location"="%s")' % (sql, tag_location) |
| result = self.execute(sql) |
| time.sleep(2) |
| if result != (): |
| LOG.info(L('ADD BACKEND FAIL', backend=host_name, port=port, tag_location=tag_location)) |
| return False |
| LOG.info(L('ADD BACKEND SUCCESS', backend=host_name, port=port, tag_location=tag_location)) |
| return True |
| |
| def drop_backend_list(self, backend_list): |
| """ |
| drop backends |
| """ |
| if not isinstance(backend_list, list): |
| backend_list = [backend_list, ] |
| for backend in backend_list: |
| sql = 'ALTER SYSTEM DROPP BACKEND "%s"' % (backend) |
| result = self.execute(sql) |
| if result != (): |
| LOG.info(L('DROP BACKEND FAIL', backend=backend)) |
| return False |
| time.sleep(2) |
| return True |
| |
| def decommission_backend_list(self, backend_list): |
| """decommission backends""" |
| if not isinstance(backend_list, list): |
| backend_list = [backend_list, ] |
| for backend in backend_list: |
| sql = 'ALTER SYSTEM DECOMMISSION BACKEND "%s"' % (backend) |
| result = self.execute(sql) |
| if result != (): |
| LOG.info(L('DECOMMISSION BACKEND FAIL', backend=backend)) |
| return False |
| return True |
| |
| def add_fe_list(self, fe_list, type='OBSERVER'): |
| """ |
| add FE nodes |
| type: OBSERVER, REPLICA |
| """ |
| if not isinstance(fe_list, list): |
| fe_list = [fe_list, ] |
| for fe in fe_list: |
| sql = 'ALTER SYSTEM ADD %s "%s"' % (type, fe) |
| result = self.execute(sql) |
| if result != (): |
| LOG.info(L('ADD FE FAILED', fe=fe)) |
| return False |
| return True |
| |
| def drop_fe_list(self, fe_list, type='OBSERVER'): |
| """ |
| drop FE nodes |
| type: OBSERVER, REPLICA |
| """ |
| if not isinstance(fe_list, list): |
| fe_list = [fe_list, ] |
| for fe in fe_list: |
| sql = 'ALTER SYSTEM DROP %s "%s"' % (type, fe) |
| result = self.execute(sql) |
| if result != (): |
| LOG.info(L('DROP FE FAILED', fe=fe)) |
| return False |
| return True |
| |
| def get_fe_list(self): |
| """ |
| get FE list |
| """ |
| sql = "SHOW FRONTENDS" |
| result = self.execute(sql) |
| return result |
| |
| def get_fe_host_port_list(self, type=None): |
| """ |
| return host:port of FEs |
| type: OBSERVER, REPLICA |
| """ |
| fe_list = self.get_fe_list() |
| fe_host_port_list = [] |
| for fe in fe_list: |
| if type is not None and fe[2] != type: |
| continue |
| fe_host_port = '%s:%s' % (fe[0], fe[1]) |
| fe_host_port_list.append(fe_host_port) |
| return tuple(fe_host_port_list) |
| |
| def get_master(self): |
| """ |
| return the master FE's host:port |
| """ |
| fe_list = self.get_fe_list() |
| for fe in fe_list: |
| fe_info = palo_job.FrontendInfo(fe) |
| if fe_info.get_ismaster() == "true": |
| fe_host_port = "%s:%s" % (fe_info.get_host(), fe_info.get_httpport()) |
| return fe_host_port |
| return None |
| |
| def get_master_host(self): |
| """ |
| return the master FE's host |
| """ |
| fe_list = self.get_fe_list() |
| for fe in fe_list: |
| fe_info = palo_job.FrontendInfo(fe) |
| if fe_info.get_ismaster() == "true": |
| return fe_info.get_host() |
| return None |
| |
| def get_fe_LastHeartbeat(self, fe_ip): |
| """ |
| return the LastHeartbeat of the FE with the given IP |
| """ |
| fe_list = self.get_fe_list() |
| for fe in fe_list: |
| if palo_job.FrontendInfo(fe).get_IP() == str(fe_ip): |
| return palo_job.FrontendInfo(fe).get_LastHeartbeat() |
| LOG.warning(L("Get no fe by fe id", fe_id=fe_ip)) |
| return None |
| |
| def get_fe_host(self): |
| """get fe host list""" |
| fe_list = self.get_fe_list() |
| fe_host = list() |
| for fe in fe_list: |
| fe_host.append(fe[1]) |
| return fe_host |
| |
| def recover_database(self, database_name): |
| """ |
| recover database |
| """ |
| sql = "RECOVER DATABASE %s" % database_name |
| try: |
| self.execute(sql) |
| except PaloException as e: |
| LOG.error(L("recover database error", fe=str(self), database_name=database_name, \ |
| error=e)) |
| return False |
| return True |
| |
| def recover_table(self, table_name, database_name=None): |
| """ |
| recover table |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = "RECOVER TABLE " |
| if database_name is not None: |
| sql = '%s%s.' % (sql, database_name) |
| sql = '%s%s' % (sql, table_name) |
| try: |
| self.execute(sql) |
| except PaloException as e: |
| LOG.error(L("recover table error", fe=str(self), database_name=database_name, \ |
| table_name=table_name, error=e)) |
| return False |
| return True |
| |
| def recover_partition(self, table_name, partition_name, database_name=None): |
| """ |
| recover partition |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = "RECOVER PARTITION %s FROM " % (partition_name) |
| if database_name is not None: |
| sql = '%s%s.' % (sql, database_name) |
| sql = '%s%s' % (sql, table_name) |
| try: |
| self.execute(sql) |
| except PaloException as e: |
| LOG.error(L("recover partition error", fe=str(self), database_name=database_name, \ |
| table_name=table_name, partition_name=partition_name, error=e)) |
| return False |
| return True |
| |
| def rename_database(self, new_database_name, old_database_name=None): |
| """ |
| rename database |
| """ |
| if old_database_name is None: |
| old_database_name = self.database_name |
| sql = 'ALTER DATABASE %s RENAME %s' % (old_database_name, new_database_name) |
| try: |
| self.execute(sql) |
| except PaloException as e: |
| LOG.error(L("rename database error", fe=str(self), |
| new_database_name=new_database_name, error=e)) |
| return False |
| return True |
| |
| def rename_table(self, new_table_name, old_table_name, database_name=None): |
| """ |
| rename table |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = 'ALTER TABLE %s.%s RENAME %s' % (database_name, old_table_name, new_table_name) |
| try: |
| self.execute(sql) |
| except PaloException as e: |
| LOG.error(L("rename table error", fe=str(self), error=e)) |
| return False |
| return True |
| |
| def rename_rollup(self, new_index_name, old_index_name, table_name, database_name=None): |
| """ |
| rename rollup index |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = 'ALTER TABLE %s.%s RENAME ROLLUP %s %s' % (database_name, table_name, |
| old_index_name, new_index_name) |
| try: |
| self.execute(sql) |
| except PaloException as e: |
| LOG.error(L("rename rollup error", fe=str(self), error=e)) |
| return False |
| return True |
| |
| def rename_partition(self, new_partition_name, old_partition_name, |
| table_name, database_name=None): |
| """ |
| rename partition |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = 'ALTER TABLE %s.%s RENAME PARTITION %s %s' % (database_name, table_name, |
| old_partition_name, new_partition_name) |
| try: |
| self.execute(sql) |
| except PaloException as e: |
| LOG.error(L("rename partition error", fe=str(self), error=e)) |
| return False |
| return True |
| |
| def show_databases(self, database_name=None): |
| """show databases |
| """ |
| sql = 'SHOW DATABASES' |
| if database_name: |
| sql = 'SHOW DATABASES LIKE "%s"' % database_name |
| return self.execute(sql) |
| |
| def show_tables(self, table_name=None): |
| """show tables |
| """ |
| sql = 'SHOW TABLES' |
| if table_name: |
| sql = 'SHOW TABLES LIKE "%s"' % table_name |
| return self.execute(sql) |
| |
| def show_partitions(self, table_name, database_name=None): |
| """show partitions |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = 'SHOW PARTITIONS FROM %s.%s' % (database_name, table_name) |
| result = self.__execute_and_rebuild_meta_class(sql, palo_job.PartitionInfo) |
| return result |
| |
| def show_loading_job_state(self, database_name=None, state='LOADING'): |
| """show loading state job""" |
| ret = self.show_load(database_name=database_name, state=state) |
| return ret |
| |
| def show_load(self, database_name=None, label=None, state=None, order_by=None, limit=None, |
| offset=None): |
| """ |
| SHOW LOAD |
| [FROM db_name] |
| [ |
| WHERE |
| [LABEL [ = "your_label" | LIKE "label_matcher"]] |
| [STATE = ["PENDING"|"ETL"|"LOADING"|"FINISHED"|"CANCELLED"|]] |
| ] |
| [ORDER BY ...] |
| [LIMIT limit][OFFSET offset]; |
| |
| Returns: |
| load_job |
| |
| """ |
| sql = 'SHOW LOAD' |
| if database_name: |
| sql = '%s FROM %s' % (sql, database_name) |
| where_list = [] |
| if label: |
| where_list.append('LABEL = "%s"' % label) |
| if state: |
| where_list.append('STATE = "%s"' % state) |
| if where_list: |
| sql = '%s WHERE %s' % (sql, ' AND '.join(where_list)) |
| if order_by: |
| sql = '%s ORDER BY %s' % (sql, order_by) |
| if limit: |
| sql = '%s LIMIT %s' % (sql, limit) |
| if offset: |
| sql = '%s OFFSET %s' % (sql, offset) |
| return self.execute(sql) |
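| # Sketch (hypothetical label and database; assumes a connected client): |
| # jobs = client.show_load(database_name='example_db', label='example_label', limit=1) |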
| |
| def desc_table(self, table_name, database_name=None, is_all=False): |
| """desc table""" |
| database_name = self.database_name if database_name is None else database_name |
| if is_all: |
| sql = 'DESC %s.%s all' % (database_name, table_name) |
| else: |
| sql = 'DESC %s.%s' % (database_name, table_name) |
| return self.execute(sql) |
| |
| def show_schema_change_job(self, database_name=None, table_name=None, state=None): |
| """show schema change job""" |
| database_name = self.database_name if database_name is None else database_name |
| where_list = [] |
| if table_name: |
| where_list.append('TableName = "%s"' % table_name) |
| if state: |
| where_list.append('State= "%s"' % state) |
| |
| if len(where_list) == 0: |
| sql = 'SHOW ALTER TABLE COLUMN FROM %s' % database_name |
| return self.execute(sql) |
| else: |
| sql = 'SHOW ALTER TABLE COLUMN FROM %s WHERE %s' % (database_name, ' AND '.join(where_list)) |
| return self.execute(sql) |
| |
| def show_rollup_job(self, database_name=None, table_name=None): |
| """show rollup job""" |
| database_name = self.database_name if database_name is None else database_name |
| sql = 'SHOW ALTER TABLE ROLLUP FROM %s' % database_name |
| if table_name is None: |
| return self.execute(sql) |
| else: |
| sql = '%s WHERE TableName = "%s"' % (sql, table_name) |
| sc_job = self.execute(sql) |
| return sc_job |
| |
| def set_properties(self, kv_list, user=None): |
| """set properties |
| """ |
| for_user = '' |
| if user is not None: |
| for_user = 'FOR "%s" ' % user |
| properties = ', '.join(kv_list) if not isinstance(kv_list, str) else kv_list |
| sql = 'SET PROPERTY %s%s' % (for_user, properties) |
| result = self.execute(sql) |
| if result != (): |
| return False |
| return True |
| |
| def set_max_user_connections(self, max_connections, user=None): |
| """set max user connections |
| """ |
| kv = '"max_user_connections" = "%s"' % max_connections |
| return self.set_properties(kv, user) |
| |
| def set_resource_cpu_share(self, cpu_share, user=None): |
| """set resource cpu share |
| """ |
| kv = '"resource.cpu_share" = "%s"' % cpu_share |
| return self.set_properties(kv, user) |
| |
| def set_quota_low(self, quota_low, user=None): |
| """set quota low |
| """ |
| kv = '"quota.low" = "%s"' % quota_low |
| return self.set_properties(kv, user) |
| |
| def set_quota_normal(self, quota_normal, user=None): |
| """set quota normal |
| """ |
| kv = '"quota.normal" = "%s"' % quota_normal |
| return self.set_properties(kv, user) |
| |
| def set_quota_high(self, quota_high, user=None): |
| """set quota high |
| """ |
| kv = '"quota.high" = "%s"' % quota_high |
| return self.set_properties(kv, user) |
| |
| def set_load_cluster_hadoop_palo_path(self, cluster_name, hadoop_palo_path, user=None): |
| """set load cluster hadoop palo path |
| """ |
| kv = '"load_cluster.%s.hadoop_palo_path" = "%s"' % (cluster_name, hadoop_palo_path) |
| return self.set_properties(kv, user) |
| |
| def set_load_cluster_hadoop_http_port(self, cluster_name, hadoop_http_port, user=None): |
| """set load cluster hadoop http port |
| """ |
| kv = '"load_cluster.%s.hadoop_http_port" = "%s"' % (cluster_name, hadoop_http_port) |
| return self.set_properties(kv, user) |
| |
| def set_load_cluster_hadoop_configs(self, cluster_name, hadoop_configs, user=None): |
| """set load cluster hadoop configs |
| """ |
| kv = '"load_cluster.%s.hadoop_configs" = "%s"' % (cluster_name, hadoop_configs) |
| return self.set_properties(kv, user) |
| |
| def set_load_cluster(self, cluster_name, hadoop_configs, hadoop_palo_path, |
| hadoop_http_port=None, user=None): |
| """set load cluster |
| """ |
| kv_1 = '"load_cluster.%s.hadoop_configs" = "%s"' % (cluster_name, hadoop_configs) |
| kv_2 = '"load_cluster.%s.hadoop_palo_path" = "%s"' % (cluster_name, hadoop_palo_path) |
| kv_list = [kv_1, kv_2] |
| if hadoop_http_port is not None: |
| kv_3 = '"load_cluster.%s.hadoop_http_port" = "%s"' % (cluster_name, hadoop_http_port) |
| kv_list.append(kv_3) |
| return self.set_properties(kv_list, user) |
| |
| def set_default_load_cluster(self, cluster_name, user=None): |
| """set default load cluster |
| """ |
| kv = '"default_load_cluster" = "%s"' % cluster_name |
| return self.set_properties(kv, user) |
| |
| def remove_default_load_cluster(self, user=None): |
| """remove default load cluster |
| """ |
| kv = '"default_load_cluster" = ""' |
| return self.set_properties(kv, user) |
| |
| def remove_load_cluster(self, cluster_name, user=None): |
| """remove load cluster hadoop configs |
| """ |
| kv = '"load_cluster.%s" = ""' % cluster_name |
| return self.set_properties(kv, user) |
| |
| def remove_load_cluster_hadoop_configs(self, cluster_name, user=None): |
| """remove load cluster hadoop configs |
| """ |
| kv = '"load_cluster.%s.hadoop_configs" = ""' % cluster_name |
| return self.set_properties(kv, user) |
| |
| def remove_load_cluster_hadoop_http_port(self, cluster_name, user=None): |
| """remove load cluster hadoop http port |
| """ |
| kv = '"load_cluster.%s.hadoop_http_port" = ""' % cluster_name |
| return self.set_properties(kv, user) |
| |
| def remove_load_cluster_hadoop_palo_path(self, cluster_name, user=None): |
| """remove load cluster hadoop palo path |
| """ |
| kv = '"load_cluster.%s.hadoop_palo_path" = ""' % cluster_name |
| return self.set_properties(kv, user) |
| |
| def gen_hadoop_configs(self, fs_default_name=None, mapred_job_tracker=None, |
| hadoop_job_ugi=None, mapred_job_priority=None): |
| """gen hadoop configs |
| """ |
| configs = '' |
| if fs_default_name is not None: |
| configs += 'fs.default.name=%s;' % (fs_default_name) |
| if mapred_job_tracker is not None: |
| configs += 'mapred.job.tracker=%s;' % (mapred_job_tracker) |
| if hadoop_job_ugi is not None: |
| configs += 'hadoop.job.ugi=%s;' % (hadoop_job_ugi) |
| if mapred_job_priority is not None: |
| configs += 'mapred.job.priority=%s;' % (mapred_job_priority) |
| configs = configs.rstrip(';') |
| return configs |
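| # Sketch combining the two helpers above (hypothetical addresses and paths): |
| # configs = client.gen_hadoop_configs(fs_default_name='hdfs://namenode:9000', |
| #                                     mapred_job_tracker='jobtracker:9001', |
| #                                     hadoop_job_ugi='hadoop,hadoop') |
| # client.set_load_cluster('example_hadoop_cluster', configs, '/user/palo/palo_path') |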
| |
| def show_property(self, key=None, user=None): |
| """show property |
| """ |
| for_user = '' if user is None else ' FOR "%s"' % user |
| property_key = '' if key is None else ' LIKE "%s" ' % key |
| sql = 'SHOW PROPERTY%s%s' % (for_user, property_key) |
| return self.execute(sql) |
| |
| def show_max_user_connections(self, user=None): |
| """show max user connections |
| ((u'max_user_connections', u'10'),) |
| """ |
| result = self.show_property('max_user_connections', user) |
| return int(result[0][1]) |
| |
| def show_resource_cpu_share(self, user=None): |
| """show resource cpu share |
| ((u'resource.cpu_share', u'1000'), |
| (u'resource.hdd_read_iops', u'80'), |
| (u'resource.hdd_read_mbps', u'30'), |
| (u'resource.io_share', u'1000'), |
| (u'resource.ssd_read_iops', u'1000'), |
| (u'resource.ssd_read_mbps', u'30')) |
| """ |
| result = self.show_property('resource', user) |
| for item in result: |
| if item[0] == 'resource.cpu_share': |
| return int(item[1]) |
| |
| def show_quota_low(self, user=None): |
| """show quota low |
| """ |
| result = self.show_property('quota', user) |
| for item in result: |
| if item[0] == 'quota.low': |
| return int(item[1]) |
| |
| def show_quota_normal(self, user=None): |
| """show quota normal |
| """ |
| result = self.show_property('quota', user) |
| for item in result: |
| if item[0] == 'quota.normal': |
| return int(item[1]) |
| |
| def show_quota_high(self, user=None): |
| """show quota high |
| """ |
| result = self.show_property('quota', user) |
| for item in result: |
| if item[0] == 'quota.high': |
| return int(item[1]) |
| |
| def show_load_cluster(self, user=None): |
| """show load cluster |
| """ |
| result = self.show_property('%load_cluster%', user) |
| load_cluster_dict = {} |
| for k, v in result: |
| load_cluster_dict[k] = v |
| return load_cluster_dict |
| |
| def add_whitelist(self, user, whitelist): |
| """ |
| add whitelist |
| """ |
| sql = 'ALTER USER %s ADD WHITELIST "%s"' % (user, whitelist) |
| return self.execute(sql) == () |
| |
| def delete_whitelist(self, user, whitelist): |
| """ |
| delete whitelist |
| """ |
| sql = 'ALTER USER %s DELETE WHITELIST "%s"' % (user, whitelist) |
| return self.execute(sql) == () |
| |
| def show_whitelist(self, user): |
| """ |
| show whitelist |
| """ |
| sql = 'SHOW WHITELIST' |
| result = self.execute(sql) |
| for r in result: |
| if r[0] == user: |
| return r[1] |
| return None |
| |
| def clean_whitelist(self, user): |
| """ |
| clean whitelist |
| """ |
| whitelist = self.show_whitelist(user) |
| if whitelist is None: |
| return |
| for w in whitelist.split(','): |
| if w: |
| self.delete_whitelist(user, w) |
| |
| def create_cluster(self, cluster_name, instance_num, password=''): |
| """ |
| create cluster |
| """ |
| sql = 'CREATE CLUSTER %s PROPERTIES ( "instance_num" = "%d") ' \ |
| 'IDENTIFIED BY "%s"' % (cluster_name, instance_num, password) |
| self.execute(sql) |
| |
| def enter(self, cluster_name): |
| """ |
| enter a cluster |
| """ |
| sql = 'ENTER %s' % (cluster_name) |
| self.execute(sql) |
| self.cluster_name = cluster_name |
| |
| def clean_cluster(self, cluster_name=None): |
| """ |
| clean cluster |
| """ |
| if not cluster_name: |
| cluster_name_list = self.get_cluster_list() |
| for cluster_name in cluster_name_list: |
| if cluster_name == 'default_cluster': |
| continue |
| self.enter(cluster_name) |
| self.clean() |
| self.drop_cluster(cluster_name) |
| else: |
| try: |
| self.enter(cluster_name) |
| self.clean() |
| self.drop_cluster(cluster_name) |
| except: |
| pass |
| |
| def drop_cluster(self, cluster_name): |
| """ |
| drop cluster |
| """ |
| sql = 'DROP CLUSTER %s' % (cluster_name) |
| self.execute(sql) |
| |
| def get_cluster_list(self): |
| """ |
| show cluster |
| """ |
| sql = 'SHOW CLUSTERS' |
| records = self.execute(sql) |
| return tuple([r[0] for r in records]) |
| |
| def alter_cluster(self, cluster_name, instance_num): |
| """ |
| alter cluster |
| """ |
| sql = 'ALTER CLUSTER %s PROPERTIES ( "instance_num" = "%d" )' \ |
| % (cluster_name, instance_num) |
| self.execute(sql) |
| |
| def link(self, src_cluster_name, src_database_name, dst_cluster_name, dst_database_name): |
| """ |
| link |
| """ |
| sql = 'LINK DATABASE %s.%s %s.%s' % (src_cluster_name, |
| src_database_name, dst_cluster_name, dst_database_name) |
| self.execute(sql) |
| |
| def migrate(self, src_cluster_name, src_database_name, dst_cluster_name, dst_database_name): |
| """ |
| migrate |
| """ |
| sql = 'MIGRATE DATABASE %s.%s %s.%s' % (src_cluster_name, |
| src_database_name, dst_cluster_name, dst_database_name) |
| self.execute(sql) |
| |
| def get_migrate_status(self, src_cluster_name, src_database_name, |
| dst_cluster_name, dst_database_name): |
| """ |
| get the migration status of the specified database |
| """ |
| sql = 'SHOW MIGRATIONS' |
| migrate_list = self.execute(sql) |
| print(migrate_list) |
| for it in migrate_list: |
| if it[1] == '%s:%s' % (src_cluster_name, src_database_name) \ |
| and it[2] == '%s:%s' % (dst_cluster_name, dst_database_name): |
| return it[3] |
| return None |
| |
| def wait_migrate(self, src_cluster_name, src_database_name, |
| dst_cluster_name, dst_database_name): |
| """ |
| wait migrate |
| """ |
| time.sleep(5) |
| ret = self.get_migrate_status(src_cluster_name, src_database_name, |
| dst_cluster_name, dst_database_name) |
| if not ret: |
| return False |
| while True: |
| ret = self.get_migrate_status(src_cluster_name, src_database_name, |
| dst_cluster_name, dst_database_name) |
| if ret == '100%': |
| return True |
| time.sleep(1) |
| |
| def add_broker(self, broker_name, host_port): |
| """ |
| add broker |
| """ |
| sql = 'ALTER SYSTEM ADD BROKER %s "%s"' % (broker_name, host_port) |
| return self.execute(sql) == () |
| |
| def drop_broker(self, broker_name, host_port): |
| """ |
| drop broker |
| """ |
| sql = 'ALTER SYSTEM DROP BROKER %s "%s"' % (broker_name, host_port) |
| return self.execute(sql) == () |
| |
| def get_broker_list(self): |
| """ |
| get broker list |
| """ |
| sql = 'SHOW PROC "/brokers"' |
| result = self.execute(sql) |
| return result |
| |
| def get_broker_start_update_time(self): |
| """show broker_start_update_time""" |
| res = self.get_broker_list() |
| if res: |
| LOG.info(L("get_broker_start_update_time", start_time=palo_job.BrokerInfo(res[0]).get_last_start_time(), |
| update_time=palo_job.BrokerInfo(res[0]).get_last_update_time())) |
| return palo_job.BrokerInfo(res[0]).get_last_start_time() |
| return None |
| |
| def drop_all_broker(self, broker_name): |
| """ |
| drop all brokers of the given name |
| """ |
| sql = 'ALTER SYSTEM DROP ALL BROKER %s' % broker_name |
| return self.execute(sql) == () |
| |
| def export(self, table_name, to_path, broker_info=None, |
| partition_name_list=None, property_dict=None, |
| database_name=None, where=None): |
| """export data. |
| |
| Args: |
| table_name: table name, str |
| to_path: export path, str |
| broker_info: broker info, BrokerInfo |
| partition_name_list: name list of partitions, str list |
| property_dict: properties, dict str->str |
| database_name: database name, str |
| where: str, k1>0 |
| Returns: |
| True if succeed |
| """ |
| sql = 'EXPORT TABLE' |
| if not database_name: |
| database_name = self.database_name |
| sql = '%s %s.%s' % (sql, database_name, table_name) |
| if partition_name_list: |
| sql = '%s PARTITION (%s)' % (sql, ','.join(partition_name_list)) |
| if where: |
| sql = '%s WHERE %s' % (sql, where) |
| sql = '%s TO "%s"' % (sql, to_path) |
| if property_dict: |
| p = '' |
| for d in property_dict: |
| p = '%s "%s"="%s",' % (p, d, property_dict[d]) |
| p = p.rstrip(",") |
| sql = '%s PROPERTIES(%s)' % (sql, p) |
| if broker_info: |
| sql = '%s %s' % (sql, str(broker_info)) |
| |
| return self.execute(sql) == () |
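| # Sketch (hypothetical path; assumes `broker_info` is a BrokerInfo instance): |
| # client.export('example_tbl', 'hdfs://namenode:9000/tmp/export/', broker_info=broker_info, |
| #               property_dict={'column_separator': ','}, where='k1 > 0') |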
| |
| def get_export_status(self, database_name=None): |
| """get export status. |
| |
| Args: |
| database_name: database name, str |
| Returns: |
| the export job row returned by SHOW EXPORT (ORDER BY JOBID LIMIT 1) |
| """ |
| sql = 'SHOW EXPORT' |
| if database_name: |
| sql = '%s FROM %s' % (sql, database_name) |
| sql = '%s %s' % (sql, 'ORDER BY JOBID LIMIT 1') |
| return self.execute(sql) |
| |
| def show_export(self, database_name=None, state=None, export_job_id=None, |
| order_by=None, limit=None): |
| """ |
| Args: |
| database_name: database name |
| state: export status filter, PENDING|EXPORTING|FINISHED|CANCELLED |
| export_job_id: export job id filter |
| order_by: column, str 'starttime' |
| limit: num, int |
| Returns: |
| export job |
| """ |
| sql = 'SHOW EXPORT' |
| if database_name: |
| sql = '%s FROM %s' % (sql, database_name) |
| if state: |
| sql = '%s WHERE STATE="%s"' % (sql, state) |
| if export_job_id: |
| sql = '%s AND ID="%s"' % (sql, export_job_id) |
| elif export_job_id: |
| sql = '%s WHERE ID="%s"' % (sql, export_job_id) |
| if order_by: |
| sql = '%s ORDER BY %s' % (sql, order_by) |
| if limit: |
| sql = '%s LIMIT %s' % (sql, limit) |
| return self.execute(sql) |
| |
| def wait_export(self, database_name=None): |
| """wait export. |
| |
| Args: |
| database_name: database name, str |
| Returns: |
| True if succeed |
| """ |
| timeout = 1200 |
| while timeout > 0: |
| r = self.get_export_status(database_name) |
| export_job = palo_job.ExportJob(r[0]) |
| if export_job.get_state() == 'FINISHED': |
| LOG.info(L("export succ.", state='FINISHED', database_name=database_name)) |
| return True |
| elif export_job.get_state() == 'CANCELLED': |
| LOG.warning(L("export fail.", state='CANCELLED', database_name=database_name, |
| msg=export_job.get_error_msg())) |
| return False |
| else: |
| time.sleep(1) |
| timeout -= 1 |
| LOG.warning(L("export timeout.", timeout=timeout, database_name=database_name)) |
| return False |
| |
| def create_external_table(self, table_name, column_list, engine, property, |
| broker_property=None, database_name=None, set_null=False): |
| """Create broker table. |
| |
| Args: |
| table_name: table name, str |
| column_list: list of columns |
| engine: str, it should be olap, mysql, elasticsearch, broker |
| property: map, |
| if broker engine e.g.: |
| PROPERTIES ( |
| "broker_name" = "broker_name", |
| "paths" = "file_path1[,file_path2]", |
| "column_separator" = "value_separator" |
| "line_delimiter" = "value_delimiter" |
| |"format" = "parquet" |
| ) |
| mysql engine e.g.: |
| PROPERTIES ( |
| "host" = "mysql_server_host", |
| "port" = "mysql_server_port", |
| "user" = "your_user_name", |
| "password" = "your_password", |
| "database" = "database_name", |
| "table" = "table_name" |
| ) |
| es engine e.g.: |
| PROPERTIES ( |
| "hosts" = "http://", |
| "user" = "root", |
| "password" = "", |
| "index" = "new_data", "type" = "mbtable" ) |
| broker_property: broker property; the broker may be hdfs, bos or afs. |
| set_null: if column can be null |
| Returns: |
| True if succeed |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| # table name |
| sql = 'CREATE EXTERNAL TABLE %s.%s (' % (database_name, table_name) |
| # columns |
| for column in column_list: |
| sql = '%s %s,' % (sql, util.column_to_no_agg_sql(column, set_null)) |
| sql = '%s ) ENGINE=%s' % (sql.rstrip(','), engine) |
| # property |
| sql = '%s PROPERTIES %s' % (sql, util.convert_dict2property(property)) |
| # broker_property |
| if isinstance(broker_property, BrokerInfo): |
| sql = '%s BROKER PROPERTIES %s' % (sql, broker_property.get_property()) |
| elif isinstance(broker_property, str): |
| sql = '%s BROKER PROPERTIES (%s)' % (sql, broker_property) |
| elif isinstance(broker_property, dict): |
| sql = '%s BROKER PROPERTIES %s' % (sql, util.convert_dict2property(broker_property)) |
| else: |
| pass |
| ret = self.execute(sql) |
| if ret != (): |
| LOG.info(L('CREATE EXTERNAL TABLE fail.', database_name=database_name, |
| table_name=table_name)) |
| return False |
| LOG.info(L('CREATE EXTERNAL TABLE succ.', database_name=database_name, |
| table_name=table_name)) |
| return True |
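| # Sketch of a MySQL external table (hypothetical connection info; `column_list` is assumed to use |
| # the same column tuples accepted by util.column_to_no_agg_sql): |
| # mysql_property = {'host': '127.0.0.1', 'port': '3306', 'user': 'root', 'password': '', |
| #                   'database': 'example_db', 'table': 'example_tbl'} |
| # client.create_external_table('ext_example_tbl', column_list, 'mysql', mysql_property) |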
| |
| def stream_load(self, table_name, data_file, database_name=None, host=None, port=None, |
| user=None, password=None, cluster_name=None, is_wait=True, max_filter_ratio=None, |
| load_label=None, column_name_list=None, timeout=300, column_separator=None, |
| partition_list=None, where_filter=None, time_zone=None, **kwargs): |
| """ |
| Args: |
| table_name: required, table name |
| data_file: required, the data file to upload |
| user: optional, Palo user |
| password: optional, password of the Palo user |
| database_name: optional, database name, defaults to the client's current database |
| cluster_name: optional, cluster, defaults to default_cluster; not recommended |
| host: optional, defaults to the FE this client is connected to |
| port: optional, defaults to the client's http_port |
| -H header parameters: |
| max_filter_ratio: optional, max tolerable ratio of filterable rows, 0-1 |
| load_label/label: optional, label of the load job |
| column_name_list/columns: optional, mapping between columns in the file and columns in the table |
| timeout: optional, connection timeout |
| column_separator: optional, column separator |
| partition_list/partitions: optional, partitions to load into |
| where_filter/where: optional, filter used to extract part of the data |
| time_zone/timezone: optional, time zone used for the load |
| strict_mode: whether strict mode is enabled for this load, default false |
| exec_mem_limit: memory limit of the load, default 2GB, in bytes |
| format: format of the loaded data, default csv, json is also supported |
| jsonpaths: json load mode: simple mode and matched mode |
| strip_outer_array: true means the json data starts with an array object which will be flattened, default false |
| json_root: a valid jsonpath string specifying the root node of the json document, default "" |
| Control parameters: |
| is_wait: whether to wait for the transaction to become visible when the result is publish timeout |
| |
| Returns: |
| 0 fail/label already exist |
| 1 success |
| 2 publish timeout |
| """ |
| database_name = database_name if database_name is not None else self.database_name |
| host = host if host is not None else self.host |
| port = port if port is not None else self.http_port |
| user = user if user is not None else self.user |
| password = password if password is not None else self.password |
| uri = "http://%s:%s/api/%s/%s/_stream_load" % (host, port, database_name, table_name) |
| |
| buf = BytesIO() |
| c = pycurl.Curl() |
| c.setopt(c.URL, uri) |
| c.setopt(c.WRITEFUNCTION, buf.write) |
| head = dict() |
| head['Content-length'] = os.path.getsize(data_file) |
| head['Transfer-Encoding'] = '' |
| head['Expect'] = '100-continue' |
| file = open(data_file, "rb") |
| if load_label: |
| head['label'] = load_label |
| if column_name_list: |
| head['columns'] = ','.join(column_name_list) |
| if column_separator: |
| head['column_separator'] = column_separator |
| if max_filter_ratio: |
| head['max_filter_ratio'] = max_filter_ratio |
| if partition_list: |
| head['partitions'] = ','.join(partition_list) |
| if where_filter: |
| head['where'] = where_filter |
| if time_zone: |
| head['timezone'] = time_zone |
| if timeout: |
| head['timeout'] = timeout |
| print(data_file) |
| head.update(kwargs) |
| print(head) |
| param = '' |
| for k, v in head.items(): |
| param = '%s -H "%s:%s"' % (param, k, v) |
| curl_cmd = 'curl --location-trusted -u %s:%s %s -T %s %s' % (user, password, param, data_file, uri) |
| LOG.info(L('STREAM LOAD CURL CMD.', cmd=curl_cmd)) |
| print(curl_cmd) |
| # set the -H header parameters |
| c.setopt(pycurl.HTTPHEADER, [k + ': ' + str(v) for k, v in head.items()]) |
| # basic auth |
| c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC) |
| c.setopt(pycurl.USERNAME, user) |
| c.setopt(pycurl.PASSWORD, password) |
| # upload the file |
| c.setopt(pycurl.PUT, 1) |
| c.setopt(pycurl.UPLOAD, 1) |
| c.setopt(pycurl.READDATA, file) |
| # follow redirects, --location-trusted |
| c.setopt(pycurl.UNRESTRICTED_AUTH, 1) |
| c.setopt(pycurl.FOLLOWLOCATION, True) |
| # connection timeout |
| c.setopt(pycurl.TIMEOUT, 30) |
| LOG.info(L('STREAM LOAD.', head=head, url=uri, data_file=data_file)) |
| try: |
| c.perform() |
| msg = buf.getvalue() |
| print(msg) |
| LOG.info(L('STREAM LOAD ret.', ret=msg)) |
| if c.getinfo(c.HTTP_CODE) == 200: |
| ret = json.loads(msg) |
| status = ret.get("Status") |
| txn_id = ret.get("TxnId") |
| if status == 'Success': |
| stream_load_ret = 1 |
| self.wait_txn(txn_id, database_name) |
| elif status == 'Publish Timeout': |
| stream_load_ret = 2 |
| if is_wait: |
| self.wait_txn(txn_id, database_name) |
| else: |
| # Label Already Exists/Fail |
| url = ret.get("ErrorURL") |
| if url: |
| sql = 'show load warnings on "%s"' % url |
| ret = self.execute(sql) |
| print(ret[0]) |
| stream_load_ret = 0 |
| else: |
| stream_load_ret = 0 |
| except Exception as e: |
| stream_load_ret = 0 |
| print(str(e)) |
| LOG.info(L('STREAM LOAD FAILED.', msg=str(e))) |
| finally: |
| buf.close() |
| file.close() |
| c.close() |
| return stream_load_ret |
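| # Sketch (hypothetical file and columns; assumes a connected client): |
| # ret = client.stream_load('example_tbl', '/tmp/data.csv', column_separator=',', |
| #                          max_filter_ratio='0.1', column_name_list=['k1', 'k2', 'v1']) |
| # assert ret == 1 |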
| |
| def insert_select(self, table_name, select_query, column_name_list=None, is_streaming=True, database_name=None): |
| """insert""" |
| if database_name is not None: |
| sql = 'INSERT INTO %s.%s' % (database_name, table_name) |
| else: |
| sql = 'INSERT INTO %s' % table_name |
| if column_name_list: |
| sql = '%s (%s)' % (sql, ','.join(column_name_list)) |
| sql = '%s %s' % (sql, select_query) |
| ret = self.execute(sql) |
| return ret == () |
| |
| def get_column(self, column_name, table_name, database_name=None): |
| """get column""" |
| column_tuple = self.desc_table(table_name, database_name) |
| for column in column_tuple: |
| if column_name == palo_job.DescInfo(column).get_field(): |
| return column_name |
| return None |
| |
| def get_column_info(self, column_name, table_name, database_name=None): |
| """get column info""" |
| column_tuple = self.desc_table(table_name, database_name) |
| for column in column_tuple: |
| if column_name == palo_job.DescInfo(column).get_field(): |
| return column |
| return None |
| |
| def get_all_columns(self, table_name, database_name=None): |
| """get table columns""" |
| column_tuple = self.desc_table(table_name, database_name) |
| column_name_list = list() |
| for column in column_tuple: |
| column_name = palo_job.DescInfo(column).get_field() |
| column_name_list.append(column_name) |
| return column_name_list |
| |
| def create_repository(self, repo_name, broker_name, repo_location, repo_properties, |
| is_read_only=False): |
| """ |
| Args: |
| repo_name: string, repository name |
| broker_name: string, broker name |
| repo_location: string, repository location path |
| repo_properties: dict, property of repository location |
| "bos_endpoint" = "http://gz.bcebos.com", |
| "bos_accesskey" = "XXXXXXXXXXXXXXXXXXXX", |
| "bos_secret_accesskey"="XXXXXXXXXXXXXXXXX" |
| |
| properties = dict() |
| properties["bos_endpoint"] = "http://gz.bcebos.com" |
| properties["bos_accesskey"] = "XXXXXXXXXXXXXXXXXXX" |
| properties["bos_secret_accesskey"] = "XXXXXXXXXXXXXXXXXXX" |
| Returns: |
| True create repo success |
| False create repo failed |
| need to check |
| """ |
| sql = 'CREATE {readonly} REPOSITORY {repo_name} WITH BROKER {broker_name} \ |
| ON LOCATION "{repo_location}" PROPERTIES ({repo_properties})' |
| if isinstance(repo_properties, dict): |
| property = '' |
| for k, v in repo_properties.items(): |
| property += ' "%s" = "%s",' % (k, v) |
| p = property.strip(',') |
| elif isinstance(repo_properties, str): |
| p = repo_properties |
| else: |
| return False |
| if is_read_only: |
| readonly = 'READ ONLY' |
| else: |
| readonly = '' |
| s = sql.format(readonly=readonly, repo_name=repo_name, broker_name=broker_name, |
| repo_location=repo_location, repo_properties=p) |
| try: |
| ret = self.execute(s) |
| except Exception as e: |
| logging.exception(e) |
| LOG.info(L('CREATE REPO fail.', repository_name=repo_name, msg=str(e))) |
| return False |
| if ret == (): |
| LOG.info(L('CREATE REPO succ.', repository_name=repo_name)) |
| return True |
| else: |
| LOG.info(L('CREATE REPO fail.', repository_name=repo_name)) |
| return False |
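| # Sketch (placeholder credentials; assumes a broker named 'example_broker' and a BOS bucket): |
| # properties = {'bos_endpoint': 'http://gz.bcebos.com', |
| #               'bos_accesskey': 'XXXXXXXX', 'bos_secret_accesskey': 'XXXXXXXX'} |
| # client.create_repository('example_repo', 'example_broker', 'bos://bucket/backup', properties) |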
| |
| def drop_repository(self, repo_name): |
| """ |
| |
| Args: |
| repo_name: string, repository name to be dropped |
| |
| Returns: |
| True drop repo success |
| False drop repo failure |
| |
| """ |
| sql = 'DROP REPOSITORY {repo_name}' |
| s = sql.format(repo_name=repo_name) |
| try: |
| ret = self.execute(s) |
| except Exception as e: |
| logging.exception(e) |
| LOG.info(L('DROP REPO fail.', repository_name=repo_name, msg=str(e))) |
| return False |
| if ret == (): |
| LOG.info(L('DROP REPO succ.', repository_name=repo_name)) |
| return True |
| else: |
| LOG.info(L('DROP REPO fail.', repository_name=repo_name)) |
| return False |
| |
| def show_repository(self): |
| """ |
| Returns: existing repositories |
| |
| RepoId: unique repository ID |
| RepoName: repository name |
| CreateTime: time when the repository was first created |
| IsReadOnly: whether the repository is read-only |
| Location: root directory used for backup data in the repository |
| Broker: the broker the repository depends on |
| ErrMsg: Palo checks the repository connectivity periodically; errors are shown here |
| """ |
| sql = 'SHOW REPOSITORIES' |
| ret = self.execute(sql) |
| return ret |
| |
| def get_repository(self, repo_name=None, repo_info=False): |
| """ |
| Args: |
| repo_name: string, repo name |
| |
| Returns: |
| if repo_name exists return |
| (RepoId, RepoName, CreateTime, IsReadOnly, Location, Broker, ErrMsg) |
| else return None |
| """ |
| r = self.show_repository() |
| if repo_name is None: |
| return util.get_attr(r, palo_job.RepoInfo.RepoName) |
| for repo in r: |
| if repo_name == repo[palo_job.RepoInfo.RepoName]: |
| if not repo_info: |
| return repo |
| else: |
| return palo_job.RepoInfo(repo) |
| return None |
| |
| def backup(self, snapshot_label, backup_list, repo_name, database_name=None, type=None, |
| timeout=1800, is_wait=False): |
| """ |
| |
| Args: |
| snapshot_label: string, snapshot name |
| backup_list: list, table/partition to backup, ['table', 'table2 PARTITION (p1, p2)'] |
| repo_name: string, repo name |
| database_name: string, db name, if none, use default db |
| type: string, only support full, full backup |
| timeout: int, timeout |
| is_wait: False/True, if wait backup job finish |
| |
| Returns: |
| True backup success |
| False backup failure |
| |
| """ |
| sql = 'BACKUP SNAPSHOT {db}.{snapshot_label} TO {repo} on ({backup}) {property}' |
| b = ','.join(backup_list) |
| property = 'PROPERTIES({property})' |
| database_name = self.database_name if database_name is None else database_name |
| if type is None and timeout is None: |
| property = '' |
| else: |
| p = list() |
| if type and type.upper() == 'FULL': |
| p.append('"type"="full"') |
| if timeout: |
| p.append('"timeout"="%s"' % timeout) |
| property = property.format(property=','.join(p)) |
| s = sql.format(db=database_name, snapshot_label=snapshot_label, repo=repo_name, backup=b, |
| property=property) |
| try: |
| ret = self.execute(s) |
| except Exception as e: |
| logging.exception(e) |
| LOG.info(L('BACKUP SNAPSHOT FAILED.', snapshot_label=snapshot_label, msg=str(e))) |
| msg = 'Can only run one backup or restore job of a database at same time' |
| if msg in str(e): |
| ret = self.show_backup(database_name) |
| LOG.info(L("Current backup jobs", ret=ret)) |
| ret = self.show_restore(database_name) |
| LOG.info(L('Current restore job', ret=ret)) |
| return False |
| if ret != (): |
| LOG.info(L('BACKUP SNAPSHOT FAILED.', snapshot_label=snapshot_label)) |
| return False |
| if is_wait is False: |
| LOG.info(L('BACKUP SNAPSHOT SUCCEEDED.', snapshot_label=snapshot_label)) |
| return True |
| else: |
| ret = self.wait_backup_job(snapshot_label) |
| return ret |
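| # Sketch (hypothetical snapshot label; 'example_repo' is the repository created above): |
| # client.backup('example_snapshot', ['example_tbl', 'tbl2 PARTITION (p1, p2)'], |
| #               'example_repo', database_name='example_db', is_wait=True) |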
| |
| def wait_backup_job(self, label=None, database_name=None): |
| """ |
| Args: |
| label: if label is None, wait all backup job finish, else wait label job finish(backup job can not run at the same time in a db) |
| |
| Returns: |
| True if backup job is Finished state |
| False if backup job is Cancelled state |
| |
| """ |
| ret = self.show_backup(database_name) |
| if ret == (): |
| return True |
| flag = False |
| while label is None and flag is False: |
| flag = True |
| for backup_job in ret: |
| s = backup_job[palo_job.BackupJob.State] |
| if s != 'FINISHED' and s != 'CANCELLED': |
| flag = False |
| time.sleep(1) |
| ret = self.show_backup(database_name) |
| if flag is True: |
| LOG.info(L('BACKUP JOB ALL FINISHED, NO JOB RUNNING.')) |
| return True |
| timeout = 3600 |
| while timeout > 0: |
| for backup_job in ret: |
| if label == backup_job[palo_job.BackupJob.SnapshotName]: |
| s = backup_job[palo_job.BackupJob.State] |
| if s == 'FINISHED': |
| LOG.info(L('BACKUP SNAPSHOT FINISHED.', snapshot_label=label)) |
| return True |
| elif s == 'CANCELLED': |
| LOG.info(L('BACKUP SNAPSHOT CANCELLED.', snapshot_label=label, |
| msg=backup_job[palo_job.BackupJob.Status])) |
| return False |
| time.sleep(3) |
| ret = self.show_backup(database_name) |
| timeout -= 1 |
| LOG.info(L('BACKUP JOB WAIT TIMEOUT.', snapshot_label=label)) |
| return False |
| |
| def show_backup(self, database_name=None): |
| """ |
| |
| Args: |
| database_name: string, database name |
| |
| Returns: |
| ((), (),...) |
| |
| """ |
| if database_name is None: |
| sql = 'SHOW BACKUP' |
| else: |
| sql = 'SHOW BACKUP FROM {db}'.format(db=database_name) |
| ret = self.__execute_and_rebuild_meta_class(sql, palo_job.BackupJob) |
| return ret |
| |
| def cancel_backup(self, database_name=None): |
| """ |
| |
| Args: |
| database_name: string, database name |
| |
| Returns: |
| True if cancel backup job success |
| False if cancel backup job failure |
| |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = 'CANCEL BACKUP FROM {db}'.format(db=database_name) |
| ret = self.execute(sql) |
| if ret == (): |
| LOG.info(L('CANCEL BACKUP SUCCESS.', database_name=database_name)) |
| return True |
| else: |
| LOG.info(L('CANCEL BACKUP FAILURE.', database_name=database_name)) |
| return False |
| |
| def restore(self, snapshot_name, repo_name, restore_list, database_name=None, |
| replication_num=None, timeout=1800, is_wait=False): |
| """ |
| |
| Args: |
| snapshot_name: string, snapshot name |
| repo_name: string, repo name |
| restore_list: list, table and partition and rename, ['table1', 'table2 PARTITION (p1, p2)', 'table3 PARTITION (p1, p2) AS table_rename'] |
| database_name: string, database name |
| replication_num: int, replication number |
| timeout: int, timeout |
| is_wait: True/False, if wait restore job finished |
| |
| Returns: |
| True if restore succeeded |
| False if restore failed |
| |
| """ |
| sql = 'RESTORE SNAPSHOT {db}.{snapshot_name} from {repo_name} on ({restore_list}) ' \ |
| 'PROPERTIES("backup_timestamp"="{timestamp}"{property})' |
| database_name = self.database_name if database_name is None else database_name |
| r = ','.join(restore_list) |
| p = '' |
| if replication_num: |
| p = '%s, "replication_num"="%d"' % (p, replication_num) |
| if timeout: |
| p = '%s, "timeout"="%d"' % (p, timeout) |
| backup_timestamp = self.__get_backup_timestamp(repo_name, snapshot_name) |
| if not backup_timestamp: |
| LOG.info(L('get timestamp error when restore.', snapshot=snapshot_name, repo=repo_name)) |
| return False |
| s = sql.format(db=database_name, snapshot_name=snapshot_name, repo_name=repo_name, |
| restore_list=r, timestamp=backup_timestamp, property=p) |
| try: |
| ret = self.execute(s) |
| except Exception as e: |
| logging.exception(e) |
| LOG.info(L('RESTORE SNAPSHOT FAILED.', snapshot_name=snapshot_name, msg=str(e))) |
| msg = 'Can only run one backup or restore job of a database at same time' |
| if msg in str(e): |
| ret = self.show_restore(database_name) |
| LOG.info(L('Current restore job', ret=ret)) |
| ret = self.show_backup(database_name) |
| LOG.info(L('Current backup job', ret=ret)) |
| return False |
| if ret != (): |
| LOG.info(L('RESTORE SNAPSHOT FAILED.', snapshot_name=snapshot_name)) |
| return False |
| if is_wait is False: |
| LOG.info(L('RESTORE SNAPSHOT SUCCEEDED.', snapshot_name=snapshot_name)) |
| return True |
| else: |
| r = self.wait_restore_job(database_name) |
| return r |
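| # Sketch (hypothetical names; the snapshot and repository are the ones used for backup above): |
| # client.restore('example_snapshot', 'example_repo', ['example_tbl AS example_tbl_restored'], |
| #                database_name='example_db', replication_num=1, is_wait=True) |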
| |
| def show_restore(self, database_name=None): |
| """ |
| |
| Args: |
| database_name: string, database name |
| |
| Returns: |
| ((), (), ()) |
| |
| """ |
| if database_name is None: |
| sql = 'SHOW RESTORE' |
| else: |
| sql = 'SHOW RESTORE FROM %s' % database_name |
| ret = self.__execute_and_rebuild_meta_class(sql, palo_job.RestoreJob) |
| return ret |
| |
| def wait_restore_job(self, database_name=None): |
| """wait restore job finished""" |
| ret = self.show_restore(database_name) |
| if ret == (): |
| return True |
| flag = False |
| while flag is False: |
| flag = True |
| for restore_job in ret: |
| s = restore_job[palo_job.RestoreJob.State] |
| if s != 'FINISHED' and s != 'CANCELLED': |
| flag = False |
| time.sleep(3) |
| ret = self.show_restore(database_name) |
| |
| LOG.info(L('RESTORE JOB FINISHED.', state=ret[-1][palo_job.RestoreJob.State], |
| status=ret[-1][palo_job.RestoreJob.Status])) |
| return 'FINISHED' == ret[-1][palo_job.RestoreJob.State] |
| |
| def cancel_restore(self, database_name=None): |
| """ |
| |
| Args: |
| database_name: string, database name |
| |
| Returns: |
| True if cancel restore job succeed |
| False if cancel restore job fails |
| |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = 'CANCEL RESTORE FROM %s' % database_name |
| ret = self.execute(sql) |
| if ret == (): |
| LOG.info(L('CANCEL RESTORE JOB SUCCEED.', database_name=database_name)) |
| return True |
| else: |
| LOG.info(L('CANCEL RESTORE JOB FAIL.', database_name=database_name)) |
| return False |
| |
| def show_snapshot(self, repo_name, snapshot=None, timestamp=None): |
| """ |
| Args: |
| repo_name: string, repo name |
| snapshot: string, snapshot name(label) |
| timestamp: string, backup timestamp |
| |
| Returns: |
| ((), ()) |
| """ |
| sql = 'SHOW SNAPSHOT ON %s' % repo_name |
| if snapshot is None and timestamp is None: |
| r = self.execute(sql) |
| return r |
| sql = '%s WHERE' % sql |
| if snapshot: |
| sql = '%s SNAPSHOT = "%s"' % (sql, snapshot) |
| if timestamp: |
| sql = '%s AND TIMESTAMP = "%s"' % (sql, timestamp) |
| elif timestamp: |
| sql = '%s TIMESTAMP = "%s"' % (sql, timestamp) |
| r = self.execute(sql) |
| return r |
| |
| def __get_backup_timestamp(self, repo_name, snapshot_name): |
| """ |
| Args: |
| snapshot_name: string, snapshot name (label) |
| |
| Returns: |
| timestamp: string |
| """ |
| r = self.show_snapshot(repo_name, snapshot=snapshot_name) |
| if len(r) == 1: |
| return r[0][palo_job.SnapshotInfo.Timestamp] |
| else: |
| return None |
| |
| def routine_load(self, table_name, routine_load_job_name, routine_load_property, |
| database_name=None, data_source='KAFKA'): |
| """ |
| Args: |
| table_name: string, table name |
| routine_load_job_name: string, routine load job name |
| routine_load_property: RoutineLoadProperty class, routine load properties: load property, job property, data source property |
| database_name: string, database name |
| data_source: string, data source like KAFKA |
| |
| Returns: |
| if create routine load ok |
| """ |
| create_sql = 'CREATE ROUTINE LOAD {routine_load_job_name} ON {table_name} ' \ |
| '{load_property} {job_property} FROM {data_source} {data_source_property}' |
| if database_name is not None: |
| routine_load_job_name = '%s.%s' % (database_name, routine_load_job_name) |
| if not isinstance(routine_load_property, RoutineLoadProperty): |
| LOG.info(L('CREATE ROUTINE LOAD ERROR. routine load property should be class RoutineLoadProperty')) |
| return False |
| sql = create_sql.format(routine_load_job_name=routine_load_job_name, table_name=table_name, |
| load_property=routine_load_property.load_property, |
| job_property=routine_load_property.job_property, data_source=data_source, |
| data_source_property=routine_load_property.data_source_property) |
| try: |
| ret = self.execute(sql) |
| LOG.info(L('CREATE ROUTINE LOAD JOB OK.')) |
| return ret == () |
| except Exception as e: |
| LOG.info(L('CREATE ROUTINE LOAD JOB ERROR.', msg=str(e))) |
| return False |
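| |
| # Usage sketch (illustrative only): 'client', the table, job name and database below are |
| # hypothetical, and a prepared RoutineLoadProperty instance is assumed with its |
| # load_property / job_property / data_source_property fields already filled in, |
| # as consumed by routine_load() above. |
| # prop = RoutineLoadProperty() |
| # ... fill prop.load_property / prop.job_property / prop.data_source_property ... |
| # ok = client.routine_load('my_table', 'my_job', prop, database_name='my_db') |
| # assert ok |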
| |
| def pause_routine_load(self, routine_load_job_name, database_name=None): |
| """pause routine load job""" |
| if database_name is None: |
| sql = 'PAUSE ROUTINE LOAD FOR %s' % routine_load_job_name |
| else: |
| sql = 'PAUSE ROUTINE LOAD FOR %s.%s' % (database_name, routine_load_job_name) |
| try: |
| ret = self.execute(sql) |
| LOG.info(L('PAUSE ROUTINE LOAD OK', name=routine_load_job_name)) |
| return ret == () |
| except Exception as e: |
| LOG.info(L('PAUSE ROUTINE LOAD ERROR', name=routine_load_job_name, msg=str(e))) |
| return False |
| |
| def resume_routine_load(self, routine_load_job_name, database_name=None): |
| """resume routine load""" |
| if database_name is None: |
| sql = 'RESUME ROUTINE LOAD FOR %s' % routine_load_job_name |
| else: |
| sql = 'RESUME ROUTINE LOAD FOR %s.%s' % (database_name, routine_load_job_name) |
| try: |
| ret = self.execute(sql) |
| LOG.info(L('RESUME ROUTINE LOAD OK', name=routine_load_job_name)) |
| return ret == () |
| except Exception as e: |
| LOG.info(L('RESUME ROUTINE LOAD ERROR', name=routine_load_job_name, msg=str(e))) |
| return False |
| |
| def stop_routine_load(self, routine_load_job_name, database_name=None): |
| """stop routine load""" |
| if database_name is None: |
| job_name = routine_load_job_name |
| else: |
| job_name = '%s.%s' % (database_name, routine_load_job_name) |
| sql = 'STOP ROUTINE LOAD FOR %s' % job_name |
| try: |
| ret = self.execute(sql) |
| LOG.info(L('STOP ROUTINE LOAD OK', name=routine_load_job_name)) |
| show = self.execute('SHOW ALL ROUTINE LOAD FOR %s' % job_name) |
| LOG.info(L('SHOW STOPPED ROUTINE LOAD', ret=show)) |
| return ret == () |
| except Exception as e: |
| LOG.info(L('STOP ROUTINE LOAD ERROR', name=routine_load_job_name, msg=str(e))) |
| show = self.execute('SHOW ALL ROUTINE LOAD FOR %s' % job_name) |
| LOG.info(L('SHOW STOPPED ROUTINE LOAD', ret=show)) |
| return False |
| |
| def show_routine_load(self, routine_load_job_name=None, database_name=None, is_all=False): |
| """show routine load""" |
| if is_all is False: |
| all_word = '' |
| else: |
| all_word = 'ALL' |
| sql = 'SHOW ROUTINE LOAD' |
| if routine_load_job_name is None: |
| routine_load_job_name = '' |
| sql = 'SHOW {all} ROUTINE LOAD'.format(all=all_word) |
| else: |
| if database_name is not None: |
| routine_load_job_name = '%s.%s' % (database_name, routine_load_job_name) |
| sql = 'SHOW {all} ROUTINE LOAD FOR {routine_load_job}'.format(all=all_word, |
| routine_load_job=routine_load_job_name) |
| try: |
| ret = self.execute(sql) |
| return ret |
| except Exception as e: |
| LOG.info(L('SHOW ROUTINE LOAD ERROR', msg=str(e))) |
| return None |
| |
| def show_routine_load_task(self, routine_load_job_name): |
| """show routine load task""" |
| sql = 'SHOW ROUTINE LOAD TASK WHERE JOBNAME="%s"' % routine_load_job_name |
| ret = self.execute(sql) |
| return ret |
| |
| def get_routine_load_state(self, routine_load_job_name, database_name=None): |
| """get routine load state""" |
| ret = self.show_routine_load(routine_load_job_name, database_name=database_name) |
| if ret == () or ret is None: |
| ret = self.show_routine_load(routine_load_job_name, database_name=database_name, |
| is_all=True) |
| if ret == () or ret is None: |
| return None |
| LOG.info(L('GET ROUTINE LOAD STATE', state=palo_job.RoutineLoadJob(ret[0]).get_state())) |
| return palo_job.RoutineLoadJob(ret[0]).get_state() |
| |
| def wait_routine_load_state(self, routine_load_job_name, state='RUNNING', timeout=600, database_name=None): |
| """ |
| |
| Args: |
| routine_load_job_name: string, routine load job name |
| state: string, 'NEED_SCHEDULE', 'PAUSED', 'RUNNING', 'STOPPED' |
| timeout: int, timeout time |
| |
| Returns: |
| True if the job reaches the expected state within the timeout, otherwise False |
| |
| """ |
| while timeout > 0: |
| job_state = self.get_routine_load_state(routine_load_job_name, database_name=database_name) |
| if job_state == state: |
| return True |
| else: |
| time.sleep(1) |
| timeout -= 1 |
| return False |
| |
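| # Usage sketch (illustrative only; 'client', the job name and database are hypothetical): |
| # assert client.wait_routine_load_state('my_job', state='RUNNING', timeout=120, |
| # database_name='my_db') |
| |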
| def show_tablet(self, table_name=None, database_name=None, tablet_id=None, partition_list=None): |
| """ |
| SHOW TABLETS |
| [FROM [db_name.]table_name | tablet_id] [partition(partition_name_1, partition_name_2)] |
| [where [version=1] [and backendid=10000] [and state="NORMAL|ROLLUP|CLONE|DECOMMISSION"]] |
| [order by order_column] |
| [limit [offset,]size] |
| """ |
| if table_name is not None and tablet_id is not None: |
| return None |
| if table_name: |
| if database_name is not None: |
| table_name = '%s.%s' % (database_name, table_name) |
| sql = 'SHOW TABLETS FROM %s' % table_name |
| if partition_list: |
| sql = '%s PARTITION (%s)' % (sql, ','.join(partition_list)) |
| else: |
| sql = 'SHOW TABLET %s' % tablet_id |
| ret = self.execute(sql) |
| return ret |
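| |
| # Usage sketch (illustrative only; 'client' and the names below are hypothetical): |
| # list the tablets of two partitions of a table ... |
| # tablets = client.show_tablet(table_name='my_table', database_name='my_db', |
| # partition_list=['p1', 'p2']) |
| # ... or inspect a single tablet by id |
| # one = client.show_tablet(tablet_id='10010') |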
| |
| def explain_query(self, sql): |
| """explain sql""" |
| sql = 'EXPLAIN %s' % sql |
| ret = self.execute(sql) |
| return ret |
| |
| def show_txn(self, txn_id, database_name=None): |
| """help show TRANSACTION for more msg""" |
| if database_name is None: |
| sql = "SHOW TRANSACTION WHERE id = %s" % txn_id |
| else: |
| sql = "SHOW TRANSACTION FROM %s WHERE id = %s" % (database_name, txn_id) |
| ret = self.execute(sql) |
| return ret |
| |
| def wait_txn(self, txn_id, database_name=None, status='VISIBLE', timeout=600): |
| """wait txn util the status""" |
| while timeout > 0: |
| txn = self.show_txn(txn_id, database_name) |
| txn_status = palo_job.TransactionInfo(txn[0]).get_transaction_status() |
| if txn_status == status: |
| LOG.info(L('GET TXN STATUS', txn_id=txn_id, status=txn_status)) |
| return True |
| elif txn_status == 'VISIBLE' or txn_status.startswith('ABORT'): |
| LOG.info(L('GET TXN STATUS', txn_id=txn_id, status=txn_status)) |
| return False |
| else: |
| time.sleep(1) |
| timeout -= 1 |
| LOG.info(L('GET TXN STATUS TIMEOUT', txn_id=txn_id)) |
| return False |
| |
| def create_bitmap_index_table(self, table_name, bitmap_index_name, index_column_name, |
| storage_type=None, database_name=None, create_format=1, |
| is_wait=False, cluster_name=None): |
| """ |
| Create a bitmap index |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| if create_format == 1: |
| sql = 'ALTER TABLE %s.%s ADD INDEX %s (%s) USING BITMAP' % (database_name, table_name, \ |
| bitmap_index_name, index_column_name) |
| elif create_format == 2: |
| sql = 'CREATE INDEX %s ON %s.%s (%s) USING BITMAP' % (bitmap_index_name, database_name, \ |
| table_name, index_column_name) |
| ret = self.execute(sql) |
| if ret != (): |
| LOG.info(L("CREATE BITMAP INDEX fail.", database_name=database_name, \ |
| table_name=table_name, \ |
| bitmap_index_name=bitmap_index_name)) |
| return False |
| ret = True |
| if is_wait: |
| ret = self.wait_table_schema_change_job(table_name, cluster_name=cluster_name, |
| database_name=database_name) |
| LOG.info(L("CREATE BITMAP INDEX succ.", database_name=database_name, \ |
| table_name=table_name, \ |
| bitmap_index_name=bitmap_index_name)) |
| return ret |
| |
| def drop_bitmap_index_table(self, table_name, bitmap_index_name, |
| storage_type=None, database_name=None, create_format=1, |
| is_wait=False, cluster_name=None): |
| """ |
| Drop a bitmap index |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| if create_format == 1: |
| sql = 'ALTER TABLE %s.%s DROP INDEX %s' % (database_name, table_name, bitmap_index_name) |
| elif create_format == 2: |
| sql = 'DROP INDEX %s ON %s.%s' % (bitmap_index_name, database_name, table_name) |
| ret = self.execute(sql) |
| if ret != (): |
| LOG.info(L("DROP BITMAP INDEX fail.", database_name=database_name, \ |
| table_name=table_name, \ |
| bitmap_index_name=bitmap_index_name)) |
| return False |
| ret = True |
| if is_wait: |
| ret = self.wait_table_schema_change_job(table_name, cluster_name=cluster_name, |
| database_name=database_name) |
| LOG.info(L("DROP BITMAP INDEX succ.", database_name=database_name, \ |
| table_name=table_name, \ |
| bitmap_index_name=bitmap_index_name)) |
| return ret |
| |
| def get_bitmap_index_list(self, table_name, database_name=None): |
| """ |
| Get index list from table |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| sql = "SHOW INDEX FROM %s.%s" % (database_name, table_name) |
| ret = self.execute(sql) |
| return ret |
| |
| def is_exists_index_in_table(self, index_name, index_name_col, table_name, database_name=None): |
| """ |
| return True if index exists in table, else return False |
| """ |
| database_name = self.database_name if database_name is None else database_name |
| index_list = self.get_bitmap_index_list(table_name, database_name) |
| if not index_list: |
| return False |
| for each_index_info in index_list: |
| job_get_index = palo_job.TableIndexInfo(each_index_info) |
| job_index_name = job_get_index.get_key_name() |
| job_index_column = job_get_index.get_column_name() |
| job_index_type = job_get_index.get_index_type() |
| if index_name == job_index_name and index_name_col == job_index_column and \ |
| job_index_type == "BITMAP": |
| return True |
| return False |
| |
| def select_into(self, query, output_file, broker, property=None, format_as=None): |
| """ |
| broker is BrokerInfo class |
| property: dict, for csv, eg.{"column_separator": ",", "line_delimiter": "\n", "max_file_size": "100MB"} |
| query_stmt |
| INTO OUTFILE "file:///path/to/file_prefix" |
| FORMAT AS CSV|PARQUET |
| PROPERTIES |
| (broker_properties & other_properties); |
| eg: |
| SELECT * FROM tbl |
| INTO OUTFILE "hdfs:/path/to/result_" |
| FORMAT AS CSV |
| PROPERTIES |
| ( |
| "broker.name" = "my_broker", |
| "broker.hadoop.security.authentication" = "kerberos", |
| "broker.kerberos_principal" = "doris@YOUR.COM", |
| "broker.kerberos_keytab" = "/home/doris/my.keytab" |
| "column_separator" = ",", |
| "line_delimiter" = "\n", |
| "max_file_size" = "100MB" |
| ); |
| """ |
| sql = '{query} INTO OUTFILE "{outfile}" {format_as} PROPERTIES ({properties}) ' |
| if format_as is None: |
| format_as = '' |
| else: |
| format_as = 'FORMAT AS %s' % format_as |
| broker_property = broker.to_select_into_broker_property_str() |
| if property is not None: |
| into_properties = '' |
| for k, v in property.items(): |
| into_properties += ' "%s" = "%s",' % (k, v) |
| p = into_properties.strip(',') |
| property = broker_property + ',' + p |
| else: |
| property = broker_property |
| sql = sql.format(query=query, outfile=output_file, format_as=format_as, |
| properties=property) |
| LOG.info(L('SELECT INTO.', sql=sql)) |
| rows, ret = self.execute(sql, True) |
| LOG.info(L('SELECT INTO ret.', ret=ret)) |
| return ret |
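| |
| # Usage sketch (illustrative only): 'client', 'broker_info' and the HDFS path below are |
| # hypothetical; broker_info is assumed to be a BrokerInfo object (imported above) that |
| # provides to_select_into_broker_property_str(). |
| # csv_property = {'column_separator': ',', 'line_delimiter': '\n', 'max_file_size': '100MB'} |
| # ret = client.select_into('SELECT * FROM my_table', 'hdfs://host:port/path/to/result_', |
| # broker_info, property=csv_property, format_as='CSV') |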
| |
| def set_variables(self, k, v, is_global=False): |
| """ |
| set variables |
| for GLOBAL variables, a reconnect is required before SHOW VARIABLES reflects the new setting |
| """ |
| if is_global: |
| sql = 'SET GLOBAL %s=%s' % (k, v) |
| else: |
| sql = 'SET %s=%s' % (k, v) |
| ret = self.execute(sql) |
| return ret == () |
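| |
| # Usage sketch (illustrative only; 'client' is hypothetical, variable name is just an example): |
| # client.set_variables('exec_mem_limit', 4294967296) # session scope, visible at once |
| # client.set_variables('exec_mem_limit', 4294967296, is_global=True) # global scope; reconnect |
| # before SHOW VARIABLES reflects it, as noted in the docstring above |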
| |
| def show_variables(self, prefix=None): |
| """ |
| show variables |
| """ |
| if prefix: |
| sql = 'SHOW VARIABLES LIKE "%%%s%%"' % prefix |
| else: |
| sql = 'SHOW VARIABLES' |
| ret = self.execute(sql) |
| return ret |
| |
| def wait_routine_load_commit(self, routine_load_job_name, committed_expect_num, timeout=600): |
| """wait task committed""" |
| print('expected committed rows: %s\n' % committed_expect_num) |
| while timeout > 0: |
| ret = self.show_routine_load(routine_load_job_name) |
| routine_load_job = palo_job.RoutineLoadJob(ret[0]) |
| loaded_rows = routine_load_job.get_loaded_rows() |
| print(loaded_rows) |
| if str(loaded_rows) == str(committed_expect_num): |
| time.sleep(3) |
| return True |
| timeout -= 3 |
| time.sleep(3) |
| return False |
| |
| def enable_feature_batch_delete(self, table_name, database_name=None, is_wait=True): |
| """enable feature batch delete""" |
| if database_name is None: |
| sql = 'ALTER TABLE %s ENABLE FEATURE "BATCH_DELETE"' % table_name |
| else: |
| sql = 'ALTER TABLE %s.%s ENABLE FEATURE "BATCH_DELETE"' % (database_name, table_name) |
| ret = self.execute(sql) |
| if is_wait: |
| ret = self.wait_table_schema_change_job(table_name, database_name=database_name) |
| return ret |
| return ret == () |
| |
| def truncate(self, table_name, partition_list=None, database_name=None): |
| """truncate table / partition""" |
| if database_name is None: |
| sql = 'TRUNCATE TABLE %s' % table_name |
| else: |
| sql = 'TRUNCATE TABLE %s.%s' % (database_name, table_name) |
| if partition_list is not None: |
| sql = '%s PARTITION (%s)' % (sql, ','.join(partition_list)) |
| ret = self.execute(sql) |
| return ret == () |
| |
| def commit(self): |
| """commit""" |
| ret = self.execute('COMMIT') |
| return ret == () |
| |
| def begin(self): |
| """begin""" |
| ret = self.execute('BEGIN') |
| return ret == () |
| |
| def rollback(self): |
| """rollback""" |
| ret = self.execute('ROLLBACK') |
| return ret == () |
| |
| def update(self, table_name, set_list, where_clause=None, database_name=None): |
| """ |
| ref: UPDATE table_reference SET assignment_list [WHERE where_condition] |
| table_name: str |
| set_list: ['k1=2', 'k3=k3+1'] |
| where_clause: str or list like ['k1 > 0']; when a list is given, the conditions are joined with AND |
| """ |
| if database_name is not None: |
| table_name = '%s.%s' % (database_name, table_name) |
| if isinstance(set_list, list): |
| set_ref = ','.join(set_list) |
| else: |
| set_ref = set_list |
| |
| if where_clause is not None: |
| if isinstance(where_clause, str): |
| where_ref = 'WHERE %s' % where_clause |
| elif isinstance(where_clause, list): |
| where_ref = 'WHERE %s' % ' AND '.join(where_clause) |
| # else: pass |
| else: |
| where_ref = '' |
| |
| sql = 'UPDATE {tb} SET {set_ref} {where_ref}'.format(tb=table_name, |
| set_ref=set_ref, |
| where_ref=where_ref) |
| ret = self.execute(sql) |
| return ret == () |
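| |
| # Usage sketch (illustrative only; 'client', the table and column names are hypothetical): |
| # a list where_clause is joined with AND by update() above. |
| # client.update('my_table', ['v1=v1+1', 'v2=0'], where_clause=['k1 > 0', 'k2 = 3'], |
| # database_name='my_db') |
| # resulting SQL: UPDATE my_db.my_table SET v1=v1+1,v2=0 WHERE k1 > 0 AND k2 = 3 |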
| |
| def admin_show_config(self, key=None): |
| """ADMIN SHOW FRONTEND CONFIG [LIKE "pattern"]""" |
| if key is None: |
| sql = 'ADMIN SHOW FRONTEND CONFIG' |
| else: |
| sql = 'ADMIN SHOW FRONTEND CONFIG LIKE "%s"' % key |
| return self.execute(sql) |
| |
| def show_dynamic_partition_tables(self, database_name=None): |
| """ |
| get dynamic partition table families |
| """ |
| if database_name is None: |
| sql = "SHOW DYNAMIC PARTITION TABLES" |
| result = self.execute(sql) |
| return result |
| sql = "SHOW DYNAMIC PARTITION TABLES FROM %s" % database_name |
| result = self.execute(sql) |
| return result |
| |
| def get_comment(self, database_name, table_name): |
| """ |
| get table comment |
| """ |
| sql = "select TABLE_COMMENT from information_schema.TABLES where TABLE_SCHEMA='%s' and TABLE_NAME='%s'" \ |
| % (database_name, table_name) |
| ret = self.execute(sql) |
| return ret[0][0] |
| |
| def get_column_comment(self, table_name, column_name): |
| """ |
| get column comment |
| """ |
| sql = "show full columns from %s" % table_name |
| columns = self.execute(sql) |
| for column in columns: |
| if column[0] == column_name: |
| return column[8] |
| |
| def create_sync_job(self, table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip, |
| columns=None, partitions=None, canal_port='11111', destination='example', batch_size='8192', |
| username='', password='', is_wait=True): |
| """ |
| create sync job |
| table_name: str or list, the Doris table(s) to load into, e.g. 'table_1', ['table_1', 'table_2'] |
| database_name: str, the Doris database that owns the binlog sync job |
| mysql_table_name: str or list, the source MySQL table(s); must correspond one-to-one with the Doris tables, e.g. ['mysql_table_1', 'mysql_table_2'] |
| mysql_database_name: str or list, the database(s) of the source MySQL tables |
| columns: list or None, column mapping, e.g. ['k1', 'k2'] or [['k1', 'k2'], ['k2', 'v1']] |
| partitions: list or None, target partitions, e.g. ['p1', 'p2'] or [[], ['p1', 'p3']] |
| |
| """ |
| if not isinstance(table_name, list): |
| table_name = [table_name, ] |
| if not isinstance(mysql_table_name, list): |
| mysql_table_name = [mysql_table_name, ] |
| if not isinstance(mysql_database_name, list): |
| mysql_database_name = [mysql_database_name, ] |
| if columns is not None and not isinstance(columns[0], list): |
| columns = [columns] |
| if partitions is not None and not isinstance(partitions[0], list): |
| partitions = [partitions] |
| if len(table_name) != len(mysql_table_name): |
| LOG.info(L('doris tables are not corresponding to mysql tables')) |
| return False |
| channel_desc = "" |
| for i in range(len(table_name)): |
| column_desc = '' |
| if columns is not None and columns[i]: |
| for column in columns[i]: |
| column_desc = '%s%s,' % (column_desc, column) |
| column_desc = '(%s)' % column_desc[:-1] |
| partition_desc = '' |
| if partitions is not None and partitions[i]: |
| if len(partitions[i]) > 1: |
| for partition in partitions[i]: |
| partition_desc = '%s%s,' % (partition_desc, partition) |
| partition_desc = 'PARTITIONS (%s)' % partition_desc[:-1] |
| else: |
| partition_desc = 'PARTITION (%s)' % partitions[i][0] |
| else: |
| partition_desc = '' |
| channel_desc = "%s FROM %s.%s INTO %s %s %s," % (channel_desc, mysql_database_name[i], \ |
| mysql_table_name[i], table_name[i], partition_desc, |
| column_desc) |
| sql = "CREATE SYNC %s.%s (%s)" \ |
| "FROM BINLOG (" \ |
| "'type' = 'canal'," \ |
| "'canal.server.ip' = '%s'," \ |
| "'canal.server.port' = '%s'," \ |
| "'canal.destination' = '%s'," \ |
| "'canal.batchSize' = '%s'," \ |
| "'canal.username' = '%s'," \ |
| "'canal.password' = '%s')" % (database_name, job_name, channel_desc[:-1], canal_ip, canal_port, \ |
| destination, batch_size, username, password) |
| ret = self.execute(sql) |
| if ret != (): |
| LOG.info(L('CREATE SYNC JOB fail.', job_name=job_name, database_name=database_name)) |
| return False |
| if is_wait: |
| ret = self.wait_binlog_state(job_name) |
| if not ret: |
| LOG.info(L('CREATE SYNC JOB fail.', database_name=database_name, job_name=job_name)) |
| return False |
| LOG.info(L('CREATE SYNC JOB succ.', database_name=database_name, job_name=job_name)) |
| return True |
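| |
| # Usage sketch (illustrative only): 'client', the canal server address and all table, |
| # database and job names below are hypothetical; the Doris/MySQL lists must line up |
| # one-to-one as described in the docstring above. |
| # ok = client.create_sync_job(['tbl_a', 'tbl_b'], 'my_db', |
| # ['mysql_tbl_a', 'mysql_tbl_b'], ['mysql_db', 'mysql_db'], |
| # 'my_sync_job', '127.0.0.1', |
| # columns=[['k1', 'v1'], ['k1', 'v1']], |
| # partitions=[[], ['p1', 'p2']]) |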
| |
| def wait_binlog_state(self, job_name, state='RUNNING'): |
| """ |
| wait modify binlog job state |
| """ |
| job_state = self.get_sync_job_state(job_name) |
| if job_state == state: |
| LOG.info(L('SYNC JOB %s' % state)) |
| return True |
| timeout = 10 |
| while timeout > 0: |
| time.sleep(2) |
| job_state = self.get_sync_job_state(job_name) |
| if job_state == state: |
| LOG.info(L('SYNC JOB %s' % state)) |
| return True |
| else: |
| timeout -= 1 |
| LOG.info(L('SYNC JOB STATE ERROR', EXPECTED=state, ACTUAL=job_state)) |
| return False |
| |
| def pause_sync_job(self, job_name, database_name=None, is_wait=True): |
| """ |
| pause sync job |
| """ |
| if database_name is None: |
| sql = 'PAUSE SYNC JOB %s' % job_name |
| else: |
| sql = 'PAUSE SYNC JOB %s.%s' % (database_name, job_name) |
| ret = self.execute(sql) |
| if ret != (): |
| LOG.info(L('PAUSE SYNC JOB fail.', job_name=job_name, database_name=database_name)) |
| return False |
| if is_wait: |
| ret = self.wait_binlog_state(job_name, 'PAUSED') |
| if not ret: |
| LOG.info(L('PAUSE SYNC JOB fail.', database_name=database_name, job_name=job_name)) |
| return False |
| LOG.info(L('PAUSE SYNC JOB succ.', database_name=database_name, job_name=job_name)) |
| return True |
| |
| def resume_sync_job(self, job_name, database_name=None, is_wait=True): |
| """ |
| resume sync job |
| """ |
| if database_name is None: |
| sql = 'RESUME SYNC JOB %s' % job_name |
| else: |
| sql = 'RESUME SYNC JOB %s.%s' % (database_name, job_name) |
| ret = self.execute(sql) |
| if ret != (): |
| LOG.info(L('RESUME SYNC JOB fail.', job_name=job_name, database_name=database_name)) |
| return False |
| if is_wait: |
| ret = self.wait_binlog_state(job_name) |
| if not ret: |
| LOG.info(L('RESUME SYNC JOB fail.', database_name=database_name, job_name=job_name)) |
| return False |
| LOG.info(L('RESUME SYNC JOB succ.', database_name=database_name, job_name=job_name)) |
| return True |
| |
| def stop_sync_job(self, job_name, database_name=None, is_wait=True): |
| """ |
| stop sync job |
| """ |
| if database_name is None: |
| sql = 'STOP SYNC JOB %s' % job_name |
| else: |
| sql = 'STOP SYNC JOB %s.%s' % (database_name, job_name) |
| ret = self.execute(sql) |
| if ret != (): |
| LOG.info(L('STOP SYNC JOB fail.', job_name=job_name, database_name=database_name)) |
| return False |
| if is_wait: |
| ret = self.wait_binlog_state(job_name, 'CANCELLED') |
| if not ret: |
| LOG.info(L('STOP SYNC JOB fail.', database_name=database_name, job_name=job_name)) |
| return False |
| LOG.info(L('STOP SYNC JOB succ.', database_name=database_name, job_name=job_name)) |
| return True |
| |
| def get_sync_job_state(self, job_name): |
| """ |
| get sync job state |
| """ |
| sync_job_list = self.show_sync_job() |
| condition_col_idx = palo_job.SyncJobInfo.JobName |
| return_col_idx = palo_job.SyncJobInfo.State |
| return util.get_attr_condition_value(sync_job_list, condition_col_idx, job_name, return_col_idx) |
| |
| def show_sync_job(self, database_name=None): |
| """ |
| get sync job information |
| """ |
| if database_name is None: |
| sql = "SHOW SYNC JOB" |
| result = self.execute(sql) |
| return result |
| sql = "SHOW SYNC JOB FROM %s" % database_name |
| result = self.execute(sql) |
| return result |
| |
| def set_frontend_config(self, config, value): |
| """ |
| admin set frontend config |
| """ |
| sql = 'ADMIN SET FRONTEND CONFIG ("%s" = "%s")' % (config, value) |
| return self.execute(sql) |
| |
| def get_partition_replica_allocation(self, table_name, partition_name, database_name=None): |
| """ |
| get table family replica allocation |
| """ |
| partition = self.get_partition(table_name, partition_name, database_name) |
| if partition: |
| return partition[palo_job.PartitionInfo.ReplicaAllocation] |
| else: |
| return None |
| |
| def modify_resource_tag(self, host_name, port, tag_location): |
| """ |
| tag_location: str, e.g. 'group_a', or dict, e.g. {'tag.location': 'a', 'tag.compute': 'b', ...} |
| modify the tag of a backend (BE) |
| """ |
| if isinstance(tag_location, str): |
| sql = "ALTER SYSTEM MODIFY BACKEND '%s:%s' SET ('tag.location'='%s')" % (host_name, port, tag_location) |
| elif isinstance(tag_location, dict): |
| sql = "ALTER SYSTEM MODIFY BACKEND '%s:%s' SET %s" % (host_name, port, |
| util.convert_dict2property(tag_location)) |
| else: |
| return None |
| result = self.execute(sql) |
| time.sleep(2) |
| if result != (): |
| LOG.info(L('MODIFY BACKEND FAIL', backend=host_name, port=port, tag_location=tag_location)) |
| return False |
| return True |
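| |
| # Usage sketch (illustrative only; 'client', the BE host and port are hypothetical): |
| # client.modify_resource_tag('10.0.0.1', 9050, 'group_a') # single location tag |
| # client.modify_resource_tag('10.0.0.1', 9050, {'tag.location': 'group_a'}) # dict form |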
| |
| def get_replica_backend_id(self, table_name): |
| """get BackendId from replica status""" |
| sql = "ADMIN SHOW REPLICA STATUS FROM %s" % table_name |
| replica_status = self.execute(sql) |
| column_idx = palo_job.ReplicaStatus.BackendId |
| return util.get_attr(replica_status, column_idx) |
| |
| def admin_check_tablet(self, tablet_id_list): |
| """admin check tablet""" |
| sql = "ADMIN CHECK TABLE (%s) PROPERTIES('type'='consistency')" % ','.join(tablet_id_list) |
| self.execute(sql) |
| |
| def admin_repair_table(self, table_name, partition_list=None): |
| """admin repair table""" |
| sql = "ADMIN REPAIR TABLE %s" % table_name |
| if partition_list is not None: |
| sql = '%s PARTITION (%s)' % (sql, ','.join(partition_list)) |
| self.execute(sql) |
| |
| def admin_diagnose_tablet(self, tablet_id): |
| """admin diagnose tablet""" |
| sql = "ADMIN DIAGNOSE TABLET %s" % tablet_id |
| ret = self.execute(sql) |
| return ret |
| |
| def get_resource_tag(self, host_ip): |
| """通过be的IP获取be的标签信息""" |
| backend_list = self.get_backend_list() |
| for backend in backend_list: |
| be = palo_job.BackendProcInfo(backend) |
| if be.get_ip() == host_ip: |
| return be.get_tag() |
| return None |
| |
| def get_resource_tag_by_id(self, be_id): |
| """获取backend tag by id""" |
| be = self.get_backend(be_id) |
| if not be: |
| return None |
| return palo_job.BackendProcInfo(be).get_tag() |