| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| # Module used for executing queries and gathering results. |
| # The QueryExecutor is meant to be generic and doesn't |
| # have the knowledge of how to actually execute a query. It takes a query and its config |
| # and executes is against a executor function. |
| # For example (in pseudo-code): |
| # |
| # def exec_func(query, config): |
| # ... |
| # |
| # config = ImpalaBeeswaxQueryExecConfig() |
| # executor = QueryExecutor('beeswax', query, config, exec_func) |
| # executor.run() |
| # result = executor.result |
| |
| import logging |
| import os |
| import re |
| |
| from tests.performance.query import Query |
| |
| # Setup logging for this module. |
| LOG = logging.getLogger('query_executor') |
| LOG.setLevel(level=logging.INFO) |
| |
| # Globals. |
| hive_result_regex = 'Time taken: (\d*).(\d*) seconds' |
| # Match any CRUD statement that can follow EXPLAIN. |
| # The statement may begin with SQL line comments starting with -- |
| COMMENT_LINES_REGEX = r'(?:\s*--.*\n)*' |
| DDL_CRUD_PATTERN = \ |
| re.compile(COMMENT_LINES_REGEX + |
| r'\s*((CREATE|DELETE|INSERT|SELECT|UPDATE|UPSERT|WITH)\s|VALUES\s*\()', |
| re.IGNORECASE) |
| |
| |
| ## TODO: Split executors into their own modules. |
| class QueryExecConfig(object): |
| """Base Class for Execution Configs |
| |
| Attributes: |
| plugin_runner (PluginRunner?) |
| """ |
| def __init__(self, plugin_runner=None): |
| self.plugin_runner = plugin_runner |
| |
| |
| class ImpalaQueryExecConfig(QueryExecConfig): |
| """Base class for Impala query execution config |
| |
| Attributes: |
| impalad (str): address of impalad <host>:<port> |
| """ |
| |
| def __init__(self, plugin_runner=None, impalad='localhost:21000'): |
| super(ImpalaQueryExecConfig, self).__init__(plugin_runner=plugin_runner) |
| self._impalad = impalad |
| |
| @property |
| def impalad(self): |
| return self._impalad |
| |
| @impalad.setter |
| def impalad(self, value): |
| self._impalad = value |
| |
| |
| class JdbcQueryExecConfig(ImpalaQueryExecConfig): |
| """Impala query execution config for jdbc |
| |
| Attributes: |
| tranport (?): ? |
| """ |
| |
| JDBC_CLIENT_PATH = os.path.join(os.environ['IMPALA_HOME'], 'bin/run-jdbc-client.sh') |
| |
| def __init__(self, plugin_runner=None, impalad='localhost:21050', transport=None): |
| super(JdbcQueryExecConfig, self).__init__(plugin_runner=plugin_runner, |
| impalad=impalad) |
| self.transport = transport |
| |
| @property |
| def jdbc_client_cmd(self): |
| """The args to run the jdbc client. |
| |
| Constructed on the fly, since the impalad it points to can change. |
| """ |
| assert self.transport is not None |
| return JdbcQueryExecConfig.JDBC_CLIENT_PATH + ' -i "%s" -t %s' % (self._impalad, |
| self.transport) |
| |
| class ImpalaHS2QueryConfig(ImpalaQueryExecConfig): |
| def __init__(self, use_kerberos=False, impalad="localhost:21050", plugin_runner=None): |
| super(ImpalaHS2QueryConfig, self).__init__(plugin_runner=plugin_runner, |
| impalad=impalad) |
| # TODO Use a config dict for query execution options similar to HS2 |
| self.use_kerberos = use_kerberos |
| |
| |
| class HiveHS2QueryConfig(QueryExecConfig): |
| def __init__(self, |
| plugin_runner=None, |
| exec_options = None, |
| use_kerberos=False, |
| user=None, |
| hiveserver='localhost'): |
| super(HiveHS2QueryConfig, self).__init__() |
| self.exec_options = dict() |
| self._build_options(exec_options) |
| self.use_kerberos = use_kerberos |
| self.user = user |
| self.hiveserver = hiveserver |
| |
| def _build_options(self, exec_options): |
| """Read the exec_options into self.exec_options |
| |
| Args: |
| exec_options (str): String formatted as "command1;command2" |
| """ |
| |
| if exec_options: |
| # exec_options are seperated by ; on the command line |
| options = exec_options.split(';') |
| for option in options: |
| key, value = option.split(':') |
| # The keys in HiveService QueryOptions are lower case. |
| self.exec_options[key.lower()] = value |
| |
| class BeeswaxQueryExecConfig(ImpalaQueryExecConfig): |
| """Impala query execution config for beeswax |
| |
| Args: |
| use_kerberos (boolean) |
| exec_options (str): String formatted as "opt1:val1;opt2:val2" |
| impalad (str): address of impalad <host>:<port> |
| plugin_runner (?): ? |
| |
| Attributes: |
| use_kerberos (boolean) |
| exec_options (dict str -> str): execution options |
| """ |
| |
| def __init__(self, use_kerberos=False, exec_options=None, impalad='localhost:21000', |
| plugin_runner=None, user=None, password=None, use_ssl=False): |
| super(BeeswaxQueryExecConfig, self).__init__(plugin_runner=plugin_runner, |
| impalad=impalad) |
| self.use_kerberos = use_kerberos |
| self.exec_options = dict() |
| self._build_options(exec_options) |
| self.user = user |
| self.password = password |
| self.use_ssl = use_ssl |
| |
| def _build_options(self, exec_options): |
| """Read the exec_options into self.exec_options |
| |
| Args: |
| exec_options (str): String formatted as "opt1:val1;opt2:val2" |
| """ |
| |
| if exec_options: |
| # exec_options are seperated by ; on the command line |
| options = exec_options.split(';') |
| for option in options: |
| key, value = option.split(':') |
| # The keys in ImpalaService QueryOptions are upper case. |
| self.exec_options[key.upper()] = value |
| |
| |
| class QueryExecutor(object): |
| """Executes one or more queries. |
| |
| Args: |
| name (str): eg. "hive" |
| query (Query): Container holding 1 or more ;-delimited SQL statements to be executed |
| func (function): Function that accepts a QueryExecOption parameter and returns a |
| ImpalaQueryResult. Eg. execute_using_impala_beeswax |
| config (QueryExecOption) |
| exit_on_error (boolean): Exit right after an error encountered. |
| |
| Attributes: |
| exec_func (function): Function that accepts a QueryExecOption parameter and returns |
| an ImpalaQueryResult. |
| exec_config (QueryExecOption) |
| query (Query): Container holding 1 or more ;-delimited SQL statements to be executed |
| exit_on_error (boolean): Exit right after an error encountered. |
| executor_name (str): eg. "hive" |
| result (ImpalaQueryResult): Contains the result after execute method is called. |
| """ |
| |
| def __init__(self, name, query, func, config, exit_on_error): |
| self.exec_func = func |
| self.exec_config = config |
| self.query = query |
| self.exit_on_error = exit_on_error |
| self.executor_name = name |
| self._result = None |
| |
| def prepare(self, impalad): |
| """Prepare the query to be run. |
| |
| For now, this sets the impalad that the query connects to. If the executor is hive, |
| it's a no op. |
| """ |
| if 'hive' not in self.executor_name: |
| self.exec_config.impalad = impalad |
| |
| def execute(self, plan_first=False): |
| """Execute a set of SQL statements using the given execution function, |
| and return a result object. SQL statements can be the familiar SELECT, INSERT, |
| DELETE, UPDATE and UPSERT DML commands as well as utilities like SET, SHOW, |
| VERSION, etc. |
| |
| If plan_first is true, EXPLAIN the "explainable" queries in the set |
| first so timing does not include the initial metadata loading |
| required for planning. |
| |
| This function furnishes a query result object in self._result, for the last |
| query in the batch ONLY. |
| """ |
| assert isinstance(self.query, Query) |
| orig_query_str = self.query.query_str |
| try: |
| statements = self.query.query_str.split(';') |
| |
| if plan_first: |
| # Break out multiple statements in self.query |
| for stmt in statements: |
| ddl_crud_match = DDL_CRUD_PATTERN.match(stmt) |
| if not ddl_crud_match: |
| # Don't EXPLAIN this statement |
| continue |
| |
| self.query.query_str = 'EXPLAIN ' + stmt + ';' |
| LOG.debug('Planning %s' % self.query.query_str) |
| self.exec_func(self.query, self.exec_config) |
| |
| # Now actually execute |
| for stmt in statements: |
| self.query.query_str = stmt + ';' |
| LOG.debug('Executing %s' % self.query.query_str) |
| self._result = self.exec_func(self.query, self.exec_config) |
| |
| if not self._result.success: |
| if self.exit_on_error: |
| raise RuntimeError(self._result.query_error) |
| else: |
| LOG.info("Continuing execution") |
| finally: |
| self.query.query_str = orig_query_str |
| |
| # We do not need to restore the SET options we changed for the next batch |
| # (Query object) because the scheduler runs each Query on its own connection |
| # (XXX pretty strange for a performance test. :) |
| |
| @property |
| def result(self): |
| """Getter for the result of the query execution. |
| |
| A result is a ImpalaQueryResult object that contains the details of a single run of |
| the query. |
| """ |
| return self._result |