blob: f6dc0a035c23439a36c93ac362b9088e2fe27e09 [file] [log] [blame]
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# /// script
# requires-python = ">=3.10"
# ///
"""
Determine whether to send a Slack notification based on previous state.
Downloads previous state from GitHub Actions artifacts, compares with current
failures, and outputs the appropriate action to take.
Outputs (written to GITHUB_OUTPUT):
action - One of: notify_new, notify_reminder, notify_recovery, skip
current-failures - JSON list of current failure names
previous-failures - JSON list of previous failure names
Environment variables (required):
ARTIFACT_NAME - Name of the artifact storing notification state
GITHUB_REPOSITORY - Owner/repo (e.g. apache/airflow)
Environment variables (optional):
CURRENT_FAILURES - Newline-separated list of current failures (empty if none)
GITHUB_OUTPUT - Path to GitHub Actions output file
"""
from __future__ import annotations
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
REMINDER_INTERVAL_HOURS = 24
STATE_DIR = Path("./slack-state")
PREV_STATE_DIR = Path("./prev-slack-state")
def download_previous_state(artifact_name: str, repo: str) -> dict | None:
"""Download previous notification state artifact from GitHub Actions."""
result = subprocess.run(
[
"gh",
"api",
f"repos/{repo}/actions/artifacts",
"-f",
f"name={artifact_name}",
"-f",
"per_page=1",
"--jq",
".artifacts[0]",
],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
print(f"Could not query artifacts API: {result.stderr}", file=sys.stderr)
return None
output = result.stdout.strip()
if not output or output == "null":
print("No previous state artifact found.")
return None
try:
artifact = json.loads(output)
except json.JSONDecodeError:
print(f"Invalid JSON from artifacts API: {output}", file=sys.stderr)
return None
if artifact.get("expired", False):
print("Previous state artifact has expired.")
return None
run_id = artifact.get("workflow_run", {}).get("id")
if not run_id:
print("No workflow run ID in artifact metadata.")
return None
PREV_STATE_DIR.mkdir(parents=True, exist_ok=True)
dl_result = subprocess.run(
[
"gh",
"run",
"download",
str(run_id),
"--name",
artifact_name,
"--dir",
str(PREV_STATE_DIR),
"--repo",
repo,
],
check=False,
capture_output=True,
text=True,
)
if dl_result.returncode != 0:
print(f"Could not download previous state: {dl_result.stderr}", file=sys.stderr)
return None
state_file = PREV_STATE_DIR / "state.json"
if not state_file.exists():
print("Downloaded artifact does not contain state.json.")
return None
try:
return json.loads(state_file.read_text())
except json.JSONDecodeError:
print("Invalid JSON in state.json.", file=sys.stderr)
return None
def determine_action(current_failures: list[str], prev_state: dict | None) -> str:
"""Determine what notification action to take.
Returns one of: notify_new, notify_reminder, notify_recovery, skip.
"""
prev_failures = sorted(prev_state.get("failures", [])) if prev_state else []
prev_notified = prev_state.get("last_notified") if prev_state else None
now = datetime.now(timezone.utc)
if current_failures:
if not prev_failures:
return "notify_new"
if current_failures != prev_failures:
return "notify_new"
# Same failures as before — check if reminder is due
if prev_notified:
prev_time = datetime.fromisoformat(prev_notified)
hours_since = (now - prev_time).total_seconds() / 3600
if hours_since >= REMINDER_INTERVAL_HOURS:
return "notify_reminder"
else:
return "notify_new"
elif prev_failures:
# Was failing, now all clear
return "notify_recovery"
return "skip"
def save_state(current_failures: list[str], action: str, prev_notified: str | None) -> None:
"""Save current state to file for upload as artifact."""
now = datetime.now(timezone.utc)
STATE_DIR.mkdir(parents=True, exist_ok=True)
new_state = {
"failures": current_failures,
"last_notified": (now.isoformat() if action != "skip" else (prev_notified or now.isoformat())),
}
(STATE_DIR / "state.json").write_text(json.dumps(new_state, indent=2))
print(f"Saved state to {STATE_DIR / 'state.json'}: {new_state}")
def main() -> None:
artifact_name = os.environ.get("ARTIFACT_NAME")
if not artifact_name:
print("ERROR: ARTIFACT_NAME environment variable is required.", file=sys.stderr)
sys.exit(1)
repo = os.environ.get("GITHUB_REPOSITORY", "apache/airflow")
current_failures_str = os.environ.get("CURRENT_FAILURES", "")
current_failures = sorted([f.strip() for f in current_failures_str.strip().splitlines() if f.strip()])
# Download previous state
print(f"Looking up previous state for artifact: {artifact_name}")
prev_state = download_previous_state(artifact_name, repo)
print(f"Previous state: {prev_state}")
# Determine action
action = determine_action(current_failures, prev_state)
prev_failures = sorted(prev_state.get("failures", [])) if prev_state else []
print(f"Action: {action}")
print(f"Current failures: {current_failures}")
print(f"Previous failures: {prev_failures}")
# Save new state
prev_notified = prev_state.get("last_notified") if prev_state else None
save_state(current_failures, action, prev_notified)
# Output for GitHub Actions
github_output = os.environ.get("GITHUB_OUTPUT")
if github_output:
with open(github_output, "a") as f:
f.write(f"action={action}\n")
f.write(f"current-failures={json.dumps(current_failures)}\n")
f.write(f"previous-failures={json.dumps(prev_failures)}\n")
if __name__ == "__main__":
main()