| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import pickle |
| import re |
| import os |
| import job |
| from collections import defaultdict |
| |
class Report(object):
  '''Contains information about a completed job, such as the number of crashes and stack
  traces from every crash. The report is usually displayed on a web page.
  '''

  # Ordered (compiled_pattern, label) pairs used by classify_error. A list (rather
  # than a dict) makes the matching order deterministic when more than one pattern
  # could match the same error text, and compiling once avoids rebuilding the table
  # on every call.
  # NOTE(review): 'unkown_node' is a historical typo preserved byte-for-byte so
  # existing report consumers keep seeing the same group name.
  ERROR_CLASSES = [(re.compile(pattern), label) for pattern, label in (
      (r'LINE \d+:', 'Postgres_error'),
      (r'Permission denied', 'permission_denied'),
      (r'^AnalysisException', 'AnalysisException'),
      (r'^Column \d+ in row \d+ does not match', 'mismatch'),
      (r'^Could not connect', 'could_not_connect'),
      (r'^IllegalStateException', 'IllegalStateException'),
      (r'^Invalid query handle: ', 'invalid_query_handle'),
      (r'^Known issue:', 'known_issue'),
      (r'^Operation is in ERROR_STATE', 'error_state'),
      (r'^Query timed out after \d+ seconds', 'timeout'),
      (r'^Row counts do not match', 'row_counts'),
      (r'^Too much data', 'too_much_data'),
      (r'^Unknown expr node type: \d+', 'unkown_node'),
      (r'^Year is out of valid range', 'year_range'),
      (r'^[A-Za-z]+ out of range', 'out_of_range'),
      (r'^division by zero', 'division_by_zero'))]

  def __init__(self, job_id):
    '''Load the finished job identified by job_id and extract its results.

    job_id: file name of the pickled job under PATH_TO_FINISHED_JOBS.
    '''
    self.num_queries = 0
    self.run_time = 0        # job duration in seconds
    self.run_date = 0        # job start time
    self.job_name = ''
    self.num_crashes = 0
    self.num_row_count_mismatch = 0
    self.num_mismatch = 0
    self.job_id = job_id
    self.git_hash = ''
    self.grouped_results = None
    # Initialized here for a complete attribute set; filled in by get_results().
    self.grouped_stacks = None
    self.num_queries_executed = 0
    self.parent_job_name = ''
    self.num_queries_returned_correct_data = 0
    self.get_results()

  @property
  def run_time_str(self):
    '''Return the running time of the job as a string in human readable format
    (HH:MM:SS).'''
    minutes, seconds = divmod(self.run_time, 60)
    hours, minutes = divmod(minutes, 60)
    return '{0:02d}:{1:02d}:{2:02d}'.format(int(hours), int(minutes), int(seconds))

  def classify_error(self, error):
    '''Return a short label identifying the type of the given error message, or
    'unrecognized' if no known pattern matches. Patterns are tried in the fixed
    order defined by ERROR_CLASSES.'''
    for pattern, label in self.ERROR_CLASSES:
      if pattern.search(error):
        return label
    return 'unrecognized'

  def group_queries(self, all_queries, group_func):
    '''General function that returns a dictionary with keys that are generated by
    group_func. all_queries is a list of queries.
    group_func should take query as a parameter and return a string containing an
    interesting property of the query which will be used as key in the dictionary.
    '''
    grouped_queries = defaultdict(list)
    for query in all_queries:
      grouped_queries[group_func(query)].append(query)
    return grouped_queries

  def __str__(self):
    '''TODO: Render report as text.
    '''
    return ''

  def get_first_impala_frame(self, query_result):
    '''Extract the first impala frame from the formatted stack trace in
    query_result (a dict with a 'formatted_stack' key). Returns None when there
    is no stack or no impala frame in it.
    '''
    stack = query_result['formatted_stack']
    if stack:
      for line in stack.split('\n'):
        match = re.search(r'(impala::.*) \(', line)
        if match:
          return match.group(1)
    return None

  def _format_stack(self, stack):
    '''Cleans up the stack trace: drops everything before frame #0 and strips the
    frame number and memory address from each remaining line.
    '''

    def clean_frame(frame):
      # Remove the frame number and memory address, e.g.
      # "#2  0x00abc123 in impala::Foo ()" -> "impala::Foo ()"
      reg = re.match(r'#\d+ *0x[0123456789abcdef]* in (.*)', frame)
      if reg: return reg.group(1)
      # This is for matching address-less lines like
      # "#7 SLL_Next (this=0x9046780, src=0x90467c8...
      reg = re.match(r'#\d+ *(\S.*)', frame)
      if reg: return reg.group(1)
      return frame

    def stack_gen():
      '''Generator that yields impala stack trace lines line by line, starting at
      frame #0 (anything before it is debugger noise).
      '''
      if stack:
        active = False
        for line in stack.split('\n'):
          if active or line.startswith('#0'):
            active = True
            yield line

    return '\n'.join(clean_frame(l) for l in stack_gen())

  def get_results(self):
    '''Analyses the completed job and extracts important results into self. This method
    should be called as soon as the object is created.

    Reads the pickled job from PATH_TO_FINISHED_JOBS; raises IOError if the file
    does not exist.
    '''
    from controller import PATH_TO_FINISHED_JOBS

    def group_outer_func(query):
      # Crashed queries all land in the 'stack' group; the rest are grouped by
      # the type of their error message.
      if 'stack' in query:
        return 'stack'
      return self.classify_error(query['error'])

    # NOTE(review): unpickling untrusted data can execute arbitrary code; these
    # job files must only ever come from the local controller.
    # 'rb' because pickle data is binary.
    with open(os.path.join(PATH_TO_FINISHED_JOBS, self.job_id), 'rb') as f:
      finished_job = pickle.load(f)
    self.grouped_results = self.group_queries(
        finished_job.result_list, group_outer_func)

    # Format the stack for queries that have a stack
    for query in self.grouped_results['stack']:
      query['formatted_stack'] = self._format_stack(query['stack'])

    self.num_crashes = len(self.grouped_results['stack'])
    self.num_row_count_mismatch = len(self.grouped_results['row_counts'])
    self.num_mismatch = len(self.grouped_results['mismatch'])

    # Group the crashes by the first impala frame of their formatted stacks
    # (which were attached to the query dicts just above).
    self.grouped_stacks = self.group_queries(
        self.grouped_results['stack'], self.get_first_impala_frame)

    self.run_time = finished_job.stop_time - finished_job.start_time
    self.run_date = finished_job.start_time
    self.job_name = finished_job.job_name
    self.git_hash = finished_job.git_hash
    self.num_queries_executed = finished_job.num_queries_executed
    self.num_queries_returned_correct_data = (
        finished_job.num_queries_returned_correct_data)
    if finished_job.parent_job:
      with open(os.path.join(PATH_TO_FINISHED_JOBS, finished_job.parent_job), 'rb') as f:
        parent_job = pickle.load(f)
      self.parent_job_name = parent_job.job_name

  def save_pickle(self):
    '''Serialize this report into PATH_TO_REPORTS, named after the job id.'''
    from controller import PATH_TO_REPORTS
    # 'wb' because pickle data is binary.
    with open(os.path.join(PATH_TO_REPORTS, self.job_id), 'wb') as f:
      pickle.dump(self, f)