| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import logging |
| |
| from tests.common.test_dimensions import ( |
| TableFormatInfo, |
| get_dataset_from_workload, |
| load_table_info_dimension) |
| from tests.performance.query_executor import ( |
| BeeswaxQueryExecConfig, |
| HiveHS2QueryConfig, |
| ImpalaHS2QueryConfig, |
| JdbcQueryExecConfig, |
| QueryExecutor) |
| from tests.performance.query_exec_functions import ( |
| execute_using_hive_hs2, |
| execute_using_impala_beeswax, |
| execute_using_impala_hs2, |
| execute_using_jdbc) |
| from tests.performance.scheduler import Scheduler |
| |
# Module-wide logger; WorkloadRunner raises its level to DEBUG when the
# workload config is marked verbose.
LOG = logging.getLogger(name='workload_runner')
| |
| |
| class WorkloadRunner(object): |
| """Runs query files and captures results from the specified workload(s) |
| |
| The usage is: |
| 1) Initialize WorkloadRunner with desired execution parameters. |
| 2) Call workload_runner.run() |
| |
| Internally, for each workload, this module looks up and parses that workload's |
| query files and reads the workload's test vector to determine what combination(s) |
| of file format / compression to run with. |
| |
| Args: |
| workload (Workload) |
| scale_factor (str): eg. "300gb" |
| config (WorkloadConfig) |
| |
| Attributes: |
| workload (Workload) |
| scale_factor (str): eg. "300gb" |
| config (WorkloadConfig) |
| exit_on_error (boolean) |
| results (list of ImpalaQueryResult) |
| _test_vectors (list of ?) |
| """ |
| |
| def __init__(self, workload, scale_factor, config): |
| self.workload = workload |
| self.scale_factor = scale_factor |
| self.config = config |
| self.exit_on_error = not self.config.continue_on_query_error |
| if self.config.verbose: LOG.setLevel(level=logging.DEBUG) |
| self._generate_test_vectors() |
| self._results = list() |
| |
| @property |
| def results(self): |
| return self._results |
| |
| def _generate_test_vectors(self): |
| """Generate test vector objects |
| |
| If the user has specified a set for table_formats, generate them, otherwise generate |
| vectors for all table formats within the specified exploration strategy. |
| """ |
| self._test_vectors = [] |
| if self.config.table_formats: |
| dataset = get_dataset_from_workload(self.workload.name) |
| for tf in self.config.table_formats: |
| self._test_vectors.append(TableFormatInfo.create_from_string(dataset, tf)) |
| else: |
| vectors = load_table_info_dimension(self.workload.name, |
| self.config.exploration_strategy) |
| self._test_vectors = [vector.value for vector in vectors] |
| |
| def _create_executor(self, executor_name): |
| query_options = { |
| 'impala_beeswax': lambda: (execute_using_impala_beeswax, |
| BeeswaxQueryExecConfig(plugin_runner=self.config.plugin_runner, |
| exec_options=self.config.exec_options, |
| use_kerberos=self.config.use_kerberos, |
| user=self.config.user if self.config.password else None, |
| password=self.config.password, |
| use_ssl=self.config.use_ssl |
| )), |
| 'impala_jdbc': lambda: (execute_using_jdbc, |
| JdbcQueryExecConfig(plugin_runner=self.config.plugin_runner) |
| ), |
| 'impala_hs2': lambda: (execute_using_impala_hs2, |
| ImpalaHS2QueryConfig(plugin_runner=self.config.plugin_runner, |
| use_kerberos=self.config.use_kerberos |
| )), |
| 'hive_hs2': lambda: (execute_using_hive_hs2, |
| HiveHS2QueryConfig(hiveserver=self.config.hiveserver, |
| plugin_runner=self.config.plugin_runner, |
| exec_options=self.config.exec_options, |
| user=self.config.user, |
| use_kerberos=self.config.use_kerberos |
| )) |
| } [executor_name]() |
| return query_options |
| |
| def _execute_queries(self, queries): |
| """Execute a set of queries. |
| |
| Create query executors for each query, and pass them along with config information to |
| the scheduler. |
| """ |
| executor_name = "{0}_{1}".format(self.config.exec_engine, self.config.client_type) |
| exec_func, exec_config = self._create_executor(executor_name) |
| query_executors = [] |
| # Build an executor for each query |
| for query in queries: |
| query_executor = QueryExecutor(executor_name, |
| query, |
| exec_func, |
| exec_config, |
| self.exit_on_error) |
| query_executors.append(query_executor) |
| # Initialize the scheduler. |
| scheduler = Scheduler(query_executors=query_executors, |
| shuffle=self.config.shuffle_queries, |
| iterations=self.config.workload_iterations, |
| query_iterations=self.config.query_iterations, |
| impalads=self.config.impalads, |
| num_clients=self.config.num_clients, |
| plan_first=getattr(self.config, 'plan_first', False)) |
| |
| scheduler.run() |
| self._results.extend(scheduler.results) |
| |
| def run(self): |
| """ |
| Runs the workload against all test vectors serially and stores the results. |
| """ |
| for test_vector in self._test_vectors: |
| # Transform the query strings to Query objects for a combination of scale factor and |
| # the test vector. |
| queries = self.workload.construct_queries(test_vector, self.scale_factor) |
| self._execute_queries(queries) |