Validate dbt `cause` field to be less than 255 characters (#38896)
* Validate dbt `trigger_reason` field to be less than 255 characters
* Rename function to better reflect the behavior
* Move validation of cause parameter to DbtCloudHook
* Add stacklevel to dbt `cause` warning
diff --git a/airflow/providers/dbt/cloud/hooks/dbt.py b/airflow/providers/dbt/cloud/hooks/dbt.py
index 268cd91..28a2406 100644
--- a/airflow/providers/dbt/cloud/hooks/dbt.py
+++ b/airflow/providers/dbt/cloud/hooks/dbt.py
@@ -19,6 +19,7 @@
import asyncio
import json
import time
+import warnings
from enum import Enum
from functools import cached_property, wraps
from inspect import signature
@@ -39,6 +40,8 @@
from airflow.models import Connection
+DBT_CAUSE_MAX_LENGTH = 255
+
def fallback_to_default_account(func: Callable) -> Callable:
"""
@@ -420,6 +423,15 @@
if additional_run_config is None:
additional_run_config = {}
+ if cause is not None and len(cause) > DBT_CAUSE_MAX_LENGTH:
+ warnings.warn(
+ f"Cause `{cause}` exceeds limit of {DBT_CAUSE_MAX_LENGTH}"
+ f" characters and will be truncated.",
+ UserWarning,
+ stacklevel=2,
+ )
+ cause = cause[:DBT_CAUSE_MAX_LENGTH]
+
payload = {
"cause": cause,
"steps_override": steps_override,
diff --git a/tests/providers/dbt/cloud/hooks/test_dbt.py b/tests/providers/dbt/cloud/hooks/test_dbt.py
index 9a65ba0..c701330 100644
--- a/tests/providers/dbt/cloud/hooks/test_dbt.py
+++ b/tests/providers/dbt/cloud/hooks/test_dbt.py
@@ -25,6 +25,7 @@
from airflow.exceptions import AirflowException
from airflow.models.connection import Connection
from airflow.providers.dbt.cloud.hooks.dbt import (
+ DBT_CAUSE_MAX_LENGTH,
DbtCloudHook,
DbtCloudJobRunException,
DbtCloudJobRunStatus,
@@ -401,6 +402,35 @@
)
@patch.object(DbtCloudHook, "run")
@patch.object(DbtCloudHook, "_paginate")
+ def test_trigger_job_run_with_longer_cause(self, mock_http_run, mock_paginate, conn_id, account_id):
+ hook = DbtCloudHook(conn_id)
+ cause = "Some cause that is longer than limit. " * 15
+ expected_cause = cause[:DBT_CAUSE_MAX_LENGTH]
+ assert len(cause) > DBT_CAUSE_MAX_LENGTH
+
+ with pytest.warns(
+ UserWarning,
+ match=f"Cause `{cause}` exceeds limit of {DBT_CAUSE_MAX_LENGTH}"
+ f" characters and will be truncated.",
+ ):
+ hook.trigger_job_run(job_id=JOB_ID, cause=cause, account_id=account_id)
+
+ assert hook.method == "POST"
+
+ _account_id = account_id or DEFAULT_ACCOUNT_ID
+ hook.run.assert_called_once_with(
+ endpoint=f"api/v2/accounts/{_account_id}/jobs/{JOB_ID}/run/",
+ data=json.dumps({"cause": expected_cause, "steps_override": None, "schema_override": None}),
+ )
+ hook._paginate.assert_not_called()
+
+ @pytest.mark.parametrize(
+ argnames="conn_id, account_id",
+ argvalues=[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
+ ids=["default_account", "explicit_account"],
+ )
+ @patch.object(DbtCloudHook, "run")
+ @patch.object(DbtCloudHook, "_paginate")
def test_list_job_runs(self, mock_http_run, mock_paginate, conn_id, account_id):
hook = DbtCloudHook(conn_id)
hook.list_job_runs(account_id=account_id)