blob: 660bc6cce2370fec1f649936ee6c223525ffcdec [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import warnings
from functools import wraps
from typing import TYPE_CHECKING, Callable, Sequence, TypeVar, cast
from flask import Response, g
from airflow.api_connexion.exceptions import PermissionDenied, Unauthenticated
from airflow.auth.managers.models.resource_details import (
AccessView,
ConfigurationDetails,
ConnectionDetails,
DagAccessEntity,
DagDetails,
DatasetDetails,
PoolDetails,
VariableDetails,
)
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.www.extensions.init_auth_manager import get_auth_manager
if TYPE_CHECKING:
from airflow.auth.managers.base_auth_manager import ResourceMethod
T = TypeVar("T", bound=Callable)
def check_authentication() -> None:
"""Check that the request has valid authorization information."""
for auth in get_airflow_app().api_auth:
response = auth.requires_authentication(Response)()
if response.status_code == 200:
return
# Even if the current_user is anonymous, the AUTH_ROLE_PUBLIC might still have permission.
appbuilder = get_airflow_app().appbuilder
if appbuilder.get_app.config.get("AUTH_ROLE_PUBLIC", None):
return
# since this handler only checks authentication, not authorization,
# we should always return 401
raise Unauthenticated(headers=response.headers)
def requires_access(permissions: Sequence[tuple[str, str]] | None = None) -> Callable[[T], T]:
"""
Check current user's permissions against required permissions.
Deprecated. Do not use this decorator, use one of the decorator `has_access_*` defined in
airflow/api_connexion/security.py instead.
This decorator will only work with FAB authentication and not with other auth providers.
This decorator might be used in user plugins, do not remove it.
"""
warnings.warn(
"The 'requires_access' decorator is deprecated. Please use one of the decorator `requires_access_*`"
"defined in airflow/api_connexion/security.py instead.",
RemovedInAirflow3Warning,
stacklevel=2,
)
from airflow.providers.fab.auth_manager.decorators.auth import _requires_access_fab
return _requires_access_fab(permissions)
def _requires_access(*, is_authorized_callback: Callable[[], bool], func: Callable, args, kwargs) -> bool:
"""
Define the behavior whether the user is authorized to access the resource.
:param is_authorized_callback: callback to execute to figure whether the user is authorized to access
the resource
:param func: the function to call if the user is authorized
:param args: the arguments of ``func``
:param kwargs: the keyword arguments ``func``
:meta private:
"""
check_authentication()
if is_authorized_callback():
return func(*args, **kwargs)
raise PermissionDenied()
def requires_access_configuration(method: ResourceMethod) -> Callable[[T], T]:
def requires_access_decorator(func: T):
@wraps(func)
def decorated(*args, **kwargs):
section: str | None = kwargs.get("section")
return _requires_access(
is_authorized_callback=lambda: get_auth_manager().is_authorized_configuration(
method=method, details=ConfigurationDetails(section=section)
),
func=func,
args=args,
kwargs=kwargs,
)
return cast(T, decorated)
return requires_access_decorator
def requires_access_connection(method: ResourceMethod) -> Callable[[T], T]:
def requires_access_decorator(func: T):
@wraps(func)
def decorated(*args, **kwargs):
connection_id: str | None = kwargs.get("connection_id")
return _requires_access(
is_authorized_callback=lambda: get_auth_manager().is_authorized_connection(
method=method, details=ConnectionDetails(conn_id=connection_id)
),
func=func,
args=args,
kwargs=kwargs,
)
return cast(T, decorated)
return requires_access_decorator
def requires_access_dag(
method: ResourceMethod, access_entity: DagAccessEntity | None = None
) -> Callable[[T], T]:
def _is_authorized_callback(dag_id: str):
def callback():
access = get_auth_manager().is_authorized_dag(
method=method,
access_entity=access_entity,
details=DagDetails(id=dag_id),
)
# ``access`` means here:
# - if a DAG id is provided (``dag_id`` not None): is the user authorized to access this DAG
# - if no DAG id is provided: is the user authorized to access all DAGs
if dag_id or access or access_entity:
return access
# No DAG id is provided, the user is not authorized to access all DAGs and authorization is done
# on DAG level
# If method is "GET", return whether the user has read access to any DAGs
# If method is "PUT", return whether the user has edit access to any DAGs
return (method == "GET" and any(get_auth_manager().get_permitted_dag_ids(methods=["GET"]))) or (
method == "PUT" and any(get_auth_manager().get_permitted_dag_ids(methods=["PUT"]))
)
return callback
def requires_access_decorator(func: T):
@wraps(func)
def decorated(*args, **kwargs):
dag_id: str | None = kwargs.get("dag_id") if kwargs.get("dag_id") != "~" else None
return _requires_access(
is_authorized_callback=_is_authorized_callback(dag_id),
func=func,
args=args,
kwargs=kwargs,
)
return cast(T, decorated)
return requires_access_decorator
def requires_access_dataset(method: ResourceMethod) -> Callable[[T], T]:
def requires_access_decorator(func: T):
@wraps(func)
def decorated(*args, **kwargs):
uri: str | None = kwargs.get("uri")
return _requires_access(
is_authorized_callback=lambda: get_auth_manager().is_authorized_dataset(
method=method, details=DatasetDetails(uri=uri)
),
func=func,
args=args,
kwargs=kwargs,
)
return cast(T, decorated)
return requires_access_decorator
def requires_access_pool(method: ResourceMethod) -> Callable[[T], T]:
def requires_access_decorator(func: T):
@wraps(func)
def decorated(*args, **kwargs):
pool_name: str | None = kwargs.get("pool_name")
return _requires_access(
is_authorized_callback=lambda: get_auth_manager().is_authorized_pool(
method=method, details=PoolDetails(name=pool_name)
),
func=func,
args=args,
kwargs=kwargs,
)
return cast(T, decorated)
return requires_access_decorator
def requires_access_variable(method: ResourceMethod) -> Callable[[T], T]:
def requires_access_decorator(func: T):
@wraps(func)
def decorated(*args, **kwargs):
variable_key: str | None = kwargs.get("variable_key")
return _requires_access(
is_authorized_callback=lambda: get_auth_manager().is_authorized_variable(
method=method, details=VariableDetails(key=variable_key)
),
func=func,
args=args,
kwargs=kwargs,
)
return cast(T, decorated)
return requires_access_decorator
def requires_access_view(access_view: AccessView) -> Callable[[T], T]:
def requires_access_decorator(func: T):
@wraps(func)
def decorated(*args, **kwargs):
return _requires_access(
is_authorized_callback=lambda: get_auth_manager().is_authorized_view(access_view=access_view),
func=func,
args=args,
kwargs=kwargs,
)
return cast(T, decorated)
return requires_access_decorator
def requires_access_custom_view(
method: ResourceMethod,
resource_name: str,
) -> Callable[[T], T]:
def requires_access_decorator(func: T):
@wraps(func)
def decorated(*args, **kwargs):
return _requires_access(
is_authorized_callback=lambda: get_auth_manager().is_authorized_custom_view(
method=method, resource_name=resource_name
),
func=func,
args=args,
kwargs=kwargs,
)
return cast(T, decorated)
return requires_access_decorator
def get_readable_dags() -> set[str]:
return get_auth_manager().get_permitted_dag_ids(user=g.user)