Enhance CLI Test command to accept a JSON-formatted dictionary of params that can be added to a task's params dict.
The CLI-provided params will overwrite params of the same name defined in the task definition if a key conflict occurs. This change will allow us to provide parameters to a DAG at runtime that are specific to a 'test' command run.
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index b02de14..0058d3a 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -304,6 +304,10 @@
raise AirflowException('dag_id could not be found')
dag = dagbag.dags[args.dag_id]
task = dag.get_task(task_id=args.task_id)
+ # Add CLI provided task_params to task.params
+ if args.task_params:
+ passed_in_params = json.loads(args.task_params)
+ task.params.update(passed_in_params)
ti = TaskInstance(task, args.execution_date)
if args.dry_run:
@@ -636,6 +640,8 @@
default=DAGS_FOLDER)
parser_test.add_argument(
"-dr", "--dry_run", help="Perform a dry run", action="store_true")
+ parser_test.add_argument(
+ "-tp", "--task_params", help="Sends a JSON params dict to the task")
parser_test.set_defaults(func=test)
ht = "Get the status of a task instance."
diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py
new file mode 100644
index 0000000..faee40d
--- /dev/null
+++ b/airflow/example_dags/example_passing_params_via_test_command.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.operators import BashOperator, PythonOperator
+
+dag = DAG("example_passing_params_via_test_command",
+ default_args={"owner" : "me",
+ "start_date":datetime.now()},
+ schedule_interval='*/1 * * * *',
+ dagrun_timeout=timedelta(minutes=4)
+ )
+
+def my_py_command(ds, **kwargs):
+ # Print out the "foo" param passed in via
+ # `airflow test example_passing_params_via_test_command run_this <date>
+ # -tp '{"foo":"bar"}'`
+ if kwargs["test_mode"]:
+ print(" 'foo' was passed in via test={} command : kwargs[params][foo] \
+ = {}".format( kwargs["test_mode"], kwargs["params"]["foo"]) )
+ # Print out the value of "miff", passed in below via the Python Operator
+ print(" 'miff' was passed in via task params = {}".format( kwargs["params"]["miff"]) )
+ return 1
+
+my_templated_command = """
+ echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} "
+ echo " 'miff was passed in via BashOperator with value {{ params.miff }} "
+"""
+
+run_this = PythonOperator(
+ task_id='run_this',
+ provide_context=True,
+ python_callable=my_py_command,
+ params={"miff":"agg"},
+ dag=dag)
+
+also_run_this = BashOperator(
+ task_id='also_run_this',
+ bash_command=my_templated_command,
+ params={"miff":"agg"},
+ dag=dag)
+also_run_this.set_upstream(run_this)
diff --git a/tests/core.py b/tests/core.py
index 734c46a..68b864d 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -31,7 +31,7 @@
from airflow.utils import AirflowException
from airflow.configuration import AirflowConfigException
-NUM_EXAMPLE_DAGS = 12
+NUM_EXAMPLE_DAGS = 13
DEV_NULL = '/dev/null'
DEFAULT_DATE = datetime(2015, 1, 1)
DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
@@ -638,6 +638,14 @@
'test', 'example_bash_operator', 'runme_0', '--dry_run',
DEFAULT_DATE.isoformat()]))
+ def test_cli_test_with_params(self):
+ cli.test(self.parser.parse_args([
+ 'test', 'example_passing_params_via_test_command', 'run_this',
+ '-tp', '{"foo":"bar"}', DEFAULT_DATE.isoformat()]))
+ cli.test(self.parser.parse_args([
+ 'test', 'example_passing_params_via_test_command', 'also_run_this',
+ '-tp', '{"foo":"bar"}', DEFAULT_DATE.isoformat()]))
+
def test_cli_run(self):
cli.run(self.parser.parse_args([
'run', 'example_bash_operator', 'runme_0', '-l',