# blob: 29792dcab217aa3834a18099b139eaea112a34f9 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Integration tests for DAG Run operations.
These tests validate the Execution API endpoints for DAG Run operations:
- get_state(): Get DAG run state
- get_count(): Get count of DAG runs
- get_previous(): Get previous DAG run
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
import pytest
from airflow.sdk.api.client import ServerResponseError
from airflow.sdk.api.datamodels._generated import DagRunStateResponse
from airflow.sdk.execution_time.comms import DRCount, PreviousDagRunResult
from task_sdk_tests import console
def test_dag_run_get_state(sdk_client, dag_info):
    """
    Verify that the state of an existing DAG run can be fetched.

    The DAG id and run id are taken from ``dag_info`` and passed to
    ``sdk_client.dag_runs.get_state``; the response is printed and checked.

    Expected: DagRunStateResponse with running or success state
    Endpoint: GET /execution/dag-runs/{dag_id}/{run_id}/state
    """
    console.print("[yellow]Getting DAG run state...")

    fetch_state = sdk_client.dag_runs.get_state
    state_response = fetch_state(dag_id=dag_info["dag_id"], run_id=dag_info["dag_run_id"])

    # Dump the response between two 72-character rulers for readable test logs.
    banner = " DAG Run State Response ".center(72, "=")
    console.print(banner)
    console.print(f"[bright_blue]Response Type:[/] {type(state_response).__name__}")
    console.print(f"[bright_blue]State:[/] {state_response.state}")
    ruler = "=" * 72
    console.print(ruler)

    assert isinstance(state_response, DagRunStateResponse)
    assert state_response.state is not None
    assert state_response.state in ("running", "success")

    console.print("[green]✅ DAG run get state test passed!")
def test_dag_run_get_count(sdk_client, dag_info):
    """
    Verify that counting DAG runs for a known DAG/run id finds at least one.

    The count is requested through ``sdk_client.dag_runs.get_count``,
    filtered by the DAG id and the single run id found in ``dag_info``.

    Expected: DRCount with count >= 1
    Endpoint: GET /execution/dag-runs/count
    """
    console.print("[yellow]Getting DAG run count...")

    fetch_count = sdk_client.dag_runs.get_count
    count_response = fetch_count(dag_id=dag_info["dag_id"], run_ids=[dag_info["dag_run_id"]])

    # Dump the response between two 72-character rulers for readable test logs.
    banner = " DAG Run Count Response ".center(72, "=")
    console.print(banner)
    console.print(f"[bright_blue]Response Type:[/] {type(count_response).__name__}")
    console.print(f"[bright_blue]DAG ID:[/] {dag_info['dag_id']}")
    console.print(f"[bright_blue]Count:[/] {count_response.count}")
    ruler = "=" * 72
    console.print(ruler)

    assert isinstance(count_response, DRCount)
    assert count_response.count >= 1, f"Expected at least 1 DAG run, got {count_response.count}"

    console.print("[green]✅ DAG run get count test passed!")
def test_dag_run_get_state_not_found(sdk_client, dag_info):
    """
    Test getting state for non-existent DAG run.

    The request uses the DAG id ``"not_exist"`` together with the run id from
    ``dag_info``; the call is expected to raise, and the status code carried
    by the raised error's response is then checked.

    Expected: ServerResponseError with 404 status code
    Endpoint: GET /execution/dag-runs/{dag_id}/{run_id}/state
    """
    console.print("[yellow]Getting non-existent DAG run state...")

    # Only the failing call lives inside the context manager; everything that
    # inspects the captured exception has to run after the block exits.
    with pytest.raises(ServerResponseError) as exc_info:
        sdk_client.dag_runs.get_state(
            dag_id="not_exist",
            run_id=dag_info["dag_run_id"],
        )

    error = exc_info.value

    console.print(" Non-existent DAG Run State Response ".center(72, "="))
    console.print(f"[bright_blue]Exception Type:[/] {type(error).__name__}")
    console.print(f"[bright_blue]Status Code:[/] {error.response.status_code}")
    # f-string interpolation already applies str(); no explicit call needed.
    console.print(f"[bright_blue]Error Message:[/] {error}")
    console.print("=" * 72)

    assert error.response.status_code == 404, (
        f"Expected status code 404, got {error.response.status_code}"
    )

    console.print("[green]✅ DAG run get state (not found) test passed!")
def test_dag_run_get_count_not_found(sdk_client, dag_info):
    """
    Verify that counting DAG runs for an unknown DAG id yields zero.

    The count is requested for the DAG id ``"not_exist"`` while still
    filtering on the run id taken from ``dag_info``.

    Expected: DRCount with count=0
    Endpoint: GET /execution/dag-runs/count
    """
    console.print("[yellow]Getting non-existent DAG run count...")

    fetch_count = sdk_client.dag_runs.get_count
    count_response = fetch_count(dag_id="not_exist", run_ids=[dag_info["dag_run_id"]])

    # Dump the response between two 72-character rulers for readable test logs.
    banner = " Non-existent DAG Run Count Response ".center(72, "=")
    console.print(banner)
    console.print(f"[bright_blue]Response Type:[/] {type(count_response).__name__}")
    console.print(f"[bright_blue]Count:[/] {count_response.count}")
    ruler = "=" * 72
    console.print(ruler)

    assert isinstance(count_response, DRCount)
    assert count_response.count == 0, f"Expected 0 DAG run, got {count_response.count}"

    console.print("[green]✅ Non-existent DAG run get count test passed!")
def test_dag_run_get_previous(sdk_client, dag_info):
    """
    Verify that a previous DAG run is found for a known DAG.

    ``sdk_client.dag_runs.get_previous`` is queried with the DAG id from
    ``dag_info`` and a logical date ten seconds after the current UTC time.

    Expected: PreviousDagRunResult with dag_run field
    Endpoint: GET /execution/dag-runs/previous
    """
    console.print("[yellow]Getting previous DAG run state...")

    fetch_previous = sdk_client.dag_runs.get_previous
    previous_result = fetch_previous(
        dag_id=dag_info["dag_id"],
        logical_date=datetime.now(timezone.utc) + timedelta(seconds=10),
    )

    # Dump the response between two 72-character rulers for readable test logs.
    banner = " Previous DAG Run Response ".center(72, "=")
    console.print(banner)
    console.print(f"[bright_blue]Response Type:[/] {type(previous_result).__name__}")
    console.print(f"[bright_blue]Previous DAG Run:[/] {previous_result.dag_run}")
    ruler = "=" * 72
    console.print(ruler)

    assert isinstance(previous_result, PreviousDagRunResult)
    assert previous_result.dag_run is not None

    console.print("[green]✅ Previous DAG run get test passed!")
def test_dag_run_get_previous_not_found(sdk_client):
    """
    Verify that no previous DAG run is reported for an unknown DAG.

    ``sdk_client.dag_runs.get_previous`` is queried with the DAG id
    ``"not_exist"`` and a logical date ten seconds after the current UTC time.

    Expected: PreviousDagRunResult with dag_run is None
    Endpoint: GET /execution/dag-runs/previous
    """
    console.print("[yellow]Getting non-existent previous DAG run...")

    fetch_previous = sdk_client.dag_runs.get_previous
    previous_result = fetch_previous(
        dag_id="not_exist",
        logical_date=datetime.now(timezone.utc) + timedelta(seconds=10),
    )

    # Dump the response between two 72-character rulers for readable test logs.
    banner = " Non-existent Previous DAG Run Response ".center(72, "=")
    console.print(banner)
    console.print(f"[bright_blue]Response Type:[/] {type(previous_result).__name__}")
    console.print(f"[bright_blue]Detail:[/] {previous_result.dag_run}")
    ruler = "=" * 72
    console.print(ruler)

    assert isinstance(previous_result, PreviousDagRunResult)
    assert previous_result.dag_run is None

    console.print("[green]✅ Previous DAG run (not found) get test passed!")