| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import os |
| import requests |
| import json |
| from dataclasses import dataclass, field, asdict |
| from typing import Dict, List, Tuple, Optional |
| from datetime import datetime, timedelta |
import pytz  # all timestamps in this module are timezone-aware UTC
| from collections import defaultdict |
| import time |
| import logging |
import concurrent.futures
| import pickle |
| from abc import ABC, abstractmethod |
| |
| logger = logging.getLogger(__name__) |
| |
| @dataclass |
| class TestOutcome: |
| passed: int |
| failed: int |
| skipped: int |
| flaky: int |
    not_selected: int = field(metadata={'name': 'notSelected'})  # reported by the API as "notSelected"; callers rename it before construction
| total: int |
| |
| @dataclass |
| class BuildInfo: |
| id: str |
| timestamp: datetime |
| duration: int |
| has_failed: bool |
| |
| @dataclass |
| class TestTimelineEntry: |
| build_id: str |
| timestamp: datetime |
| outcome: str # "passed", "failed", "flaky", etc. |
| |
| @dataclass |
| class TestResult: |
| name: str |
| outcome_distribution: TestOutcome |
| first_seen: datetime |
| timeline: List[TestTimelineEntry] = field(default_factory=list) |
    recent_failure_rate: float = 0.0  # fraction of failed/flaky runs over the last 30 days
| |
| @dataclass |
| class TestContainerResult: |
| build_id: str |
| outcome: str |
| timestamp: Optional[datetime] = None |
| |
| @dataclass |
| class TestCaseResult(TestResult): |
| """Extends TestResult to include container-specific information""" |
| container_name: str = "" |
| |
| @dataclass |
| class BuildCache: |
| last_update: datetime |
| builds: Dict[str, 'BuildInfo'] |
| |
    def to_dict(self):
        return {
            'last_update': self.last_update.isoformat(),
            'builds': {
                # Datetimes must be serialized explicitly so the dict is JSON-safe
                k: {**asdict(v), 'timestamp': v.timestamp.isoformat()}
                for k, v in self.builds.items()
            }
        }

    @classmethod
    def from_dict(cls, data: dict) -> 'BuildCache':
        builds = {}
        for k, v in data['builds'].items():
            v = dict(v)
            v['timestamp'] = datetime.fromisoformat(v['timestamp'])
            builds[k] = BuildInfo(**v)
        return cls(
            last_update=datetime.fromisoformat(data['last_update']),
            builds=builds
        )
| |
| class CacheProvider(ABC): |
| @abstractmethod |
| def get_cache(self) -> Optional[BuildCache]: |
| pass |
| |
| @abstractmethod |
| def save_cache(self, cache: BuildCache): |
| pass |
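
# Illustrative sketch of an additional provider (hypothetical S3CacheProvider,
# not part of this module) that could be appended to TestAnalyzer.cache_providers:
#
#   class S3CacheProvider(CacheProvider):
#       def get_cache(self) -> Optional[BuildCache]:
#           ...  # download JSON and return BuildCache.from_dict(...), or None
#
#       def save_cache(self, cache: BuildCache):
#           ...  # upload json.dumps(cache.to_dict())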
| |
| class LocalCacheProvider(CacheProvider): |
    def __init__(self, cache_dir: Optional[str] = None):
| if cache_dir is None: |
| cache_dir = os.path.join(os.path.expanduser("~"), ".develocity_cache") |
| self.cache_file = os.path.join(cache_dir, "build_cache.pkl") |
| os.makedirs(cache_dir, exist_ok=True) |
| |
| def get_cache(self) -> Optional[BuildCache]: |
| try: |
| if os.path.exists(self.cache_file): |
| with open(self.cache_file, 'rb') as f: |
| return pickle.load(f) |
| except Exception as e: |
| logger.warning(f"Failed to load local cache: {e}") |
| return None |
| |
| def save_cache(self, cache: BuildCache): |
| try: |
| with open(self.cache_file, 'wb') as f: |
| pickle.dump(cache, f) |
| except Exception as e: |
| logger.warning(f"Failed to save local cache: {e}") |
| |
| class GitHubActionsCacheProvider(CacheProvider): |
| def __init__(self): |
| self.cache_key = "develocity-build-cache" |
| |
| def get_cache(self) -> Optional[BuildCache]: |
| try: |
| # Check if running in GitHub Actions |
| if not os.environ.get('GITHUB_ACTIONS'): |
| return None |
| |
| cache_path = os.environ.get('GITHUB_WORKSPACE', '') |
| cache_file = os.path.join(cache_path, self.cache_key + '.json') |
| |
| if os.path.exists(cache_file): |
| with open(cache_file, 'r') as f: |
| data = json.load(f) |
| return BuildCache.from_dict(data) |
| except Exception as e: |
| logger.warning(f"Failed to load GitHub Actions cache: {e}") |
| return None |
| |
| def save_cache(self, cache: BuildCache): |
| try: |
| if not os.environ.get('GITHUB_ACTIONS'): |
| return |
| |
| cache_path = os.environ.get('GITHUB_WORKSPACE', '') |
| cache_file = os.path.join(cache_path, self.cache_key + '.json') |
| |
| with open(cache_file, 'w') as f: |
| json.dump(cache.to_dict(), f) |
| except Exception as e: |
| logger.warning(f"Failed to save GitHub Actions cache: {e}") |
| |
| class TestAnalyzer: |
| def __init__(self, base_url: str, auth_token: str): |
| self.base_url = base_url |
| self.headers = { |
| 'Authorization': f'Bearer {auth_token}', |
| 'Accept': 'application/json' |
| } |
| self.default_chunk_size = timedelta(days=14) |
| self.api_retry_delay = 2 # seconds |
| self.max_api_retries = 3 |
| |
| # Initialize cache providers |
| self.cache_providers = [ |
| GitHubActionsCacheProvider(), |
| LocalCacheProvider() |
| ] |
| self.build_cache = None |
| self._load_cache() |
| |
| def _load_cache(self): |
| """Load cache from the first available provider""" |
| for provider in self.cache_providers: |
| cache = provider.get_cache() |
| if cache is not None: |
| self.build_cache = cache |
| logger.info(f"Loaded cache from {provider.__class__.__name__}") |
| return |
| logger.info("No existing cache found") |
| |
| def _save_cache(self): |
| """Save cache to all providers""" |
| if self.build_cache: |
| for provider in self.cache_providers: |
| provider.save_cache(self.build_cache) |
| logger.info(f"Saved cache to {provider.__class__.__name__}") |
| |
| def build_query( |
| self, |
| project: str, |
| chunk_start: datetime, |
| chunk_end: datetime, |
| test_tags: List[str] |
| ) -> str: |
| """ |
| Constructs the query string to be used in both build info and test containers API calls. |
| |
| Args: |
| project: The project name. |
| chunk_start: The start datetime for the chunk. |
| chunk_end: The end datetime for the chunk. |
| test_tags: A list of tags to include. |
| |
| Returns: |
| A formatted query string. |
| """ |
        test_tags = test_tags + ["+github"]  # copy so repeated calls do not mutate the caller's list
        tags = []
        for tag in test_tags:
            if tag.startswith("+"):
                tags.append(f"tag:{tag[1:]}")
            elif tag.startswith("-"):
                tags.append(f"-tag:{tag[1:]}")
            else:
                raise ValueError(f"Tag '{tag}' must start with '+' or '-' to indicate inclusion or exclusion.")

        tag_clause = " ".join(tags)
        return f"project:{project} buildStartTime:[{chunk_start.isoformat()} TO {chunk_end.isoformat()}] gradle.requestedTasks:test {tag_clause}"
| |
| def process_chunk( |
| self, |
| chunk_start: datetime, |
| chunk_end: datetime, |
| project: str, |
| test_tags: List[str], |
        remaining_build_ids: Optional[set],
| max_builds_per_request: int |
| ) -> Dict[str, BuildInfo]: |
| """Helper method to process a single chunk of build information""" |
| chunk_builds = {} |
| |
| # Use the helper method to build the query |
| query = self.build_query(project, chunk_start, chunk_end, test_tags) |
| |
| # Initialize pagination for this chunk |
| from_build = None |
| continue_chunk = True |
| |
| while continue_chunk and (remaining_build_ids is None or remaining_build_ids): |
| query_params = { |
| 'query': query, |
| 'models': ['gradle-attributes'], |
| 'allModels': 'false', |
| 'maxBuilds': max_builds_per_request, |
| 'reverse': 'false', |
| 'fromInstant': int(chunk_start.timestamp() * 1000) |
| } |
| |
| if from_build: |
| query_params['fromBuild'] = from_build |
| |
| for attempt in range(self.max_api_retries): |
| try: |
| response = requests.get( |
| f'{self.base_url}/api/builds', |
| headers=self.headers, |
| params=query_params, |
| timeout=(5, 30) |
| ) |
| response.raise_for_status() |
| break |
                except requests.exceptions.Timeout:
                    if attempt == self.max_api_retries - 1:
                        raise
                    time.sleep(self.api_retry_delay * (attempt + 1))
| |
| response_json = response.json() |
| |
| if not response_json: |
| break |
| |
| for build in response_json: |
| build_id = build['id'] |
| |
| if 'models' in build and 'gradleAttributes' in build['models']: |
| gradle_attrs = build['models']['gradleAttributes'] |
| if 'model' in gradle_attrs: |
| attrs = gradle_attrs['model'] |
| build_timestamp = datetime.fromtimestamp(attrs['buildStartTime'] / 1000, pytz.UTC) |
| |
| if build_timestamp >= chunk_end: |
| continue_chunk = False |
| break |
| |
| if remaining_build_ids is None or build_id in remaining_build_ids: |
| if 'problem' not in gradle_attrs: |
| chunk_builds[build_id] = BuildInfo( |
| id=build_id, |
| timestamp=build_timestamp, |
| duration=attrs.get('buildDuration'), |
| has_failed=attrs.get('hasFailed', False) |
| ) |
| if remaining_build_ids is not None: |
| remaining_build_ids.remove(build_id) |
| |
| if continue_chunk and response_json: |
| from_build = response_json[-1]['id'] |
| else: |
| continue_chunk = False |
| |
| time.sleep(0.5) # Rate limiting between pagination requests |
| |
| return chunk_builds |
| |
    def get_build_info(
        self,
        build_ids: Optional[List[str]] = None,
        project: Optional[str] = None,
        test_tags: Optional[List[str]] = None,
        query_days: Optional[int] = None,
        bypass_cache: bool = False,
        fetch_all: bool = False
    ) -> Dict[str, BuildInfo]:
        if not fetch_all and not build_ids:
            raise ValueError(f"Either build_ids must be provided or fetch_all must be True: {build_ids} {fetch_all}")
        if query_days is None:
            raise ValueError("query_days must be provided")

        builds = {}
        max_builds_per_request = 100
        cutoff_date = datetime.now(pytz.UTC) - timedelta(days=query_days)
        current_time = datetime.now(pytz.UTC)
| |
| # Get builds from cache if available and bypass_cache is False |
| if not bypass_cache and self.build_cache: |
| cached_builds = self.build_cache.builds |
| cached_cutoff = self.build_cache.last_update - timedelta(days=query_days) |
| |
| if fetch_all: |
| # Use all cached builds within the time period |
| for build_id, build in cached_builds.items(): |
| if build.timestamp >= cached_cutoff: |
| builds[build_id] = build |
| else: |
| # Use cached data for specific builds within the cache period |
| for build_id in build_ids: |
| if build_id in cached_builds: |
| build = cached_builds[build_id] |
| if build.timestamp >= cached_cutoff: |
| builds[build_id] = build |
| |
| # Update cutoff date to only fetch new data |
| cutoff_date = self.build_cache.last_update |
| logger.info(f"Using cached data up to {cutoff_date.isoformat()}") |
| |
| if not fetch_all: |
| # Remove already found builds from the search list |
| build_ids = [bid for bid in build_ids if bid not in builds] |
| |
| if not build_ids: |
| logger.info("All builds found in cache") |
| return builds |
| |
| # Fetch remaining builds from API |
| remaining_build_ids = set(build_ids) if not fetch_all else None |
| chunk_size = self.default_chunk_size |
| |
| # Create time chunks |
| chunks = [] |
| chunk_start = cutoff_date |
| while chunk_start < current_time: |
| chunk_end = min(chunk_start + chunk_size, current_time) |
| chunks.append((chunk_start, chunk_end)) |
| chunk_start = chunk_end |
| |
| total_start_time = time.time() |
| |
| # Process chunks in parallel |
| with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: |
| future_to_chunk = { |
| executor.submit( |
| self.process_chunk, |
| chunk[0], |
| chunk[1], |
| project, |
| test_tags, |
| remaining_build_ids.copy() if remaining_build_ids else None, |
| max_builds_per_request |
| ): chunk for chunk in chunks |
| } |
| |
| for future in concurrent.futures.as_completed(future_to_chunk): |
| try: |
| chunk_builds = future.result() |
| builds.update(chunk_builds) |
| if remaining_build_ids: |
| remaining_build_ids -= set(chunk_builds.keys()) |
| except Exception as e: |
| logger.error(f"Chunk processing generated an exception: {str(e)}") |
| |
| total_duration = time.time() - total_start_time |
| logger.info( |
| f"\nBuild Info Performance:" |
| f"\n Total Duration: {total_duration:.2f}s" |
| f"\n Builds Retrieved: {len(builds)}" |
| f"\n Builds Not Found: {len(remaining_build_ids) if remaining_build_ids else 0}" |
| ) |
| |
| # Update cache with new data if not bypassing cache |
| if builds and not bypass_cache: |
| if not self.build_cache: |
| self.build_cache = BuildCache(current_time, {}) |
| self.build_cache.builds.update(builds) |
| self.build_cache.last_update = current_time |
| self._save_cache() |
| |
| return builds |
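
    # Illustrative usage (token and tags are placeholders): fetch every matching
    # build from the last 7 days, ignoring any cached data:
    #
    #   analyzer = TestAnalyzer("https://develocity.apache.org", token)
    #   builds = analyzer.get_build_info(project="kafka",
    #                                    test_tags=["+trunk", "-flaky"],
    #                                    query_days=7, bypass_cache=True,
    #                                    fetch_all=True)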
| |
| def get_test_results( |
| self, |
| project: str, |
| threshold_days: int, |
| test_tags: List[str], |
| outcomes: List[str] = None |
    ) -> List[TestResult]:
        """Fetch test results with timeline information."""
| if outcomes is None: |
| outcomes = ["failed", "flaky"] |
| |
| logger.debug(f"Fetching test results for project {project}, last {threshold_days} days") |
| |
| end_time = datetime.now(pytz.UTC) |
| start_time = end_time - timedelta(days=threshold_days) |
| |
| all_results = {} |
| build_ids = set() |
| test_container_results = defaultdict(list) |
| |
| chunk_size = self.default_chunk_size |
| chunk_start = start_time |
| |
| while chunk_start < end_time: |
| chunk_end = min(chunk_start + chunk_size, end_time) |
| logger.debug(f"Processing chunk: {chunk_start} to {chunk_end}") |
| |
| # Use the helper method to build the query |
| query = self.build_query(project, chunk_start, chunk_end, test_tags) |
| |
| query_params = { |
| 'query': query, |
| 'testOutcomes': outcomes, |
| 'container': '*', |
| 'include': ['buildScanIds'] # Explicitly request build scan IDs |
| } |
| |
| response = requests.get( |
| f'{self.base_url}/api/tests/containers', |
| headers=self.headers, |
| params=query_params |
| ) |
| response.raise_for_status() |
| |
            for test in response.json().get('content', []):
| test_name = test['name'] |
| logger.debug(f"Processing test: {test_name}") |
| |
| if test_name not in all_results: |
| outcome_data = test['outcomeDistribution'] |
| if 'notSelected' in outcome_data: |
| outcome_data['not_selected'] = outcome_data.pop('notSelected') |
| outcome = TestOutcome(**outcome_data) |
| all_results[test_name] = TestResult(test_name, outcome, chunk_start) |
| |
| # Collect build IDs by outcome |
| if 'buildScanIdsByOutcome' in test: |
| scan_ids = test['buildScanIdsByOutcome'] |
| |
| for outcome, ids in scan_ids.items(): |
| if ids: # Only process if we have IDs |
| for build_id in ids: |
| build_ids.add(build_id) |
| test_container_results[test_name].append( |
| TestContainerResult(build_id=build_id, outcome=outcome) |
| ) |
| |
| chunk_start = chunk_end |
| |
| logger.debug(f"Total unique build IDs collected: {len(build_ids)}") |
| |
        # Fetch build information for all collected build IDs
| |
| builds = self.get_build_info(list(build_ids), project, test_tags, threshold_days) |
| logger.debug(f"Retrieved {len(builds)} builds from API") |
| logger.debug(f"Retrieved build IDs: {sorted(builds.keys())}") |
| |
| # Update test results with timeline information |
| for test_name, result in all_results.items(): |
| logger.debug(f"\nProcessing timeline for test: {test_name}") |
| timeline = [] |
| for container_result in test_container_results[test_name]: |
| logger.debug(f"Processing container result: {container_result}") |
| if container_result.build_id in builds: |
| build_info = builds[container_result.build_id] |
| timeline.append(TestTimelineEntry( |
| build_id=container_result.build_id, |
| timestamp=build_info.timestamp, |
| outcome=container_result.outcome |
| )) |
| else: |
| logger.warning(f"Build ID {container_result.build_id} not found in builds response") |
| |
| # Sort timeline by timestamp |
| result.timeline = sorted(timeline, key=lambda x: x.timestamp) |
| logger.debug(f"Final timeline entries for {test_name}: {len(result.timeline)}") |
| |
| # Print build details for debugging |
| logger.debug("Timeline entries:") |
| for entry in timeline: |
| logger.debug(f"Build ID: {entry.build_id}, Timestamp: {entry.timestamp}, Outcome: {entry.outcome}") |
| |
| # Calculate recent failure rate |
| recent_cutoff = datetime.now(pytz.UTC) - timedelta(days=30) |
| recent_runs = [t for t in timeline if t.timestamp >= recent_cutoff] |
| if recent_runs: |
| recent_failures = sum(1 for t in recent_runs if t.outcome in ('failed', 'flaky')) |
| result.recent_failure_rate = recent_failures / len(recent_runs) |
| |
| return list(all_results.values()) |
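
    # Illustrative usage (tags are placeholders): fetch failed/flaky container
    # results for the last 7 days and inspect each per-build timeline:
    #
    #   results = analyzer.get_test_results("kafka", 7, ["+trunk", "-flaky"])
    #   for r in results:
    #       print(r.name, r.outcome_distribution.failed, len(r.timeline))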
| |
| def get_defective_tests(self, results: List[TestResult]) -> Dict[str, TestResult]: |
| """ |
| Analyze test results to find defective tests (failed or flaky) |
| """ |
| defective_tests = {} |
| |
| for result in results: |
| if result.outcome_distribution.failed > 0 or result.outcome_distribution.flaky > 0: |
| defective_tests[result.name] = result |
| |
| return defective_tests |
| |
    def get_long_quarantined_tests(self, results: List[TestResult], quarantine_threshold_days: int = 60) -> Dict[str, Tuple[TestResult, int]]:
        """
        Find tests that have been quarantined longer than the threshold.
        These are candidates for removal or rewriting.

        Args:
            results: List of test results
            quarantine_threshold_days: Number of days after which a quarantined test should be considered for removal/rewrite

        Returns:
            Mapping of test name to a (result, days_quarantined) tuple.
        """
| long_quarantined = {} |
| current_time = datetime.now(pytz.UTC) |
| |
| for result in results: |
| days_quarantined = (current_time - result.first_seen).days |
| if days_quarantined >= quarantine_threshold_days: |
| long_quarantined[result.name] = (result, days_quarantined) |
| |
| return long_quarantined |
| |
| def get_problematic_quarantined_tests( |
| self, |
| results: List[TestResult], |
| quarantine_threshold_days: int = 60, |
| min_failure_rate: float = 0.3, |
| recent_failure_threshold: float = 0.5 |
| ) -> Dict[str, Dict]: |
| """Enhanced version that includes test case details""" |
| problematic_tests = {} |
| current_time = datetime.now(pytz.UTC) |
| chunk_start = current_time - timedelta(days=7) # Last 7 days for test cases |
| |
| for result in results: |
| days_quarantined = (current_time - result.first_seen).days |
| if days_quarantined >= quarantine_threshold_days: |
| total_runs = result.outcome_distribution.total |
| if total_runs > 0: |
| problem_runs = result.outcome_distribution.failed + result.outcome_distribution.flaky |
| failure_rate = problem_runs / total_runs |
| |
| if failure_rate >= min_failure_rate or result.recent_failure_rate >= recent_failure_threshold: |
| # Get detailed test case information |
| try: |
| test_cases = self.get_test_case_details( |
| result.name, |
| "kafka", |
| chunk_start, |
| current_time, |
| test_tags=["+trunk", "+flaky"] |
| ) |
| |
| problematic_tests[result.name] = { |
| 'container_result': result, |
| 'days_quarantined': days_quarantined, |
| 'failure_rate': failure_rate, |
| 'recent_failure_rate': result.recent_failure_rate, |
| 'test_cases': test_cases |
| } |
| except Exception as e: |
| logger.error(f"Error getting test case details for {result.name}: {str(e)}") |
| |
| return problematic_tests |
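
    # Worked example of the thresholds above: a test quarantined for 75 days with
    # 4 failed + 2 flaky runs out of 15 total has failure_rate 6/15 = 0.40, which
    # exceeds min_failure_rate=0.3, so it is reported even if its
    # recent_failure_rate stays below recent_failure_threshold=0.5.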
| |
| def get_test_case_details( |
| self, |
| container_name: str, |
| project: str, |
| chunk_start: datetime, |
| chunk_end: datetime, |
| test_tags: List[str] |
| ) -> List[TestCaseResult]: |
| """ |
| Fetch detailed test case results for a specific container. |
| |
| Args: |
| container_name: Name of the test container |
| project: The project name |
| chunk_start: Start time for the query |
| chunk_end: End time for the query |
| test_tags: List of tags to query |
| """ |
| # Use the helper method to build the query, similar to get_test_results |
| query = self.build_query(project, chunk_start, chunk_end, test_tags) |
| |
| query_params = { |
| 'query': query, |
| 'testOutcomes': ['failed', 'flaky'], |
| 'container': container_name, |
| 'include': ['buildScanIds'], # Explicitly request build scan IDs |
| 'limit': 1000 |
| } |
| |
| try: |
| response = requests.get( |
| f'{self.base_url}/api/tests/cases', |
| headers=self.headers, |
| params=query_params |
| ) |
| response.raise_for_status() |
| |
| test_cases = [] |
| content = response.json().get('content', []) |
| |
| # Collect all build IDs first |
| build_ids = set() |
| for test in content: |
| if 'buildScanIdsByOutcome' in test: |
| for outcome_type, ids in test['buildScanIdsByOutcome'].items(): |
| build_ids.update(ids) |
| |
| # Get build info for all build IDs |
| builds = self.get_build_info(list(build_ids), project, test_tags, 7) # 7 days for test cases |
| |
| for test in content: |
| outcome_data = test['outcomeDistribution'] |
| if 'notSelected' in outcome_data: |
| outcome_data['not_selected'] = outcome_data.pop('notSelected') |
| outcome = TestOutcome(**outcome_data) |
| |
| test_case = TestCaseResult( |
| name=test['name'], |
| outcome_distribution=outcome, |
| first_seen=chunk_start, |
| container_name=container_name |
| ) |
| |
| # Add build information with proper timestamps |
                if 'buildScanIdsByOutcome' in test:
                    for outcome_type, scan_ids in test['buildScanIdsByOutcome'].items():
                        for build_id in scan_ids:
| if build_id in builds: |
| build_info = builds[build_id] |
| test_case.timeline.append( |
| TestTimelineEntry( |
| build_id=build_id, |
| timestamp=build_info.timestamp, |
| outcome=outcome_type |
| ) |
| ) |
| else: |
| logger.warning(f"Build ID {build_id} not found for test case {test['name']}") |
| |
| # Sort timeline by timestamp |
| test_case.timeline.sort(key=lambda x: x.timestamp) |
| test_cases.append(test_case) |
| |
| return test_cases |
| |
| except requests.exceptions.RequestException as e: |
| logger.error(f"Error fetching test case details for {container_name}: {str(e)}") |
| raise |
| |
| def get_flaky_test_regressions(self, project: str, results: List[TestResult], |
| recent_days: int = 7, min_flaky_rate: float = 0.2) -> Dict[str, Dict]: |
| """ |
| Identify tests that have recently started showing flaky behavior. |
| |
| Args: |
| project: The project name |
| results: List of test results |
| recent_days: Number of days to consider for recent behavior |
| min_flaky_rate: Minimum flaky rate to consider a test as problematic |
| """ |
| flaky_regressions = {} |
| current_time = datetime.now(pytz.UTC) |
| recent_cutoff = current_time - timedelta(days=recent_days) |
| |
| for result in results: |
| # Skip tests with no timeline data |
| if not result.timeline: |
| continue |
| |
| # Split timeline into recent and historical periods |
| recent_entries = [t for t in result.timeline if t.timestamp >= recent_cutoff] |
| historical_entries = [t for t in result.timeline if t.timestamp < recent_cutoff] |
| |
| if not recent_entries or not historical_entries: |
| continue |
| |
| # Calculate flaky rates |
| recent_flaky = sum(1 for t in recent_entries if t.outcome == 'flaky') |
| recent_total = len(recent_entries) |
| recent_flaky_rate = recent_flaky / recent_total if recent_total > 0 else 0 |
| |
| historical_flaky = sum(1 for t in historical_entries if t.outcome == 'flaky') |
| historical_total = len(historical_entries) |
| historical_flaky_rate = historical_flaky / historical_total if historical_total > 0 else 0 |
| |
| # Check if there's a significant increase in flakiness |
| if recent_flaky_rate >= min_flaky_rate and recent_flaky_rate > historical_flaky_rate * 1.5: |
| flaky_regressions[result.name] = { |
| 'result': result, |
| 'recent_flaky_rate': recent_flaky_rate, |
| 'historical_flaky_rate': historical_flaky_rate, |
| 'recent_executions': recent_entries, |
| 'historical_executions': historical_entries |
| } |
| |
| return flaky_regressions |
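
    # Worked example of the regression check above: with min_flaky_rate=0.2, a
    # test flaky in 3 of 10 recent runs (0.30) versus 1 of 10 historical runs
    # (0.10) is flagged, since 0.30 >= 0.2 and 0.30 > 0.10 * 1.5 = 0.15.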
| |
| def get_cleared_tests(self, project: str, results: List[TestResult], |
| success_threshold: float = 0.7, min_executions: int = 5) -> Dict[str, Dict]: |
| """ |
| Identify quarantined tests that are consistently passing and could be cleared. |
| |
| Args: |
| project: The project name |
| results: List of test results |
| success_threshold: Required percentage of successful builds to be considered cleared |
| min_executions: Minimum number of executions required to make a determination |
| """ |
| cleared_tests = {} |
| current_time = datetime.now(pytz.UTC) |
| chunk_start = current_time - timedelta(days=7) # Last 7 days for test cases |
| |
| for result in results: |
| # Only consider tests with sufficient recent executions |
| recent_executions = result.timeline |
| if len(recent_executions) < min_executions: |
| continue |
| |
| # Calculate success rate at class level |
| successful_runs = sum(1 for t in recent_executions |
| if t.outcome == 'passed') |
| success_rate = successful_runs / len(recent_executions) |
| |
| # Check if the test meets clearing criteria at class level |
| if success_rate >= success_threshold: |
| # Verify no recent failures or flaky behavior |
| has_recent_issues = any(t.outcome in ['failed', 'flaky'] |
| for t in recent_executions[-min_executions:]) |
| |
| if not has_recent_issues: |
| try: |
| # Get test case details |
| test_cases = self.get_test_case_details( |
| result.name, |
| project, |
| chunk_start, |
| current_time, |
| test_tags=["+trunk", "+flaky"] |
| ) |
| |
| # Only include if all test cases are also passing consistently |
| all_cases_passing = True |
| passing_test_cases = [] |
| |
| for test_case in test_cases: |
| case_total = test_case.outcome_distribution.total |
| if case_total >= min_executions: |
| case_success_rate = test_case.outcome_distribution.passed / case_total |
| |
| # Check recent executions for the test case |
| recent_case_issues = any(t.outcome in ['failed', 'flaky'] |
| for t in test_case.timeline[-min_executions:]) |
| |
| if case_success_rate >= success_threshold and not recent_case_issues: |
| passing_test_cases.append({ |
| 'name': test_case.name, |
| 'success_rate': case_success_rate, |
| 'total_executions': case_total, |
| 'recent_executions': sorted(test_case.timeline, |
| key=lambda x: x.timestamp)[-min_executions:] |
| }) |
| else: |
| all_cases_passing = False |
| break |
| |
| if all_cases_passing and passing_test_cases: |
| cleared_tests[result.name] = { |
| 'result': result, |
| 'success_rate': success_rate, |
| 'total_executions': len(recent_executions), |
| 'successful_runs': successful_runs, |
| 'recent_executions': recent_executions[-min_executions:], |
| 'test_cases': passing_test_cases |
| } |
| |
| except Exception as e: |
| logger.error(f"Error getting test case details for {result.name}: {str(e)}") |
| |
| return cleared_tests |
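
    # Worked example of the clearing criteria above: with success_threshold=0.7
    # and min_executions=5, a class whose last 10 runs include 8 passes (0.80)
    # qualifies only if none of its last 5 runs, nor the last 5 runs of any of
    # its test cases, were failed or flaky.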
| |
| def update_cache(self, builds: Dict[str, BuildInfo]): |
| """ |
| Update the build cache with new build information. |
| |
| Args: |
| builds: Dictionary of build IDs to BuildInfo objects |
| """ |
| current_time = datetime.now(pytz.UTC) |
| |
| # Initialize cache if it doesn't exist |
| if not self.build_cache: |
| self.build_cache = BuildCache(current_time, {}) |
| |
| # Update builds and last update time |
| self.build_cache.builds.update(builds) |
| self.build_cache.last_update = current_time |
| |
| # Save to all cache providers |
| self._save_cache() |
| |
| logger.info(f"Updated cache with {len(builds)} builds") |
| |
| def get_persistent_failing_tests(self, results: List[TestResult], |
| min_failure_rate: float = 0.2, |
| min_executions: int = 5) -> Dict[str, Dict]: |
| """ |
| Identify tests that have been consistently failing/flaky over time. |
| Groups by test class and includes individual test cases. |
| """ |
| persistent_failures = {} |
| current_time = datetime.now(pytz.UTC) |
| chunk_start = current_time - timedelta(days=7) # Last 7 days for test cases |
| |
| # Group results by class |
| class_groups = {} |
| for result in results: |
| class_name = result.name.split('#')[0] # Get class name |
| if class_name not in class_groups: |
| class_groups[class_name] = [] |
| class_groups[class_name].append(result) |
| |
| # Analyze each class and its test cases |
| for class_name, class_results in class_groups.items(): |
| class_total = sum(r.outcome_distribution.total for r in class_results) |
| class_problems = sum(r.outcome_distribution.failed + r.outcome_distribution.flaky |
| for r in class_results) |
| |
| if class_total < min_executions: |
| continue |
| |
| class_failure_rate = class_problems / class_total if class_total > 0 else 0 |
| |
| # Only include if class has significant failures |
| if class_failure_rate >= min_failure_rate: |
| try: |
| # Get detailed test case information using the same method as other reports |
| test_cases = self.get_test_case_details( |
| class_name, |
| "kafka", |
| chunk_start, |
| current_time, |
| test_tags=["+trunk", "-flaky"] |
| ) |
| |
| failing_test_cases = {} |
| for test_case in test_cases: |
| total_runs = test_case.outcome_distribution.total |
| if total_runs >= min_executions: |
| problem_runs = (test_case.outcome_distribution.failed + |
| test_case.outcome_distribution.flaky) |
| failure_rate = problem_runs / total_runs if total_runs > 0 else 0 |
| |
| if failure_rate >= min_failure_rate: |
| # Extract just the method name |
| method_name = test_case.name.split('.')[-1] |
| failing_test_cases[method_name] = { |
| 'result': test_case, |
| 'failure_rate': failure_rate, |
| 'total_executions': total_runs, |
| 'failed_executions': problem_runs, |
| 'timeline': sorted(test_case.timeline, key=lambda x: x.timestamp) |
| } |
| |
| if failing_test_cases: # Only include classes that have problematic test cases |
| persistent_failures[class_name] = { |
| 'failure_rate': class_failure_rate, |
| 'total_executions': class_total, |
| 'failed_executions': class_problems, |
| 'test_cases': failing_test_cases |
| } |
| |
| except Exception as e: |
| logger.error(f"Error getting test case details for {class_name}: {str(e)}") |
| |
| return persistent_failures |
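
    # Worked example of the grouping above: with min_failure_rate=0.2 and
    # min_executions=5, a class with 3 failed + 1 flaky out of 12 runs
    # (4/12 = 0.33) is analyzed further, and only methods that individually
    # exceed the 20% failure rate appear under its 'test_cases' entry.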
| |
| def get_develocity_class_link(class_name: str, threshold_days: int) -> str: |
| """ |
| Generate Develocity link for a test class |
| |
| Args: |
| class_name: Name of the test class |
| threshold_days: Number of days to look back in search |
| """ |
| base_url = "https://develocity.apache.org/scans/tests" |
| params = { |
| "search.rootProjectNames": "kafka", |
| "search.tags": "github,trunk", |
| "search.timeZoneId": "UTC", |
| "search.relativeStartTime": f"P{threshold_days}D", |
| "tests.container": class_name, |
| "search.tasks": "test" |
| } |
| |
| return f"{base_url}?{'&'.join(f'{k}={requests.utils.quote(str(v))}' for k, v in params.items())}" |
| |
| def get_develocity_method_link(class_name: str, method_name: str, threshold_days: int) -> str: |
| """ |
| Generate Develocity link for a test method |
| |
| Args: |
| class_name: Name of the test class |
| method_name: Name of the test method |
| threshold_days: Number of days to look back in search |
| """ |
| base_url = "https://develocity.apache.org/scans/tests" |
| |
| # Extract just the method name without the class prefix |
| if '.' in method_name: |
| method_name = method_name.split('.')[-1] |
| |
| params = { |
| "search.rootProjectNames": "kafka", |
| "search.tags": "github,trunk", |
| "search.timeZoneId": "UTC", |
| "search.relativeStartTime": f"P{threshold_days}D", |
| "tests.container": class_name, |
| "tests.test": method_name, |
| "search.tasks": "test" |
| } |
| |
| return f"{base_url}?{'&'.join(f'{k}={requests.utils.quote(str(v))}' for k, v in params.items())}" |
| |
| def print_most_problematic_tests(problematic_tests: Dict[str, Dict], threshold_days: int): |
| """Print a summary of the most problematic tests""" |
| print("\n## Most Problematic Tests") |
| if not problematic_tests: |
| print("No high-priority problematic tests found.") |
| return |
| |
| print(f"Found {len(problematic_tests)} tests that have been quarantined for {threshold_days} days and are still failing frequently.") |
| |
| # Print table with class and method information |
| print("\n<table>") |
| print("<tr><td>Class</td><td>Test Case</td><td>Failure Rate</td><td>Build Scans</td><td>Link</td></tr>") |
| |
| for test_name, details in sorted(problematic_tests.items(), |
| key=lambda x: x[1]['failure_rate'], |
| reverse=True): |
| class_link = get_develocity_class_link(test_name, threshold_days) |
| print(f"<tr><td colspan=\"4\">{test_name}</td><td><a href=\"{class_link}\">↗️</a></td></tr>") |
| |
| for test_case in sorted(details['test_cases'], |
| key=lambda x: (x.outcome_distribution.failed + x.outcome_distribution.flaky) / x.outcome_distribution.total |
| if x.outcome_distribution.total > 0 else 0, |
| reverse=True): |
| method_name = test_case.name.split('.')[-1] |
| if method_name != 'N/A': |
| method_link = get_develocity_method_link(test_name, test_case.name, threshold_days) |
| total_runs = test_case.outcome_distribution.total |
| failure_rate = (test_case.outcome_distribution.failed + test_case.outcome_distribution.flaky) / total_runs if total_runs > 0 else 0 |
| print(f"<tr><td></td><td>{method_name}</td>" |
| f"<td>{failure_rate:.2%}</td><td>{total_runs}</td>" |
| f"<td><a href=\"{method_link}\">↗️</a></td></tr>") |
| print("</table>") |
| |
| # Print detailed execution history |
| print("\n<details>") |
| print("<summary>Detailed Execution History</summary>\n") |
| |
| for test_name, details in sorted(problematic_tests.items(), |
| key=lambda x: x[1]['failure_rate'], |
| reverse=True): |
| print(f"\n### {test_name}") |
| print(f"* Days Quarantined: {details['days_quarantined']}") |
| print(f"* Recent Failure Rate: {details['recent_failure_rate']:.2%}") |
| print(f"* Total Runs: {details['container_result'].outcome_distribution.total}") |
| print(f"* Build Outcomes: Passed: {details['container_result'].outcome_distribution.passed} | " |
| f"Failed: {details['container_result'].outcome_distribution.failed} | " |
| f"Flaky: {details['container_result'].outcome_distribution.flaky}") |
| |
        for test_method in sorted(details['test_cases'],
                                  key=lambda x: (x.outcome_distribution.failed + x.outcome_distribution.flaky) / x.outcome_distribution.total
                                  if x.outcome_distribution.total > 0 else 0,
                                  reverse=True):
            if test_method.timeline:
                method_name = test_method.name.split('.')[-1]
                print(f"\n#### {method_name}")
| print("Recent Executions:") |
| print("```") |
| print("Date/Time (UTC) Outcome Build ID") |
| print("-" * 44) |
| for entry in sorted(test_method.timeline, key=lambda x: x.timestamp, reverse=True)[:5]: |
| date_str = entry.timestamp.strftime('%Y-%m-%d %H:%M') |
| print(f"{date_str:<17} {entry.outcome:<10} {entry.build_id}") |
| print("```") |
| |
| print("</details>") |
| |
| def print_flaky_regressions(flaky_regressions: Dict[str, Dict], threshold_days: int): |
| """Print tests that have recently started showing flaky behavior""" |
| print("\n## Flaky Test Regressions") |
| if not flaky_regressions: |
| print("No flaky test regressions found.") |
| return |
| |
| print(f"Found {len(flaky_regressions)} tests that have started showing increased flaky behavior recently.") |
| |
| # Print table with test details |
| print("\n<table>") |
| print("<tr><td>Test Class</td><td>Recent Flaky Rate</td><td>Historical Rate</td><td>Recent Executions</td><td>Link</td></tr>") |
| |
| for test_name, details in flaky_regressions.items(): |
| class_link = get_develocity_class_link(test_name, threshold_days) |
| print(f"<tr><td colspan=\"4\">{test_name}</td><td><a href=\"{class_link}\">↗️</a></td></tr>") |
| print(f"<tr><td></td><td>{details['recent_flaky_rate']:.2%}</td>" |
| f"<td>{details['historical_flaky_rate']:.2%}</td>" |
| f"<td>{len(details['recent_executions'])}</td><td></td></tr>") |
| |
| # Add recent execution details in sub-rows |
| print("<tr><td colspan=\"5\">Recent Executions:</td></tr>") |
| for entry in sorted(details['recent_executions'], key=lambda x: x.timestamp, reverse=True)[:5]: |
| date_str = entry.timestamp.strftime('%Y-%m-%d %H:%M') |
| print(f"<tr><td></td><td colspan=\"4\">{date_str} - {entry.outcome}</td></tr>") |
| print("</table>") |
| |
| # Print detailed history |
| print("\n<details>") |
| print("<summary>Detailed Execution History</summary>\n") |
| |
| for test_name, details in sorted(flaky_regressions.items(), |
| key=lambda x: x[1]['recent_flaky_rate'], |
| reverse=True): |
| print(f"\n### {test_name}") |
| print(f"* Recent Flaky Rate: {details['recent_flaky_rate']:.2%}") |
| print(f"* Historical Flaky Rate: {details['historical_flaky_rate']:.2%}") |
| print("\nRecent Executions:") |
| print("```") |
| print("Date/Time (UTC) Outcome Build ID") |
| print("-" * 44) |
| for entry in sorted(details['recent_executions'], key=lambda x: x.timestamp, reverse=True)[:5]: |
| date_str = entry.timestamp.strftime('%Y-%m-%d %H:%M') |
| print(f"{date_str:<17} {entry.outcome:<10} {entry.build_id}") |
| print("```") |
| |
| print("</details>") |
| |
| def print_persistent_failing_tests(persistent_failures: Dict[str, Dict], threshold_days: int): |
| """Print tests that have been consistently failing over time""" |
| print("\n## Persistently Failing/Flaky Tests") |
| if not persistent_failures: |
| print("No persistently failing tests found.") |
| return |
| |
| print(f"Found {len(persistent_failures)} tests that have been consistently failing or flaky.") |
| |
| # Print table with test details |
| print("\n<table>") |
| print("<tr><td>Test Class</td><td>Test Case</td><td>Failure Rate</td><td>Total Runs</td><td>Failed/Flaky</td><td>Link</td></tr>") |
| |
| for class_name, class_details in sorted(persistent_failures.items(), |
| key=lambda x: x[1]['failure_rate'], |
| reverse=True): |
| class_link = get_develocity_class_link(class_name, threshold_days) |
| |
| # Print class row |
| print(f"<tr><td colspan=\"5\">{class_name}</td>" |
| f"<td><a href=\"{class_link}\">↗️</a></td></tr>") |
| |
| # Print test case rows |
| for test_name, test_details in sorted(class_details['test_cases'].items(), |
| key=lambda x: x[1]['failure_rate'], |
| reverse=True): |
| test_link = get_develocity_method_link(class_name, test_name, threshold_days) |
| print(f"<tr><td></td>" |
| f"<td>{test_name}</td>" |
| f"<td>{test_details['failure_rate']:.2%}</td>" |
| f"<td>{test_details['total_executions']}</td>" |
| f"<td>{test_details['failed_executions']}</td>" |
| f"<td><a href=\"{test_link}\">↗️</a></td></tr>") |
| print("</table>") |
| |
| # Print detailed history |
| print("\n<details>") |
| print("<summary>Detailed Execution History</summary>\n") |
| |
| for class_name, class_details in sorted(persistent_failures.items(), |
| key=lambda x: x[1]['failure_rate'], |
| reverse=True): |
| print(f"\n### {class_name}") |
| print(f"* Overall Failure Rate: {class_details['failure_rate']:.2%}") |
| print(f"* Total Executions: {class_details['total_executions']}") |
| print(f"* Failed/Flaky Executions: {class_details['failed_executions']}") |
| |
| for test_name, test_details in sorted(class_details['test_cases'].items(), |
| key=lambda x: x[1]['failure_rate'], |
| reverse=True): |
| print("\nRecent Executions:") |
| print("```") |
| print("Date/Time (UTC) Outcome Build ID") |
| print("-" * 44) |
| for entry in sorted(test_details['timeline'], key=lambda x: x.timestamp, reverse=True)[:5]: |
| date_str = entry.timestamp.strftime('%Y-%m-%d %H:%M') |
| print(f"{date_str:<17} {entry.outcome:<10} {entry.build_id}") |
| print("```") |
| |
| print("</details>") |
| |
| def print_cleared_tests(cleared_tests: Dict[str, Dict], threshold_days: int): |
| """Print tests that are ready to be unquarantined""" |
| print("\n## Cleared Tests (Ready for Unquarantine)") |
| if not cleared_tests: |
| print("No tests ready to be cleared from quarantine.") |
| return |
| |
| # Calculate total number of test methods |
| total_methods = sum(len(details['test_cases']) for details in cleared_tests.values()) |
| |
| print(f"Found {len(cleared_tests)} test classes with {total_methods} test methods that have been consistently passing. " |
| f"These tests could be candidates for removing quarantine annotations at either class or method level.") |
| |
| # Print table with class and method information |
| print("\n<table>") |
| print("<tr><td>Test Class</td><td>Test Method</td><td>Success Rate</td><td>Total Runs</td><td>Recent Status</td><td>Link</td></tr>") |
| |
| for test_name, details in sorted(cleared_tests.items(), |
| key=lambda x: x[1]['success_rate'], |
| reverse=True): |
| class_link = get_develocity_class_link(test_name, threshold_days) |
| print(f"<tr><td colspan=\"5\">{test_name}</td><td><a href=\"{class_link}\">↗️</a></td></tr>") |
| print(f"<tr><td></td><td>Class Overall</td>" |
| f"<td>{details['success_rate']:.2%}</td>" |
| f"<td>{details['total_executions']}</td>" |
| f"<td>{details['successful_runs']} passed</td><td></td></tr>") |
| |
| for test_case in details['test_cases']: |
| method_name = test_case['name'].split('.')[-1] |
| method_link = get_develocity_method_link(test_name, test_case['name'], threshold_days) |
| recent_status = "N/A" |
| if test_case['recent_executions']: |
| recent_status = test_case['recent_executions'][-1].outcome |
| |
| print(f"<tr><td></td><td>{method_name}</td>" |
| f"<td>{test_case['success_rate']:.2%}</td>" |
| f"<td>{test_case['total_executions']}</td>" |
| f"<td>{recent_status}</td>" |
| f"<td><a href=\"{method_link}\">↗️</a></td></tr>") |
| print("<tr><td colspan=\"6\"> </td></tr>") |
| print("</table>") |
| |
| # Print detailed history |
| print("\n<details>") |
| print("<summary>Detailed Test Method History</summary>\n") |
| |
| for test_name, details in sorted(cleared_tests.items(), |
| key=lambda x: x[1]['success_rate'], |
| reverse=True): |
| print(f"\n### {test_name}") |
| print(f"* Overall Success Rate: {details['success_rate']:.2%}") |
| print(f"* Total Executions: {details['total_executions']}") |
| print(f"* Consecutive Successful Runs: {details['successful_runs']}") |
| |
| for test_case in details['test_cases']: |
| method_name = test_case['name'].split('.')[-1] |
| print(f"\n#### {method_name}") |
| print(f"* Success Rate: {test_case['success_rate']:.2%}") |
| print(f"* Total Runs: {test_case['total_executions']}") |
| print("\nRecent Executions:") |
| print("```") |
| print("Date/Time (UTC) Outcome Build ID") |
| print("-" * 44) |
| for entry in sorted(test_case['recent_executions'], key=lambda x: x.timestamp, reverse=True)[:5]: |
| date_str = entry.timestamp.strftime('%Y-%m-%d %H:%M') |
| print(f"{date_str:<17} {entry.outcome:<10} {entry.build_id}") |
| print("```") |
| |
| print("</details>") |
| |
def main():
    token = os.environ.get("DEVELOCITY_ACCESS_TOKEN")
    if not token:
        print("No auth token was specified. You must set DEVELOCITY_ACCESS_TOKEN to your personal access token.")
        exit(1)
| |
| # Configuration |
| BASE_URL = "https://develocity.apache.org" |
| PROJECT = "kafka" |
| QUARANTINE_THRESHOLD_DAYS = 7 |
| MIN_FAILURE_RATE = 0.1 |
| RECENT_FAILURE_THRESHOLD = 0.5 |
| SUCCESS_THRESHOLD = 0.7 # For cleared tests |
| MIN_FLAKY_RATE = 0.2 # For flaky regressions |
| |
| analyzer = TestAnalyzer(BASE_URL, token) |
| |
| try: |
        # Pre-fetch and cache build info for the last 7 days; the tag lists
        # mirror the quarantined/regular test queries below
        quarantined_builds = analyzer.get_build_info([], PROJECT, ["+trunk", "+flaky"], 7, bypass_cache=True, fetch_all=True)
        regular_builds = analyzer.get_build_info([], PROJECT, ["+trunk", "-flaky"], 7, bypass_cache=True, fetch_all=True)
| |
| analyzer.update_cache(quarantined_builds) |
| analyzer.update_cache(regular_builds) |
| |
| # Get test results |
| quarantined_results = analyzer.get_test_results( |
| PROJECT, |
| threshold_days=QUARANTINE_THRESHOLD_DAYS, |
| test_tags=["+trunk", "+flaky", "-new"] |
| ) |
| |
| regular_results = analyzer.get_test_results( |
| PROJECT, |
| threshold_days=7, # Last 7 days for regular tests |
| test_tags=["+trunk", "-flaky", "-new"] |
| ) |
| |
| # Generate reports |
| problematic_tests = analyzer.get_problematic_quarantined_tests( |
| quarantined_results, |
| QUARANTINE_THRESHOLD_DAYS, |
| MIN_FAILURE_RATE, |
| RECENT_FAILURE_THRESHOLD |
| ) |
| |
| flaky_regressions = analyzer.get_flaky_test_regressions( |
| PROJECT, |
| regular_results, |
| recent_days=7, |
| min_flaky_rate=MIN_FLAKY_RATE |
| ) |
| |
| cleared_tests = analyzer.get_cleared_tests( |
| PROJECT, |
| quarantined_results, |
| success_threshold=SUCCESS_THRESHOLD |
| ) |
| |
| # Get persistent failing tests (add after getting regular_results) |
| persistent_failures = analyzer.get_persistent_failing_tests( |
| regular_results, |
| min_failure_rate=0.2, # 20% failure rate threshold |
| min_executions=5 |
| ) |
| |
| # Print report header |
| print(f"\n# Flaky Test Report for {datetime.now(pytz.UTC).strftime('%Y-%m-%d')}") |
| print(f"This report was run on {datetime.now(pytz.UTC).strftime('%Y-%m-%d %H:%M:%S')} UTC") |
| |
| # Print each section |
| print_most_problematic_tests(problematic_tests, QUARANTINE_THRESHOLD_DAYS) |
| print_flaky_regressions(flaky_regressions, QUARANTINE_THRESHOLD_DAYS) |
| print_persistent_failing_tests(persistent_failures, QUARANTINE_THRESHOLD_DAYS) |
| print_cleared_tests(cleared_tests, QUARANTINE_THRESHOLD_DAYS) |
| |
| except Exception as e: |
| logger.exception("Error occurred during report generation") |
| print(f"Error occurred: {str(e)}") |
| |
| if __name__ == "__main__": |
| # Configure logging |
| logging.basicConfig( |
| level=logging.INFO, |
| format='%(asctime)s - %(levelname)s - %(message)s', |
| handlers=[ |
| logging.FileHandler("flaky_test_report.log") |
| ] |
| ) |
| main() |