blob: c701330600e2fa69cf1c47ec1830416725b94ce9 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import json
from typing import Any
from unittest.mock import patch
import pytest
from airflow.exceptions import AirflowException
from airflow.models.connection import Connection
from airflow.providers.dbt.cloud.hooks.dbt import (
DBT_CAUSE_MAX_LENGTH,
DbtCloudHook,
DbtCloudJobRunException,
DbtCloudJobRunStatus,
TokenAuth,
fallback_to_default_account,
)
from airflow.utils import db
pytestmark = pytest.mark.db_test
ACCOUNT_ID_CONN = "account_id_conn"
NO_ACCOUNT_ID_CONN = "no_account_id_conn"
SINGLE_TENANT_CONN = "single_tenant_conn"
DEFAULT_ACCOUNT_ID = 11111
ACCOUNT_ID = 22222
SINGLE_TENANT_DOMAIN = "single.tenant.getdbt.com"
TOKEN = "token"
PROJECT_ID = 33333
JOB_ID = 4444
RUN_ID = 5555
BASE_URL = "https://cloud.getdbt.com/"
SINGLE_TENANT_URL = "https://single.tenant.getdbt.com/"
class TestDbtCloudJobRunStatus:
valid_job_run_statuses = [
1, # QUEUED
2, # STARTING
3, # RUNNING
10, # SUCCESS
20, # ERROR
30, # CANCELLED
[1, 2, 3], # QUEUED, STARTING, and RUNNING
{10, 20, 30}, # SUCCESS, ERROR, and CANCELLED
]
invalid_job_run_statuses = [
123, # Single invalid status
[123, 23, 65], # Multiple invalid statuses
[1, 2, 65], # Not all statuses are valid
"1", # String types are not valid
"12",
["1", "2", "65"],
]
def _get_ids(status_set: Any):
return [f"checking_status_{argval}" for argval in status_set]
@pytest.mark.parametrize(
argnames="statuses",
argvalues=valid_job_run_statuses,
ids=_get_ids(valid_job_run_statuses),
)
def test_valid_job_run_status(self, statuses):
DbtCloudJobRunStatus.check_is_valid(statuses)
@pytest.mark.parametrize(
argnames="statuses",
argvalues=invalid_job_run_statuses,
ids=_get_ids(invalid_job_run_statuses),
)
def test_invalid_job_run_status(self, statuses):
with pytest.raises(ValueError):
DbtCloudJobRunStatus.check_is_valid(statuses)
@pytest.mark.parametrize(
argnames="statuses",
argvalues=valid_job_run_statuses,
ids=_get_ids(valid_job_run_statuses),
)
def test_valid_terminal_job_run_status(self, statuses):
DbtCloudJobRunStatus.check_is_valid(statuses)
@pytest.mark.parametrize(
argnames="statuses",
argvalues=invalid_job_run_statuses,
ids=_get_ids(invalid_job_run_statuses),
)
def test_invalid_terminal_job_run_status(self, statuses):
with pytest.raises(ValueError):
DbtCloudJobRunStatus.check_is_valid(statuses)
class TestDbtCloudHook:
def setup_class(self):
# Connection with ``account_id`` specified
account_id_conn = Connection(
conn_id=ACCOUNT_ID_CONN,
conn_type=DbtCloudHook.conn_type,
login=DEFAULT_ACCOUNT_ID,
password=TOKEN,
)
# Connection with no ``account_id`` specified
no_account_id_conn = Connection(
conn_id=NO_ACCOUNT_ID_CONN,
conn_type=DbtCloudHook.conn_type,
password=TOKEN,
)
# Connection with `host` parameter set
host_conn = Connection(
conn_id=SINGLE_TENANT_CONN,
conn_type=DbtCloudHook.conn_type,
login=DEFAULT_ACCOUNT_ID,
password=TOKEN,
host=SINGLE_TENANT_DOMAIN,
)
db.merge_conn(account_id_conn)
db.merge_conn(no_account_id_conn)
db.merge_conn(host_conn)
@pytest.mark.parametrize(
argnames="conn_id, url",
argvalues=[(ACCOUNT_ID_CONN, BASE_URL), (SINGLE_TENANT_CONN, SINGLE_TENANT_URL)],
ids=["multi-tenant", "single-tenant"],
)
def test_init_hook(self, conn_id, url):
hook = DbtCloudHook(conn_id)
assert hook.auth_type == TokenAuth
assert hook.method == "POST"
assert hook.dbt_cloud_conn_id == conn_id
@pytest.mark.parametrize(
argnames="conn_id, url",
argvalues=[(ACCOUNT_ID_CONN, BASE_URL), (SINGLE_TENANT_CONN, SINGLE_TENANT_URL)],
ids=["multi-tenant", "single-tenant"],
)
def test_tenant_base_url(self, conn_id, url):
hook = DbtCloudHook(conn_id)
hook.get_conn()
assert hook.base_url == url
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
def test_fallback_to_default_account(self, conn_id, account_id):
hook = DbtCloudHook(conn_id)
def dbt_cloud_func(_, account_id=None):
return account_id
_account_id = account_id or DEFAULT_ACCOUNT_ID
if conn_id == ACCOUNT_ID_CONN:
assert fallback_to_default_account(dbt_cloud_func)(hook, account_id=account_id) == _account_id
assert fallback_to_default_account(dbt_cloud_func)(hook) == _account_id
if conn_id == NO_ACCOUNT_ID_CONN:
assert fallback_to_default_account(dbt_cloud_func)(hook, account_id=account_id) == _account_id
with pytest.raises(AirflowException):
fallback_to_default_account(dbt_cloud_func)(hook)
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_list_accounts(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
hook.list_accounts()
assert hook.method == "GET"
hook.run.assert_called_once_with(endpoint=None, data=None)
hook._paginate.assert_not_called()
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_get_account(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
hook.get_account(account_id=account_id)
assert hook.method == "GET"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook.run.assert_called_once_with(endpoint=f"api/v2/accounts/{_account_id}/", data=None)
hook._paginate.assert_not_called()
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_list_projects(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
hook.list_projects(account_id=account_id)
assert hook.method == "GET"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook.run.assert_not_called()
hook._paginate.assert_called_once_with(
endpoint=f"api/v3/accounts/{_account_id}/projects/", payload=None
)
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_get_project(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
hook.get_project(project_id=PROJECT_ID, account_id=account_id)
assert hook.method == "GET"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook.run.assert_called_once_with(
endpoint=f"api/v3/accounts/{_account_id}/projects/{PROJECT_ID}/", data=None
)
hook._paginate.assert_not_called()
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_list_jobs(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
hook.list_jobs(account_id=account_id)
assert hook.method == "GET"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook._paginate.assert_called_once_with(
endpoint=f"api/v2/accounts/{_account_id}/jobs/", payload={"order_by": None, "project_id": None}
)
hook.run.assert_not_called()
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_list_jobs_with_payload(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
hook.list_jobs(project_id=PROJECT_ID, account_id=account_id, order_by="-id")
assert hook.method == "GET"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook._paginate.assert_called_once_with(
endpoint=f"api/v2/accounts/{_account_id}/jobs/",
payload={"order_by": "-id", "project_id": PROJECT_ID},
)
hook.run.assert_not_called()
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_get_job(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
hook.get_job(job_id=JOB_ID, account_id=account_id)
assert hook.method == "GET"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook.run.assert_called_once_with(endpoint=f"api/v2/accounts/{_account_id}/jobs/{JOB_ID}", data=None)
hook._paginate.assert_not_called()
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_trigger_job_run(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
cause = ""
hook.trigger_job_run(job_id=JOB_ID, cause=cause, account_id=account_id)
assert hook.method == "POST"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook.run.assert_called_once_with(
endpoint=f"api/v2/accounts/{_account_id}/jobs/{JOB_ID}/run/",
data=json.dumps({"cause": cause, "steps_override": None, "schema_override": None}),
)
hook._paginate.assert_not_called()
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_trigger_job_run_with_overrides(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
cause = ""
steps_override = ["dbt test", "dbt run"]
schema_override = ["other_schema"]
hook.trigger_job_run(
job_id=JOB_ID,
cause=cause,
account_id=account_id,
steps_override=steps_override,
schema_override=schema_override,
)
assert hook.method == "POST"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook.run.assert_called_once_with(
endpoint=f"api/v2/accounts/{_account_id}/jobs/{JOB_ID}/run/",
data=json.dumps(
{"cause": cause, "steps_override": steps_override, "schema_override": schema_override}
),
)
hook._paginate.assert_not_called()
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_trigger_job_run_with_additional_run_configs(
self, mock_http_run, mock_paginate, conn_id, account_id
):
hook = DbtCloudHook(conn_id)
cause = ""
additional_run_config = {"threads_override": 8, "generate_docs_override": False}
hook.trigger_job_run(
job_id=JOB_ID, cause=cause, account_id=account_id, additional_run_config=additional_run_config
)
assert hook.method == "POST"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook.run.assert_called_once_with(
endpoint=f"api/v2/accounts/{_account_id}/jobs/{JOB_ID}/run/",
data=json.dumps(
{
"cause": cause,
"steps_override": None,
"schema_override": None,
"threads_override": 8,
"generate_docs_override": False,
}
),
)
hook._paginate.assert_not_called()
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_trigger_job_run_with_longer_cause(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
cause = "Some cause that is longer than limit. " * 15
expected_cause = cause[:DBT_CAUSE_MAX_LENGTH]
assert len(cause) > DBT_CAUSE_MAX_LENGTH
with pytest.warns(
UserWarning,
match=f"Cause `{cause}` exceeds limit of {DBT_CAUSE_MAX_LENGTH}"
f" characters and will be truncated.",
):
hook.trigger_job_run(job_id=JOB_ID, cause=cause, account_id=account_id)
assert hook.method == "POST"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook.run.assert_called_once_with(
endpoint=f"api/v2/accounts/{_account_id}/jobs/{JOB_ID}/run/",
data=json.dumps({"cause": expected_cause, "steps_override": None, "schema_override": None}),
)
hook._paginate.assert_not_called()
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_list_job_runs(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
hook.list_job_runs(account_id=account_id)
assert hook.method == "GET"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook.run.assert_not_called()
hook._paginate.assert_called_once_with(
endpoint=f"api/v2/accounts/{_account_id}/runs/",
payload={
"include_related": None,
"job_definition_id": None,
"order_by": None,
},
)
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_list_job_runs_with_payload(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
hook.list_job_runs(
account_id=account_id, include_related=["job"], job_definition_id=JOB_ID, order_by="id"
)
assert hook.method == "GET"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook.run.assert_not_called()
hook._paginate.assert_called_once_with(
endpoint=f"api/v2/accounts/{_account_id}/runs/",
payload={
"include_related": ["job"],
"job_definition_id": JOB_ID,
"order_by": "id",
},
)
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
def test_get_job_runs(self, mock_http_run, conn_id, account_id):
hook = DbtCloudHook(conn_id)
hook.get_job_runs(account_id=account_id)
assert hook.method == "GET"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook.run.assert_called_once_with(endpoint=f"api/v2/accounts/{_account_id}/runs/", data=None)
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_get_job_run(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
hook.get_job_run(run_id=RUN_ID, account_id=account_id)
assert hook.method == "GET"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook.run.assert_called_once_with(
endpoint=f"api/v2/accounts/{_account_id}/runs/{RUN_ID}/", data={"include_related": None}
)
hook._paginate.assert_not_called()
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_get_job_run_with_payload(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
hook.get_job_run(run_id=RUN_ID, account_id=account_id, include_related=["triggers"])
assert hook.method == "GET"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook.run.assert_called_once_with(
endpoint=f"api/v2/accounts/{_account_id}/runs/{RUN_ID}/", data={"include_related": ["triggers"]}
)
hook._paginate.assert_not_called()
wait_for_job_run_status_test_args = [
(DbtCloudJobRunStatus.SUCCESS.value, DbtCloudJobRunStatus.SUCCESS.value, True),
(DbtCloudJobRunStatus.ERROR.value, DbtCloudJobRunStatus.SUCCESS.value, False),
(DbtCloudJobRunStatus.CANCELLED.value, DbtCloudJobRunStatus.SUCCESS.value, False),
(DbtCloudJobRunStatus.RUNNING.value, DbtCloudJobRunStatus.SUCCESS.value, "timeout"),
(DbtCloudJobRunStatus.QUEUED.value, DbtCloudJobRunStatus.SUCCESS.value, "timeout"),
(DbtCloudJobRunStatus.STARTING.value, DbtCloudJobRunStatus.SUCCESS.value, "timeout"),
(DbtCloudJobRunStatus.SUCCESS.value, DbtCloudJobRunStatus.TERMINAL_STATUSES.value, True),
(DbtCloudJobRunStatus.ERROR.value, DbtCloudJobRunStatus.TERMINAL_STATUSES.value, True),
(DbtCloudJobRunStatus.CANCELLED.value, DbtCloudJobRunStatus.TERMINAL_STATUSES.value, True),
]
@pytest.mark.parametrize(
argnames=("job_run_status", "expected_status", "expected_output"),
argvalues=wait_for_job_run_status_test_args,
ids=[
(
f"run_status_{argval[0]}_expected_{argval[1]}"
if isinstance(argval[1], int)
else f"run_status_{argval[0]}_expected_AnyTerminalStatus"
)
for argval in wait_for_job_run_status_test_args
],
)
def test_wait_for_job_run_status(hook, job_run_status, expected_status, expected_output):
config = {"run_id": RUN_ID, "timeout": 3, "check_interval": 1, "expected_statuses": expected_status}
hook = DbtCloudHook(ACCOUNT_ID_CONN)
with patch.object(DbtCloudHook, "get_job_run_status") as mock_job_run_status:
mock_job_run_status.return_value = job_run_status
if expected_output != "timeout":
assert hook.wait_for_job_run_status(**config) == expected_output
else:
with pytest.raises(DbtCloudJobRunException):
hook.wait_for_job_run_status(**config)
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_cancel_job_run(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
hook.cancel_job_run(run_id=RUN_ID, account_id=account_id)
assert hook.method == "POST"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook.run.assert_called_once_with(
endpoint=f"api/v2/accounts/{_account_id}/runs/{RUN_ID}/cancel/", data=None
)
hook._paginate.assert_not_called()
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_list_job_run_artifacts(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
hook.list_job_run_artifacts(run_id=RUN_ID, account_id=account_id)
assert hook.method == "GET"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook.run.assert_called_once_with(
endpoint=f"api/v2/accounts/{_account_id}/runs/{RUN_ID}/artifacts/", data={"step": None}
)
hook._paginate.assert_not_called()
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_list_job_run_artifacts_with_payload(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
hook.list_job_run_artifacts(run_id=RUN_ID, account_id=account_id, step=2)
assert hook.method == "GET"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook.run.assert_called_once_with(
endpoint=f"api/v2/accounts/{_account_id}/runs/{RUN_ID}/artifacts/", data={"step": 2}
)
hook._paginate.assert_not_called()
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_get_job_run_artifact(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
path = "manifest.json"
hook.get_job_run_artifact(run_id=RUN_ID, path=path, account_id=account_id)
assert hook.method == "GET"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook.run.assert_called_once_with(
endpoint=f"api/v2/accounts/{_account_id}/runs/{RUN_ID}/artifacts/{path}", data={"step": None}
)
hook._paginate.assert_not_called()
@pytest.mark.parametrize(
argnames="conn_id, account_id",
argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
def test_get_job_run_artifact_with_payload(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
path = "manifest.json"
hook.get_job_run_artifact(run_id=RUN_ID, path="manifest.json", account_id=account_id, step=2)
assert hook.method == "GET"
_account_id = account_id or DEFAULT_ACCOUNT_ID
hook.run.assert_called_once_with(
endpoint=f"api/v2/accounts/{_account_id}/runs/{RUN_ID}/artifacts/{path}", data={"step": 2}
)
hook._paginate.assert_not_called()
@pytest.mark.parametrize(
argnames="conn_id",
argvalues=[ACCOUNT_ID_CONN, NO_ACCOUNT_ID_CONN],
ids=["default_account", "explicit_account"],
)
def test_connection_success(self, requests_mock, conn_id):
requests_mock.get(BASE_URL, status_code=200)
status, msg = DbtCloudHook(conn_id).test_connection()
assert status is True
assert msg == "Successfully connected to dbt Cloud."
@pytest.mark.parametrize(
argnames="conn_id",
argvalues=[ACCOUNT_ID_CONN, NO_ACCOUNT_ID_CONN],
ids=["default_account", "explicit_account"],
)
def test_connection_failure(self, requests_mock, conn_id):
requests_mock.get(BASE_URL, status_code=403, reason="Authentication credentials were not provided")
status, msg = DbtCloudHook(conn_id).test_connection()
assert status is False
assert msg == "403:Authentication credentials were not provided"