blob: f10c79655faaf47578f076ed6f34feb3025d0283 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from unittest.mock import patch
import pytest
from airflow_breeze.commands.release_command import find_latest_release_candidate
class TestFindLatestReleaseCandidate:
"""Test the find_latest_release_candidate function."""
def test_find_latest_rc_single_candidate(self, tmp_path):
"""Test finding release candidate when only one exists."""
svn_dev_repo = tmp_path / "dev" / "airflow"
svn_dev_repo.mkdir(parents=True)
# Create a single RC directory
(svn_dev_repo / "3.0.5rc1").mkdir()
result = find_latest_release_candidate("3.0.5", str(svn_dev_repo), component="airflow")
assert result == "3.0.5rc1"
def test_find_latest_rc_multiple_candidates(self, tmp_path):
"""Test finding latest release candidate when multiple exist."""
svn_dev_repo = tmp_path / "dev" / "airflow"
svn_dev_repo.mkdir(parents=True)
# Create multiple RC directories
(svn_dev_repo / "3.0.5rc1").mkdir()
(svn_dev_repo / "3.0.5rc2").mkdir()
(svn_dev_repo / "3.0.5rc3").mkdir()
(svn_dev_repo / "3.0.5rc10").mkdir() # Test that rc10 > rc3
result = find_latest_release_candidate("3.0.5", str(svn_dev_repo), component="airflow")
assert result == "3.0.5rc10"
def test_find_latest_rc_ignores_other_versions(self, tmp_path):
"""Test that function ignores RCs for other versions."""
svn_dev_repo = tmp_path / "dev" / "airflow"
svn_dev_repo.mkdir(parents=True)
# Create RCs for different versions
(svn_dev_repo / "3.0.4rc1").mkdir()
(svn_dev_repo / "3.0.5rc1").mkdir()
(svn_dev_repo / "3.0.5rc2").mkdir()
(svn_dev_repo / "3.0.6rc1").mkdir()
result = find_latest_release_candidate("3.0.5", str(svn_dev_repo), component="airflow")
assert result == "3.0.5rc2"
def test_find_latest_rc_ignores_non_rc_directories(self, tmp_path):
"""Test that function ignores directories that don't match RC pattern."""
svn_dev_repo = tmp_path / "dev" / "airflow"
svn_dev_repo.mkdir(parents=True)
# Create RC directory and non-RC directories
(svn_dev_repo / "3.0.5rc1").mkdir()
(svn_dev_repo / "3.0.5").mkdir() # Final release directory
(svn_dev_repo / "some-other-dir").mkdir()
result = find_latest_release_candidate("3.0.5", str(svn_dev_repo), component="airflow")
assert result == "3.0.5rc1"
def test_find_latest_rc_no_match(self, tmp_path):
"""Test that function returns None when no matching RC found."""
svn_dev_repo = tmp_path / "dev" / "airflow"
svn_dev_repo.mkdir(parents=True)
# Create RCs for different version
(svn_dev_repo / "3.0.4rc1").mkdir()
result = find_latest_release_candidate("3.0.5", str(svn_dev_repo), component="airflow")
assert result is None
def test_find_latest_rc_directory_not_exists(self, tmp_path):
"""Test that function returns None when directory doesn't exist."""
svn_dev_repo = tmp_path / "dev" / "airflow"
# Don't create the directory
result = find_latest_release_candidate("3.0.5", str(svn_dev_repo), component="airflow")
assert result is None
def test_find_latest_rc_empty_directory(self, tmp_path):
"""Test that function returns None when directory is empty."""
svn_dev_repo = tmp_path / "dev" / "airflow"
svn_dev_repo.mkdir(parents=True)
result = find_latest_release_candidate("3.0.5", str(svn_dev_repo), component="airflow")
assert result is None
def test_find_latest_rc_task_sdk_component(self, tmp_path):
"""Test finding release candidate for task-sdk component."""
svn_dev_repo = tmp_path / "dev" / "airflow"
task_sdk_dir = svn_dev_repo / "task-sdk"
task_sdk_dir.mkdir(parents=True)
# Create multiple Task SDK RC directories
(task_sdk_dir / "1.0.5rc1").mkdir()
(task_sdk_dir / "1.0.5rc2").mkdir()
(task_sdk_dir / "1.0.5rc3").mkdir()
result = find_latest_release_candidate("1.0.5", str(svn_dev_repo), component="task-sdk")
assert result == "1.0.5rc3"
def test_find_latest_rc_task_sdk_ignores_airflow_rcs(self, tmp_path):
"""Test that task-sdk component ignores airflow RCs."""
svn_dev_repo = tmp_path / "dev" / "airflow"
svn_dev_repo.mkdir(parents=True)
task_sdk_dir = svn_dev_repo / "task-sdk"
task_sdk_dir.mkdir()
# Create airflow RC (should be ignored)
(svn_dev_repo / "3.0.5rc1").mkdir()
# Create task-sdk RC
(task_sdk_dir / "1.0.5rc1").mkdir()
result = find_latest_release_candidate("1.0.5", str(svn_dev_repo), component="task-sdk")
assert result == "1.0.5rc1"
def test_find_latest_rc_handles_oserror(self, tmp_path):
"""Test that function handles OSError gracefully."""
svn_dev_repo = tmp_path / "dev" / "airflow"
svn_dev_repo.mkdir(parents=True)
with patch("os.listdir", side_effect=OSError("Permission denied")):
result = find_latest_release_candidate("3.0.5", str(svn_dev_repo), component="airflow")
assert result is None
class FakeDirEntry:
def __init__(self, name: str, *, is_dir: bool):
self.name = name
self._is_dir = is_dir
def is_dir(self) -> bool:
return self._is_dir
@pytest.fixture
def release_cmd():
"""Lazy import the release command module."""
import airflow_breeze.commands.release_command as module
return module
def test_remove_old_release_only_collects_release_directories(monkeypatch, release_cmd):
version = "3.0.5"
task_sdk_version = "1.0.5"
svn_release_repo = "/svn/release/repo"
# Arrange: entries include current release, old release directories, a matching "file", and non-release directory.
entries = [
FakeDirEntry(version, is_dir=True), # current release: should be skipped
FakeDirEntry("3.0.4", is_dir=True), # old release dir: should be included
FakeDirEntry("3.0.3", is_dir=True), # old release dir: should be included
FakeDirEntry("3.0.2", is_dir=False), # matches pattern but not a directory: excluded
FakeDirEntry("task-sdk", is_dir=True), # task-sdk directory: will be scanned separately
FakeDirEntry("not-a-release", is_dir=True), # directory but not matching pattern: excluded
]
# Task SDK directory entries
task_sdk_entries = [
FakeDirEntry(task_sdk_version, is_dir=True), # current task-sdk release: should be skipped
FakeDirEntry("1.0.4", is_dir=True), # old task-sdk release: should be included
FakeDirEntry("1.0.3", is_dir=True), # old task-sdk release: should be included
]
chdir_calls: list[str] = []
console_messages: list[str] = []
run_command_calls: list[list[str]] = []
getcwd_calls: list[int] = []
path_exists_calls: list[str] = []
def fake_confirm_action(prompt: str, **_kwargs) -> bool:
# First prompt decides whether we scan. We want to.
if prompt == "Do you want to look for old releases to remove?":
return True
# For each candidate, we decline removal to avoid running svn commands.
if prompt.startswith("Remove old release "):
return False
raise AssertionError(f"Unexpected confirm prompt: {prompt}")
def fake_getcwd() -> str:
getcwd_calls.append(1)
return "/original/dir"
def fake_path_exists(path: str) -> bool:
path_exists_calls.append(path)
return path == "/svn/release/repo/task-sdk"
def fake_scandir(path=None):
if path == "/svn/release/repo/task-sdk":
return iter(task_sdk_entries)
return iter(entries)
monkeypatch.setattr(release_cmd.os, "getcwd", fake_getcwd)
monkeypatch.setattr(release_cmd.os, "chdir", lambda p: chdir_calls.append(p))
monkeypatch.setattr(release_cmd.os, "scandir", fake_scandir)
monkeypatch.setattr(release_cmd.os.path, "exists", fake_path_exists)
monkeypatch.setattr(release_cmd.os.path, "join", lambda *args: "/".join(args))
monkeypatch.setattr(release_cmd, "confirm_action", fake_confirm_action)
monkeypatch.setattr(release_cmd, "console_print", lambda msg="": console_messages.append(str(msg)))
monkeypatch.setattr(release_cmd, "run_command", lambda cmd, **_kwargs: run_command_calls.append(cmd))
# Act
release_cmd.remove_old_release(
version=version, task_sdk_version=task_sdk_version, svn_release_repo=svn_release_repo
)
# Assert: only directory entries matching RELEASE_PATTERN, excluding current version, and sorted.
assert svn_release_repo in chdir_calls
assert "/original/dir" in chdir_calls
assert "The following old Airflow releases should be removed: ['3.0.3', '3.0.4']" in console_messages
assert (
"The following old Task SDK releases should be removed: ['task-sdk/1.0.3', 'task-sdk/1.0.4']"
in console_messages
)
assert run_command_calls == []
def test_remove_old_release_returns_early_when_user_declines(monkeypatch, release_cmd):
version = "3.0.5"
task_sdk_version = "1.0.5"
svn_release_repo = "/svn/release/repo"
confirm_prompts: list[str] = []
def fake_confirm_action(prompt: str, **_kwargs) -> bool:
confirm_prompts.append(prompt)
return False
def should_not_be_called(*_args, **_kwargs):
raise AssertionError("This should not have been called when user declines the initial prompt.")
monkeypatch.setattr(release_cmd, "confirm_action", fake_confirm_action)
monkeypatch.setattr(release_cmd.os, "getcwd", should_not_be_called)
monkeypatch.setattr(release_cmd.os, "chdir", should_not_be_called)
monkeypatch.setattr(release_cmd.os, "scandir", should_not_be_called)
monkeypatch.setattr(release_cmd, "console_print", should_not_be_called)
monkeypatch.setattr(release_cmd, "run_command", should_not_be_called)
release_cmd.remove_old_release(
version=version, task_sdk_version=task_sdk_version, svn_release_repo=svn_release_repo
)
assert confirm_prompts == ["Do you want to look for old releases to remove?"]
def test_remove_old_release_removes_confirmed_old_releases(monkeypatch, release_cmd):
version = "3.1.5"
task_sdk_version = "1.1.5"
svn_release_repo = "/svn/release/repo"
entries = [
FakeDirEntry("3.1.4", is_dir=True),
FakeDirEntry(version, is_dir=True),
FakeDirEntry("3.1.0", is_dir=True),
]
task_sdk_entries = [
FakeDirEntry("1.1.4", is_dir=True),
FakeDirEntry(task_sdk_version, is_dir=True),
FakeDirEntry("1.1.0", is_dir=True),
]
chdir_calls: list[str] = []
console_messages: list[str] = []
run_command_calls: list[tuple[list[str], dict]] = []
confirm_prompts: list[str] = []
getcwd_calls: list[int] = []
def fake_confirm_action(prompt: str, **_kwargs) -> bool:
confirm_prompts.append(prompt)
if prompt == "Do you want to look for old releases to remove?":
return True
if prompt == "Remove old release 3.1.0?":
return True
if prompt == "Remove old release 3.1.4?":
return False
if prompt.startswith("Remove old release task-sdk/"):
return False
raise AssertionError(f"Unexpected confirm prompt: {prompt}")
def fake_getcwd() -> str:
getcwd_calls.append(1)
return "/original/dir"
def fake_path_exists(path: str) -> bool:
return path == "/svn/release/repo/task-sdk"
def fake_scandir(path=None):
if path == "/svn/release/repo/task-sdk":
return iter(task_sdk_entries)
return iter(entries)
monkeypatch.setattr(release_cmd.os, "getcwd", fake_getcwd)
monkeypatch.setattr(release_cmd.os, "chdir", lambda path: chdir_calls.append(path))
monkeypatch.setattr(release_cmd.os, "scandir", fake_scandir)
monkeypatch.setattr(release_cmd.os.path, "exists", fake_path_exists)
monkeypatch.setattr(release_cmd.os.path, "join", lambda *args: "/".join(args))
monkeypatch.setattr(release_cmd, "confirm_action", fake_confirm_action)
monkeypatch.setattr(release_cmd, "console_print", lambda msg="": console_messages.append(str(msg)))
def fake_run_command(cmd: list[str], **kwargs):
run_command_calls.append((cmd, kwargs))
monkeypatch.setattr(release_cmd, "run_command", fake_run_command)
release_cmd.remove_old_release(
version=version, task_sdk_version=task_sdk_version, svn_release_repo=svn_release_repo
)
assert chdir_calls == [svn_release_repo, "/original/dir"]
assert confirm_prompts == [
"Do you want to look for old releases to remove?",
"Remove old release 3.1.0?",
"Remove old release 3.1.4?",
"Remove old release task-sdk/1.1.0?",
"Remove old release task-sdk/1.1.4?",
]
assert "The following old Airflow releases should be removed: ['3.1.0', '3.1.4']" in console_messages
assert (
"The following old Task SDK releases should be removed: ['task-sdk/1.1.0', 'task-sdk/1.1.4']"
in console_messages
)
assert "Removing old release 3.1.0" in console_messages
assert "Removing old release 3.1.4" in console_messages
assert "[success]Old releases removed" in console_messages
# Only 3.1.0 was confirmed, so we should run rm+commit for 3.1.0 only.
assert run_command_calls == [
(["svn", "rm", "3.1.0"], {"check": True}),
(["svn", "commit", "-m", "Remove old release: 3.1.0"], {"check": True}),
]
def test_remove_old_release_no_old_releases(monkeypatch, release_cmd):
version = "3.0.5"
task_sdk_version = "1.0.5"
svn_release_repo = "/svn/release/repo"
# Only current release exists
entries = [
FakeDirEntry(version, is_dir=True),
FakeDirEntry("task-sdk", is_dir=True), # task-sdk directory exists
]
task_sdk_entries = [
FakeDirEntry(task_sdk_version, is_dir=True), # Only current task-sdk release
]
chdir_calls: list[str] = []
console_messages: list[str] = []
run_command_calls: list[list[str]] = []
def fake_confirm_action(prompt: str, **_kwargs) -> bool:
if prompt == "Do you want to look for old releases to remove?":
return True
raise AssertionError(f"Unexpected confirm prompt: {prompt}")
def fake_getcwd() -> str:
return "/original/dir"
def fake_path_exists(path: str) -> bool:
return path == "/svn/release/repo/task-sdk"
def fake_scandir(path=None):
if path == "/svn/release/repo/task-sdk":
return iter(task_sdk_entries)
return iter(entries)
monkeypatch.setattr(release_cmd.os, "getcwd", fake_getcwd)
monkeypatch.setattr(release_cmd.os, "chdir", lambda path: chdir_calls.append(path))
monkeypatch.setattr(release_cmd.os, "scandir", fake_scandir)
monkeypatch.setattr(release_cmd.os.path, "exists", fake_path_exists)
monkeypatch.setattr(release_cmd.os.path, "join", lambda *args: "/".join(args))
monkeypatch.setattr(release_cmd, "confirm_action", fake_confirm_action)
monkeypatch.setattr(release_cmd, "console_print", lambda msg="": console_messages.append(str(msg)))
monkeypatch.setattr(release_cmd, "run_command", lambda cmd, **_kwargs: run_command_calls.append(cmd))
release_cmd.remove_old_release(
version=version, task_sdk_version=task_sdk_version, svn_release_repo=svn_release_repo
)
assert "No old releases to remove." in console_messages
assert run_command_calls == []
assert chdir_calls == [svn_release_repo, "/original/dir"]
def test_remove_old_release_task_sdk_only(monkeypatch, release_cmd):
version = "3.0.5"
task_sdk_version = "1.0.5"
svn_release_repo = "/svn/release/repo"
# Only current Airflow release exists, but old Task SDK releases exist
entries = [
FakeDirEntry(version, is_dir=True),
]
task_sdk_entries = [
FakeDirEntry(task_sdk_version, is_dir=True), # current task-sdk release
FakeDirEntry("1.0.4", is_dir=True), # old task-sdk release
FakeDirEntry("1.0.3", is_dir=True), # old task-sdk release
]
chdir_calls: list[str] = []
console_messages: list[str] = []
run_command_calls: list[tuple[list[str], dict]] = []
confirm_prompts: list[str] = []
def fake_confirm_action(prompt: str, **_kwargs) -> bool:
confirm_prompts.append(prompt)
if prompt == "Do you want to look for old releases to remove?":
return True
if prompt == "Remove old release task-sdk/1.0.3?":
return True
if prompt == "Remove old release task-sdk/1.0.4?":
return False
raise AssertionError(f"Unexpected confirm prompt: {prompt}")
def fake_getcwd() -> str:
return "/original/dir"
def fake_path_exists(path: str) -> bool:
return path == "/svn/release/repo/task-sdk"
def fake_scandir(path=None):
if path == "/svn/release/repo/task-sdk":
return iter(task_sdk_entries)
return iter(entries)
monkeypatch.setattr(release_cmd.os, "getcwd", fake_getcwd)
monkeypatch.setattr(release_cmd.os, "chdir", lambda path: chdir_calls.append(path))
monkeypatch.setattr(release_cmd.os, "scandir", fake_scandir)
monkeypatch.setattr(release_cmd.os.path, "exists", fake_path_exists)
monkeypatch.setattr(release_cmd.os.path, "join", lambda *args: "/".join(args))
monkeypatch.setattr(release_cmd, "confirm_action", fake_confirm_action)
monkeypatch.setattr(release_cmd, "console_print", lambda msg="": console_messages.append(str(msg)))
def fake_run_command(cmd: list[str], **kwargs):
run_command_calls.append((cmd, kwargs))
monkeypatch.setattr(release_cmd, "run_command", fake_run_command)
release_cmd.remove_old_release(
version=version, task_sdk_version=task_sdk_version, svn_release_repo=svn_release_repo
)
assert chdir_calls == [svn_release_repo, "/original/dir"]
assert (
"The following old Task SDK releases should be removed: ['task-sdk/1.0.3', 'task-sdk/1.0.4']"
in console_messages
)
assert "Removing old release task-sdk/1.0.3" in console_messages
assert "Removing old release task-sdk/1.0.4" in console_messages
assert "[success]Old releases removed" in console_messages
assert run_command_calls == [
(["svn", "rm", "task-sdk/1.0.3"], {"check": True}),
(["svn", "commit", "-m", "Remove old release: task-sdk/1.0.3"], {"check": True}),
]
def test_remove_old_release_no_task_sdk_version(monkeypatch, release_cmd):
version = "3.0.5"
task_sdk_version = None
svn_release_repo = "/svn/release/repo"
entries = [
FakeDirEntry(version, is_dir=True),
FakeDirEntry("3.0.4", is_dir=True), # old release
]
chdir_calls: list[str] = []
console_messages: list[str] = []
run_command_calls: list[list[str]] = []
def fake_confirm_action(prompt: str, **_kwargs) -> bool:
if prompt == "Do you want to look for old releases to remove?":
return True
if prompt.startswith("Remove old release "):
return False
raise AssertionError(f"Unexpected confirm prompt: {prompt}")
def fake_getcwd() -> str:
return "/original/dir"
monkeypatch.setattr(release_cmd.os, "getcwd", fake_getcwd)
monkeypatch.setattr(release_cmd.os, "chdir", lambda path: chdir_calls.append(path))
monkeypatch.setattr(release_cmd.os, "scandir", lambda: iter(entries))
monkeypatch.setattr(release_cmd, "confirm_action", fake_confirm_action)
monkeypatch.setattr(release_cmd, "console_print", lambda msg="": console_messages.append(str(msg)))
monkeypatch.setattr(release_cmd, "run_command", lambda cmd, **_kwargs: run_command_calls.append(cmd))
release_cmd.remove_old_release(
version=version, task_sdk_version=task_sdk_version, svn_release_repo=svn_release_repo
)
assert "The following old Airflow releases should be removed: ['3.0.4']" in console_messages
assert "task-sdk" not in " ".join(console_messages).lower()
assert run_command_calls == []
assert chdir_calls == [svn_release_repo, "/original/dir"]