blob: 83c2d3f2020c4d14811c8c226792c60d4e38c49b [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from datetime import timedelta
import pytest
from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
from airflow.models.errors import ImportError
from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.session import provide_session
from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_import_errors
@pytest.fixture(scope="module")
def configured_app(minimal_app_for_api):
    """Yield the minimal API app with the two users these tests authenticate as.

    ``test`` is created with read access to import errors and
    ``test_no_permissions`` is created without any permissions. Both users
    are deleted again after the module's tests have run.
    """
    app = minimal_app_for_api
    read_import_errors = (permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR)

    create_user(
        app,  # type: ignore
        username="test",
        role_name="Test",
        permissions=[read_import_errors],  # type: ignore
    )
    create_user(
        app,  # type: ignore
        username="test_no_permissions",
        role_name="TestNoPermissions",
    )

    yield app

    # Teardown: remove the users in the same order they were created.
    for username in ("test", "test_no_permissions"):
        delete_user(app, username=username)  # type: ignore
class TestBaseImportError:
    """Common setup, cleanup and helpers for the import error endpoint tests."""

    # Timestamp string that subclasses parse when building ImportError rows.
    timestamp = "2020-06-10T12:00"

    @pytest.fixture(autouse=True)
    def setup_attrs(self, configured_app) -> None:
        """Expose the app and a test client on the instance, then clear import errors."""
        self.app = configured_app
        self.client = configured_app.test_client()  # type: ignore
        clear_db_import_errors()

    def teardown_method(self) -> None:
        """Clear import errors again after each test method."""
        clear_db_import_errors()

    @staticmethod
    def _normalize_import_errors(import_errors):
        """Overwrite each entry's ``import_error_id`` with its 1-based position.

        The entries are modified in place, which lets callers compare the
        payload against expected data that uses fixed ids.
        """
        for position, entry in enumerate(import_errors, start=1):
            entry["import_error_id"] = position
class TestGetImportErrorEndpoint(TestBaseImportError):
    """Tests for fetching a single import error via ``/api/v1/importErrors/{id}``."""

    def _add_import_error(self, session):
        """Store and commit one ``ImportError`` row and return it.

        Returning the instance lets the tests read the id it was given.
        """
        import_error = ImportError(
            filename="Lorem_ipsum.py",
            stacktrace="Lorem ipsum",
            timestamp=timezone.parse(self.timestamp, timezone="UTC"),
        )
        session.add(import_error)
        session.commit()
        return import_error

    def test_response_200(self, session):
        """An existing import error is returned with status 200 and the expected payload."""
        import_error = self._add_import_error(session)

        response = self.client.get(
            f"/api/v1/importErrors/{import_error.id}", environ_overrides={"REMOTE_USER": "test"}
        )

        assert response.status_code == 200
        response_data = response.json
        # Pin the id so the comparison does not depend on the value stored for this row.
        response_data["import_error_id"] = 1
        assert {
            "filename": "Lorem_ipsum.py",
            "import_error_id": 1,
            "stack_trace": "Lorem ipsum",
            "timestamp": "2020-06-10T12:00:00+00:00",
        } == response_data

    def test_response_404(self):
        """Requesting an id with no stored import error yields the 404 problem document."""
        response = self.client.get("/api/v1/importErrors/2", environ_overrides={"REMOTE_USER": "test"})

        assert response.status_code == 404
        assert {
            "detail": "The ImportError with import_error_id: `2` was not found",
            "status": 404,
            "title": "Import error not found",
            "type": EXCEPTIONS_LINK_MAP[404],
        } == response.json

    def test_should_raises_401_unauthenticated(self, session):
        """A request without ``REMOTE_USER`` is rejected as unauthenticated."""
        import_error = self._add_import_error(session)

        response = self.client.get(f"/api/v1/importErrors/{import_error.id}")

        assert_401(response)

    @pytest.mark.parametrize(
        "url_template",
        [
            # Collection URL: the only one the original test requested.
            "/api/v1/importErrors",
            # Single-resource URL: the endpoint this class is actually about.
            "/api/v1/importErrors/{import_error_id}",
        ],
        ids=["list", "single"],
    )
    def test_should_raise_403_forbidden(self, url_template, session):
        """A user without the read permission gets 403 on both import error URLs.

        A row is stored first so that the single-resource URL points at an
        existing import error; the response can then only be 403 because of
        the missing permission, not because the id is unknown.
        """
        import_error = self._add_import_error(session)

        response = self.client.get(
            # ``format`` leaves the collection URL unchanged (it has no placeholder).
            url_template.format(import_error_id=import_error.id),
            environ_overrides={"REMOTE_USER": "test_no_permissions"},
        )

        assert response.status_code == 403
class TestGetImportErrorsEndpoint(TestBaseImportError):
    """Tests for listing import errors via ``/api/v1/importErrors``."""

    def _add_two_identical_import_errors(self, session):
        """Store and commit two ``ImportError`` rows that share the same field values."""
        rows = []
        for _ in range(2):
            rows.append(
                ImportError(
                    filename="Lorem_ipsum.py",
                    stacktrace="Lorem ipsum",
                    timestamp=timezone.parse(self.timestamp, timezone="UTC"),
                )
            )
        session.add_all(rows)
        session.commit()

    def test_get_import_errors(self, session):
        """Both stored import errors are listed together with ``total_entries``."""
        self._add_two_identical_import_errors(session)

        response = self.client.get("/api/v1/importErrors", environ_overrides={"REMOTE_USER": "test"})

        assert response.status_code == 200
        response_data = response.json
        # Replace the returned ids with 1, 2, ... so they match the fixed ids below.
        self._normalize_import_errors(response_data["import_errors"])
        expected = {
            "import_errors": [
                {
                    "filename": "Lorem_ipsum.py",
                    "import_error_id": 1,
                    "stack_trace": "Lorem ipsum",
                    "timestamp": "2020-06-10T12:00:00+00:00",
                },
                {
                    "filename": "Lorem_ipsum.py",
                    "import_error_id": 2,
                    "stack_trace": "Lorem ipsum",
                    "timestamp": "2020-06-10T12:00:00+00:00",
                },
            ],
            "total_entries": 2,
        }
        assert response_data == expected

    def test_get_import_errors_order_by(self, session):
        """``order_by=-timestamp`` lists the entry with the later timestamp first."""
        rows = []
        for i in range(1, 3):
            # Row ``i`` is dated ``i`` days before the base timestamp.
            rows.append(
                ImportError(
                    filename=f"Lorem_ipsum{i}.py",
                    stacktrace="Lorem ipsum",
                    timestamp=timezone.parse(self.timestamp, timezone="UTC") + timedelta(days=-i),
                )
            )
        session.add_all(rows)
        session.commit()

        response = self.client.get(
            "/api/v1/importErrors?order_by=-timestamp", environ_overrides={"REMOTE_USER": "test"}
        )

        assert response.status_code == 200
        response_data = response.json
        self._normalize_import_errors(response_data["import_errors"])
        expected = {
            "import_errors": [
                {
                    "filename": "Lorem_ipsum1.py",
                    "import_error_id": 1,  # id normalized with self._normalize_import_errors
                    "stack_trace": "Lorem ipsum",
                    "timestamp": "2020-06-09T12:00:00+00:00",
                },
                {
                    "filename": "Lorem_ipsum2.py",
                    "import_error_id": 2,
                    "stack_trace": "Lorem ipsum",
                    "timestamp": "2020-06-08T12:00:00+00:00",
                },
            ],
            "total_entries": 2,
        }
        assert response_data == expected

    def test_order_by_raises_400_for_invalid_attr(self, session):
        """Ordering by an unknown attribute is rejected with 400 and an explanatory detail."""
        self._add_two_identical_import_errors(session)

        response = self.client.get(
            "/api/v1/importErrors?order_by=timest", environ_overrides={"REMOTE_USER": "test"}
        )

        assert response.status_code == 400
        expected_detail = "Ordering with 'timest' is disallowed or the attribute does not exist on the model"
        assert response.json["detail"] == expected_detail

    def test_should_raises_401_unauthenticated(self, session):
        """A request without ``REMOTE_USER`` is rejected as unauthenticated."""
        self._add_two_identical_import_errors(session)

        response = self.client.get("/api/v1/importErrors")

        assert_401(response)
class TestGetImportErrorsEndpointPagination(TestBaseImportError):
    """Tests for ``limit``/``offset`` handling of ``/api/v1/importErrors``."""

    def _seed_import_errors(self, session, indices):
        """Store and commit one ``ImportError`` per index, named ``/tmp/file_<index>.py``."""
        rows = []
        for i in indices:
            rows.append(
                ImportError(
                    filename=f"/tmp/file_{i}.py",
                    stacktrace="Lorem ipsum",
                    timestamp=timezone.parse(self.timestamp, timezone="UTC"),
                )
            )
        session.add_all(rows)
        session.commit()

    # Despite its name, ``expected_import_error_ids`` holds the expected filenames.
    @pytest.mark.parametrize(
        "url, expected_import_error_ids",
        [
            # Limit test data
            ("/api/v1/importErrors?limit=1", ["/tmp/file_1.py"]),
            ("/api/v1/importErrors?limit=100", [f"/tmp/file_{i}.py" for i in range(1, 101)]),
            # Offset test data
            ("/api/v1/importErrors?offset=1", [f"/tmp/file_{i}.py" for i in range(2, 102)]),
            ("/api/v1/importErrors?offset=3", [f"/tmp/file_{i}.py" for i in range(4, 104)]),
            # Limit and offset test data
            ("/api/v1/importErrors?offset=3&limit=3", [f"/tmp/file_{i}.py" for i in range(4, 7)]),
        ],
    )
    # NOTE(review): sibling tests take ``session`` without this decorator, so it
    # looks redundant here - confirm before removing.
    @provide_session
    def test_limit_and_offset(self, url, expected_import_error_ids, session):
        """With files 1..109 stored, each URL returns exactly the expected filenames in order."""
        self._seed_import_errors(session, range(1, 110))

        response = self.client.get(url, environ_overrides={"REMOTE_USER": "test"})

        assert response.status_code == 200
        returned_filenames = [entry["filename"] for entry in response.json["import_errors"]]
        assert returned_filenames == expected_import_error_ids

    def test_should_respect_page_size_limit_default(self, session):
        """With 109 rows stored and no ``limit`` given, 100 entries are returned."""
        self._seed_import_errors(session, range(1, 110))

        response = self.client.get("/api/v1/importErrors", environ_overrides={"REMOTE_USER": "test"})

        assert response.status_code == 200
        assert len(response.json["import_errors"]) == 100

    @conf_vars({("api", "maximum_page_limit"): "150"})
    def test_should_return_conf_max_if_req_max_above_conf(self, session):
        """With ``maximum_page_limit`` set to 150, ``limit=180`` over 200 rows returns 150 entries."""
        self._seed_import_errors(session, range(200))

        response = self.client.get(
            "/api/v1/importErrors?limit=180", environ_overrides={"REMOTE_USER": "test"}
        )

        assert response.status_code == 200
        assert len(response.json["import_errors"]) == 150