blob: aa02573e7d0c5af25ef093dd9062a1c0e1c3cf4b [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from unittest import mock
import pytest
from airflow.configuration import ensure_secrets_loaded, initialize_secrets_backends
from airflow.models import Connection, Variable
from airflow.sdk import SecretCache
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_variables
from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker
@skip_if_force_lowest_dependencies_marker
class TestConnectionsFromSecrets:
    """Exercise ``Connection`` lookups as they pass through the secrets backends."""

    # Dotted path of the custom backend set via ``[secrets] backend`` in the tests below.
    _SSM_BACKEND_PATH = (
        "airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend"
    )

    def setup_method(self) -> None:
        """Reset ``SecretCache`` before every test."""
        SecretCache.reset()

    # ``mock.patch`` decorators are applied bottom-up, so the environment-variable
    # mock is injected first and the metastore mock second.
    @mock.patch("airflow.secrets.metastore.MetastoreBackend.get_connection")
    @mock.patch("airflow.secrets.environment_variables.EnvironmentVariablesBackend.get_connection")
    def test_get_connection_second_try(self, mock_env_get, mock_meta_get):
        """The metastore backend is queried when the environment backend yields ``None``."""
        # A one-element side_effect list: the single expected call returns None.
        mock_env_get.side_effect = [None]

        Connection.get_connection_from_secrets("fake_conn_id")

        mock_meta_get.assert_called_once_with(conn_id="fake_conn_id")
        mock_env_get.assert_called_once_with(conn_id="fake_conn_id")

    @mock.patch("airflow.secrets.metastore.MetastoreBackend.get_connection")
    @mock.patch("airflow.secrets.environment_variables.EnvironmentVariablesBackend.get_connection")
    def test_get_connection_first_try(self, mock_env_get, mock_meta_get):
        """The metastore backend is left alone once the environment backend returns a connection."""
        mock_env_get.return_value = Connection("something")

        Connection.get_connection_from_secrets("fake_conn_id")

        mock_env_get.assert_called_once_with(conn_id="fake_conn_id")
        mock_meta_get.assert_not_called()

    @conf_vars(
        {
            ("secrets", "backend"): _SSM_BACKEND_PATH,
            ("secrets", "backend_kwargs"): '{"connections_prefix": "/airflow", "profile_name": null}',
        }
    )
    def test_initialize_secrets_backends(self):
        """With a custom backend configured, three backends are built and it is one of them."""
        loaded = initialize_secrets_backends()
        loaded_names = [item.__class__.__name__ for item in loaded]

        assert len(loaded) == 3
        assert "SystemsManagerParameterStoreBackend" in loaded_names

    @conf_vars(
        {
            ("secrets", "backend"): _SSM_BACKEND_PATH,
            ("secrets", "backend_kwargs"): '{"use_ssl": false}',
        }
    )
    def test_backends_kwargs(self):
        """``backend_kwargs`` reach the backend: ``use_ssl`` is False and ``kwargs`` is empty."""
        loaded = initialize_secrets_backends()
        ssm_backend = next(
            item for item in loaded if item.__class__.__name__ == "SystemsManagerParameterStoreBackend"
        )

        assert ssm_backend.kwargs == {}
        assert ssm_backend.use_ssl is False

    @conf_vars(
        {
            ("secrets", "backend"): _SSM_BACKEND_PATH,
            ("secrets", "backend_kwargs"): '{"connections_prefix": "/airflow", "profile_name": null}',
        }
    )
    @mock.patch.dict(
        "os.environ",
        {"AIRFLOW_CONN_TEST_MYSQL": "mysql://airflow:airflow@host:5432/airflow"},
    )
    @mock.patch(f"{_SSM_BACKEND_PATH}.get_connection")
    def test_backend_fallback_to_env_var(self, mock_get_connection):
        """A connection missing from the custom backend is resolved from the environment variable."""
        mock_get_connection.return_value = None

        loaded_names = [item.__class__.__name__ for item in ensure_secrets_loaded()]
        assert "SystemsManagerParameterStoreBackend" in loaded_names

        resolved = Connection.get_connection_from_secrets(conn_id="test_mysql")

        # The custom backend's get_connection must have been consulted exactly once.
        mock_get_connection.assert_called_once_with(conn_id="test_mysql")
        assert resolved.get_uri() == "mysql://airflow:airflow@host:5432/airflow"
@skip_if_force_lowest_dependencies_marker
@pytest.mark.db_test
class TestVariableFromSecrets:
    """Exercise ``Variable`` lookups as they pass through the secrets backends."""

    # Dotted path of the custom backend set via ``[secrets] backend`` in the ordering test.
    _SSM_BACKEND_PATH = (
        "airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend"
    )

    def setup_method(self) -> None:
        """Clear stored variables and reset ``SecretCache`` before every test."""
        clear_db_variables()
        SecretCache.reset()

    def teardown_method(self) -> None:
        """Clear stored variables again after every test."""
        clear_db_variables()

    # ``mock.patch`` decorators are applied bottom-up, so the environment-variable
    # mock is injected first and the metastore mock second.
    @mock.patch("airflow.secrets.metastore.MetastoreBackend.get_variable")
    @mock.patch("airflow.secrets.environment_variables.EnvironmentVariablesBackend.get_variable")
    def test_get_variable_second_try(self, mock_env_get, mock_meta_get):
        """
        A Variable absent from the environment-variable backend is then
        looked up in the metastore backend.
        """
        mock_env_get.return_value = None
        mock_meta_get.return_value = "val"

        Variable.get_variable_from_secrets("fake_var_key")

        mock_meta_get.assert_called_once_with(key="fake_var_key")
        mock_env_get.assert_called_once_with(key="fake_var_key")

    @mock.patch("airflow.secrets.metastore.MetastoreBackend.get_variable")
    @mock.patch("airflow.secrets.environment_variables.EnvironmentVariablesBackend.get_variable")
    def test_get_variable_first_try(self, mock_env_get, mock_meta_get):
        """
        A Variable found in the environment-variable backend is returned
        without the metastore backend being queried.
        """
        mock_env_get.return_value = "something"

        Variable.get_variable_from_secrets("fake_var_key")

        mock_env_get.assert_called_once_with(key="fake_var_key")
        mock_meta_get.assert_not_called()

    def test_backend_fallback_to_default_var(self):
        """
        ``Variable.get`` hands back ``default_var`` when the key is not
        found and a default was supplied.
        """
        fetched = Variable.get(key="test_var", default_var="new")
        assert fetched == "new"

    @conf_vars(
        {
            ("secrets", "backend"): _SSM_BACKEND_PATH,
            ("secrets", "backend_kwargs"): '{"variables_prefix": "/airflow", "profile_name": null}',
        }
    )
    @mock.patch.dict("os.environ", {"AIRFLOW_VAR_MYVAR": "a_venv_value"})
    @mock.patch("airflow.secrets.metastore.MetastoreBackend.get_variable")
    @mock.patch(f"{_SSM_BACKEND_PATH}.get_variable")
    def test_backend_variable_order(self, mock_secret_get, mock_meta_get):
        """
        Walk through three lookups to check which backend supplies the value:
        the environment variable, then the metastore, then the custom backend.
        """
        loaded_names = [item.__class__.__name__ for item in ensure_secrets_loaded()]
        assert "SystemsManagerParameterStoreBackend" in loaded_names

        # Step 1: the custom backend has nothing, the environment variable is set.
        # The env value is returned and the metastore is never consulted.
        mock_secret_get.return_value = None
        mock_meta_get.return_value = None
        assert Variable.get(key="MYVAR") == "a_venv_value"
        mock_secret_get.assert_called_with(key="MYVAR")
        mock_meta_get.assert_not_called()

        # Step 2: no environment variable exists for this key and the custom
        # backend still has nothing, so the metastore value is returned.
        mock_secret_get.return_value = None
        mock_meta_get.return_value = "a_metastore_value"
        assert Variable.get(key="not_myvar") == "a_metastore_value"
        mock_meta_get.assert_called_once_with(key="not_myvar")

        # Step 3: once the custom backend has a value, that value is returned
        # even though the metastore mock would still answer.
        mock_secret_get.return_value = "a_secret_value"
        assert Variable.get(key="not_myvar") == "a_secret_value"