Scheduler to handle incrementing of try_number (#39336)
Previously, there was a lot of bad stuff happening around try_number.
We incremented it when task started running. And because of that, we had this logic to return "_try_number + 1" when task not running. But this gave the "right" try number before it ran, and the wrong number after it ran. And, since it was naively incremented when task starts running -- i.e. without regard to why it is running -- we decremented it when deferring or exiting on a reschedule.
What I do here is try to remove all of that stuff:
no more private _try_number attr
no more getter logic
no more decrementing
no more incrementing as part of task execution
Now what we do is increment only when the task is set to scheduled and only when it's not coming out of deferral or "up_for_reschedule". So the try_number will be more stable. It will not change throughout the course of task execution. The only time it will be incremented is when there's legitimately a new try.
One consequence of this is that try number will no longer be incremented if you run either airflow tasks run or ti.run() in isolation. But because airflow assumes that all task runs are scheduled by the scheduler, I do not regard this to be a breaking change.
If user code or provider code has implemented hacks to get the "right" try_number when looking at it at the wrong time (because previously it gave the wrong answer), unfortunately that code will just have to be patched. There are only two cases I know of in the providers codebase -- openlineage listener, and dbt openlineage.
As a courtesy for backcompat we also add property _try_number which is just a proxy for try_number, so you'll still be able to access this attr. But, it will not behave the same as it did before.
---------
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py
index a9d8325..76f279b 100644
--- a/airflow/api/common/mark_tasks.py
+++ b/airflow/api/common/mark_tasks.py
@@ -158,10 +158,6 @@
qry_sub_dag = all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates)
tis_altered += session.scalars(qry_sub_dag.with_for_update()).all()
for task_instance in tis_altered:
- # The try_number was decremented when setting to up_for_reschedule and deferred.
- # Increment it back when changing the state again
- if task_instance.state in (TaskInstanceState.DEFERRED, TaskInstanceState.UP_FOR_RESCHEDULE):
- task_instance._try_number += 1
task_instance.set_state(state, session=session)
session.flush()
else:
diff --git a/airflow/api_connexion/schemas/task_instance_schema.py b/airflow/api_connexion/schemas/task_instance_schema.py
index 4777b8b..f4ea4bd 100644
--- a/airflow/api_connexion/schemas/task_instance_schema.py
+++ b/airflow/api_connexion/schemas/task_instance_schema.py
@@ -53,7 +53,7 @@
end_date = auto_field()
duration = auto_field()
state = TaskInstanceStateField()
- _try_number = auto_field(data_key="try_number")
+ try_number = auto_field()
max_tries = auto_field()
task_display_name = fields.String(attribute="task_display_name", dump_only=True)
hostname = auto_field()
diff --git a/airflow/example_dags/plugins/decreasing_priority_weight_strategy.py b/airflow/example_dags/plugins/decreasing_priority_weight_strategy.py
index 44e6bad..3335b7d 100644
--- a/airflow/example_dags/plugins/decreasing_priority_weight_strategy.py
+++ b/airflow/example_dags/plugins/decreasing_priority_weight_strategy.py
@@ -30,7 +30,7 @@
"""A priority weight strategy that decreases the priority weight with each attempt of the DAG task."""
def get_weight(self, ti: TaskInstance):
- return max(3 - ti._try_number + 1, 1)
+ return max(3 - ti.try_number + 1, 1)
class DecreasingPriorityWeightStrategyPlugin(AirflowPlugin):
diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py
index ace6a00..4fd6b24 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -22,7 +22,7 @@
import attr
import pendulum
-from sqlalchemy import select, tuple_, update
+from sqlalchemy import case, or_, select, tuple_, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import make_transient
from tabulate import tabulate
@@ -245,7 +245,16 @@
session.execute(
update(TI)
.where(filter_for_tis)
- .values(state=TaskInstanceState.SCHEDULED)
+ .values(
+ state=TaskInstanceState.SCHEDULED,
+ try_number=case(
+ (
+ or_(TI.state.is_(None), TI.state != TaskInstanceState.UP_FOR_RESCHEDULE),
+ TI.try_number + 1,
+ ),
+ else_=TI.try_number,
+ ),
+ )
.execution_options(synchronize_session=False)
)
session.flush()
@@ -425,6 +434,8 @@
try:
for ti in dag_run.get_task_instances(session=session):
if ti in schedulable_tis:
+ if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE:
+ ti.try_number += 1
ti.set_state(TaskInstanceState.SCHEDULED)
if ti.state != TaskInstanceState.REMOVED:
tasks_to_run[ti.key] = ti
@@ -515,6 +526,7 @@
if key in ti_status.running:
ti_status.running.pop(key)
# Reset the failed task in backfill to scheduled state
+ ti.try_number += 1
ti.set_state(TaskInstanceState.SCHEDULED, session=session)
if ti.dag_run not in ti_status.active_runs:
ti_status.active_runs.add(ti.dag_run)
@@ -552,6 +564,14 @@
else:
self.log.debug("Sending %s to executor", ti)
# Skip scheduled state, we are executing immediately
+ if ti.state in (TaskInstanceState.UP_FOR_RETRY, None):
+ # i am not sure why this is necessary.
+ # seemingly a quirk of backfill runner.
+ # it should be handled elsewhere i think.
+ # seems the leaf tasks are set SCHEDULED but others not.
+ # but i am not going to look too closely since we need
+ # to nuke the current backfill approach anyway.
+ ti.try_number += 1
ti.state = TaskInstanceState.QUEUED
ti.queued_by_job_id = self.job.id
ti.queued_dttm = timezone.utcnow()
@@ -695,7 +715,9 @@
self.log.debug(e)
perform_heartbeat(
- job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True
+ job=self.job,
+ heartbeat_callback=self.heartbeat_callback,
+ only_if_necessary=True,
)
# execute the tasks in the queue
executor.heartbeat()
@@ -725,6 +747,7 @@
ti_status.to_run.update({ti.key: ti for ti in new_mapped_tis})
for new_ti in new_mapped_tis:
+ new_ti.try_number += 1
new_ti.set_state(TaskInstanceState.SCHEDULED, session=session)
# Set state to failed for running TIs that are set up for retry if disable-retry flag is set
@@ -930,7 +953,6 @@
"combination. Please adjust backfill dates or wait for this DagRun to finish.",
)
return
- # picklin'
pickle_id = None
executor_class, _ = ExecutorLoader.import_default_executor_cls()
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 9db0de4..c9a8424 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2948,6 +2948,8 @@
session.expire_all()
schedulable_tis, _ = dr.update_state(session=session)
for s in schedulable_tis:
+ if s.state != TaskInstanceState.UP_FOR_RESCHEDULE:
+ s.try_number += 1
s.state = TaskInstanceState.SCHEDULED
session.commit()
# triggerer may mark tasks scheduled so we read from DB
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 117dd59..84076ee 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -45,7 +45,7 @@
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import declared_attr, joinedload, relationship, synonym, validates
-from sqlalchemy.sql.expression import false, select, true
+from sqlalchemy.sql.expression import case, false, select, true
from airflow import settings
from airflow.api_internal.internal_api_call import internal_api_call
@@ -1545,7 +1545,8 @@
and not ti.task.on_success_callback
and not ti.task.outlets
):
- ti._try_number += 1
+ if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE:
+ ti.try_number += 1
ti.defer_task(
defer=TaskDeferred(trigger=ti.task.start_trigger, method_name=ti.task.next_method),
session=session,
@@ -1567,7 +1568,16 @@
TI.run_id == self.run_id,
tuple_in_condition((TI.task_id, TI.map_index), schedulable_ti_ids_chunk),
)
- .values(state=TaskInstanceState.SCHEDULED)
+ .values(
+ state=TaskInstanceState.SCHEDULED,
+ try_number=case(
+ (
+ or_(TI.state.is_(None), TI.state != TaskInstanceState.UP_FOR_RESCHEDULE),
+ TI.try_number + 1,
+ ),
+ else_=TI.try_number,
+ ),
+ )
.execution_options(synchronize_session=False)
).rowcount
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 1a9d1e0..3bc96ec 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -38,6 +38,7 @@
import jinja2
import lazy_object_proxy
import pendulum
+from deprecated import deprecated
from jinja2 import TemplateAssertionError, UndefinedError
from sqlalchemy import (
Column,
@@ -281,14 +282,13 @@
ti.refresh_from_task(task)
if TYPE_CHECKING:
assert ti.task
- task_retries = task.retries
- ti.max_tries = ti.try_number + task_retries - 1
+ ti.max_tries = ti.try_number + task.retries
else:
# Ignore errors when updating max_tries if the DAG or
# task are not found since database records could be
# outdated. We make max_tries the maximum value of its
# original max_tries or the last attempted try number.
- ti.max_tries = max(ti.max_tries, ti.prev_attempted_tries)
+ ti.max_tries = max(ti.max_tries, ti.try_number)
ti.state = None
ti.external_executor_id = None
ti.clear_next_method_args()
@@ -539,7 +539,7 @@
task_instance.end_date = ti.end_date
task_instance.duration = ti.duration
task_instance.state = ti.state
- task_instance.try_number = _get_private_try_number(task_instance=ti)
+ task_instance.try_number = ti.try_number
task_instance.max_tries = ti.max_tries
task_instance.hostname = ti.hostname
task_instance.unixname = ti.unixname
@@ -928,53 +928,6 @@
TaskInstance.save_to_db(failure_context["ti"], session)
-def _get_try_number(*, task_instance: TaskInstance):
- """
- Return the try number that a task number will be when it is actually run.
-
- If the TaskInstance is currently running, this will match the column in the
- database, in all other cases this will be incremented.
-
- This is designed so that task logs end up in the right file.
-
- :param task_instance: the task instance
-
- :meta private:
- """
- if task_instance.state == TaskInstanceState.RUNNING:
- return task_instance._try_number
- return task_instance._try_number + 1
-
-
-def _get_private_try_number(*, task_instance: TaskInstance | TaskInstancePydantic):
- """
- Opposite of _get_try_number.
-
- Given the value returned by try_number, return the value of _try_number that
- should produce the same result.
- This is needed for setting _try_number on TaskInstance from the value on PydanticTaskInstance, which has no private attrs.
-
- :param task_instance: the task instance
-
- :meta private:
- """
- if task_instance.state == TaskInstanceState.RUNNING:
- return task_instance.try_number
- return task_instance.try_number - 1
-
-
-def _set_try_number(*, task_instance: TaskInstance | TaskInstancePydantic, value: int) -> None:
- """
- Set a task try number.
-
- :param task_instance: the task instance
- :param value: the try number
-
- :meta private:
- """
- task_instance._try_number = value # type: ignore[union-attr]
-
-
def _refresh_from_task(
*, task_instance: TaskInstance | TaskInstancePydantic, task: Operator, pool_override: str | None = None
) -> None:
@@ -1164,13 +1117,10 @@
'Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>'
)
- # This function is called after changing the state from RUNNING,
- # so we need to subtract 1 from self.try_number here.
- current_try_number = task_instance.try_number - 1
additional_context: dict[str, Any] = {
"exception": exception,
"exception_html": exception_html,
- "try_number": current_try_number,
+ "try_number": task_instance.try_number,
"max_tries": task_instance.max_tries,
}
@@ -1343,7 +1293,7 @@
end_date = Column(UtcDateTime)
duration = Column(Float)
state = Column(String(20))
- _try_number = Column("try_number", Integer, default=0)
+ try_number = Column(Integer, default=0)
max_tries = Column(Integer, server_default=text("-1"))
hostname = Column(String(1000))
unixname = Column(String(1000))
@@ -1509,6 +1459,26 @@
return hash((self.task_id, self.dag_id, self.run_id, self.map_index))
@property
+ @deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning)
+ def _try_number(self):
+ """
+ Do not use. For semblance of backcompat.
+
+ :meta private:
+ """
+ return self.try_number
+
+ @_try_number.setter
+ @deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning)
+ def _try_number(self, val):
+ """
+ Do not use. For semblance of backcompat.
+
+ :meta private:
+ """
+ self.try_number = val
+
+ @property
def stats_tags(self) -> dict[str, str]:
"""Returns task instance tags."""
return _stats_tags(task_instance=self)
@@ -1527,7 +1497,7 @@
"dag_id": task.dag_id,
"task_id": task.task_id,
"run_id": run_id,
- "_try_number": 0,
+ "try_number": 0,
"hostname": "",
"unixname": getuser(),
"queue": task.queue,
@@ -1549,53 +1519,22 @@
"""Initialize the attributes that aren't stored in the DB."""
self.test_mode = False # can be changed when calling 'run'
- @hybrid_property
- def try_number(self):
+ @property
+ @deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning)
+ def prev_attempted_tries(self) -> int:
"""
- Return the try number that a task number will be when it is actually run.
+ Calculate the total number of attempted tries, defaulting to 0.
- If the TaskInstance is currently running, this will match the column in the
- database, in all other cases this will be incremented.
-
- This is designed so that task logs end up in the right file.
- """
- return _get_try_number(task_instance=self)
-
- @try_number.expression
- def try_number(cls):
- """
- Return the expression to be used by SQLAlchemy when filtering on try_number.
-
- This is required because the override in the get_try_number function causes
- try_number values to be off by one when listing tasks in the UI.
+ This used to be necessary because try_number did not always tell the truth.
:meta private:
"""
- return cls._try_number
-
- @try_number.setter
- def try_number(self, value: int) -> None:
- """
- Set a task try number.
-
- :param value: the try number
- """
- _set_try_number(task_instance=self, value=value)
-
- @property
- def prev_attempted_tries(self) -> int:
- """
- Calculate the number of previously attempted tries, defaulting to 0.
-
- Expose this for the Task Tries and Gantt graph views.
- Using `try_number` throws off the counts for non-running tasks.
- Also useful in error logging contexts to get the try number for the last try that was attempted.
- """
- return self._try_number
+ return self.try_number
@property
def next_try_number(self) -> int:
- return self._try_number + 1
+ # todo (dstandish): deprecate this property; we don't need a property that is just + 1
+ return self.try_number + 1
@property
def operator_name(self) -> str | None:
@@ -2178,7 +2117,9 @@
# If the min_backoff calculation is below 1, it will be converted to 0 via int. Thus,
# we must round up prior to converting to an int, otherwise a divide by zero error
# will occur in the modded_hash calculation.
- min_backoff = math.ceil(delay.total_seconds() * (2 ** (self.try_number - 2)))
+ # this probably gives unexpected results if a task instance has previously been cleared,
+ # because try_number can increase without bound
+ min_backoff = math.ceil(delay.total_seconds() * (2 ** (self.try_number - 1)))
# In the case when delay.total_seconds() is 0, min_backoff will not be rounded up to 1.
# To address this, we impose a lower bound of 1 on min_backoff. This effectively makes
@@ -2372,7 +2313,6 @@
cls.logger().info("Resuming after deferral")
else:
cls.logger().info("Starting attempt %s of %s", ti.try_number, ti.max_tries + 1)
- ti._try_number += 1
if not test_mode:
session.add(Log(TaskInstanceState.RUNNING.value, ti))
@@ -2791,9 +2731,6 @@
self.next_method = defer.method_name
self.next_kwargs = defer.kwargs or {}
- # Decrement try number so the next one is the same try
- self._try_number -= 1
-
# Calculate timeout too if it was passed
if defer.timeout is not None:
self.trigger_timeout = timezone.utcnow() + defer.timeout
@@ -2910,7 +2847,7 @@
self.task_id,
self.dag_id,
self.run_id,
- self._try_number,
+ self.try_number,
actual_start_date,
self.end_date,
reschedule_exception.reschedule_date,
@@ -2921,10 +2858,6 @@
# set state
self.state = TaskInstanceState.UP_FOR_RESCHEDULE
- # Decrement try_number so subsequent runs will use the same try number and write
- # to same log file.
- self._try_number -= 1
-
self.clear_next_method_args()
session.merge(self)
@@ -3040,7 +2973,6 @@
# e.g. we could make refresh_from_db return a TI and replace ti with that
raise RuntimeError("Expected TaskInstance here. Further AIP-44 work required.")
# We increase the try_number to fail the task if it fails to start after sometime
- ti._try_number += 1
ti.state = State.UP_FOR_RETRY
email_for_state = operator.attrgetter("email_on_retry")
callbacks = task.on_retry_callback if task else None
diff --git a/airflow/models/taskinstancekey.py b/airflow/models/taskinstancekey.py
index 50906e4..b705ecb 100644
--- a/airflow/models/taskinstancekey.py
+++ b/airflow/models/taskinstancekey.py
@@ -37,6 +37,7 @@
@property
def reduced(self) -> TaskInstanceKey:
"""Remake the key by subtracting 1 from try number to match in memory information."""
+ # todo (dstandish): remove this property
return TaskInstanceKey(
self.dag_id, self.task_id, self.run_id, max(1, self.try_number - 1), self.map_index
)
diff --git a/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py b/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
index c5e7e3d..ca7047b 100644
--- a/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
+++ b/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
@@ -527,7 +527,7 @@
ti.queue,
ti.command_as_list(),
ti.executor_config,
- ti.prev_attempted_tries,
+ ti.try_number,
)
adopted_tis.append(ti)
diff --git a/airflow/providers/dbt/cloud/CHANGELOG.rst b/airflow/providers/dbt/cloud/CHANGELOG.rst
index cae2a00..bd5e64d 100644
--- a/airflow/providers/dbt/cloud/CHANGELOG.rst
+++ b/airflow/providers/dbt/cloud/CHANGELOG.rst
@@ -28,6 +28,11 @@
Changelog
---------
+main
+.....
+
+In Airflow 2.10.0, we fix the way try_number works, so that it no longer returns different values depending on task instance state. Importantly, after the task is done, it no longer shows current_try + 1. Thus in 3.8.1 we patch this provider to fix try_number references so they no longer adjust for the old, bad behavior.
+
3.8.0
.....
diff --git a/airflow/providers/dbt/cloud/utils/openlineage.py b/airflow/providers/dbt/cloud/utils/openlineage.py
index 5e4550b..ad50552 100644
--- a/airflow/providers/dbt/cloud/utils/openlineage.py
+++ b/airflow/providers/dbt/cloud/utils/openlineage.py
@@ -21,6 +21,8 @@
from contextlib import suppress
from typing import TYPE_CHECKING
+from airflow import __version__ as airflow_version
+
if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
@@ -28,6 +30,16 @@
from airflow.providers.openlineage.extractors.base import OperatorLineage
+def _get_try_number(val):
+ # todo: remove when min airflow version >= 2.10.0
+ from packaging.version import parse
+
+ if parse(parse(airflow_version).base_version) < parse("2.10.0"):
+ return val.try_number - 1
+ else:
+ return val.try_number
+
+
def generate_openlineage_events_from_dbt_cloud_run(
operator: DbtCloudRunJobOperator | DbtCloudJobRunSensor, task_instance: TaskInstance
) -> OperatorLineage:
@@ -131,7 +143,7 @@
dag_id=task_instance.dag_id,
task_id=operator.task_id,
execution_date=task_instance.execution_date,
- try_number=task_instance.try_number - 1,
+ try_number=_get_try_number(task_instance),
)
parent_job = ParentRunMetadata(
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index cfc7b49..82cc887 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -108,7 +108,7 @@
.one_or_none()
)
if isinstance(val, TaskInstance):
- val._try_number = ti.try_number
+ val.try_number = ti.try_number
return val
else:
raise AirflowException(f"Could not find TaskInstance for {ti}")
diff --git a/airflow/providers/openlineage/CHANGELOG.rst b/airflow/providers/openlineage/CHANGELOG.rst
index 4e1f6ff..882c121 100644
--- a/airflow/providers/openlineage/CHANGELOG.rst
+++ b/airflow/providers/openlineage/CHANGELOG.rst
@@ -26,6 +26,11 @@
Changelog
---------
+main
+.....
+
+In Airflow 2.10.0, we fix the way try_number works, so that it no longer returns different values depending on task instance state. Importantly, after the task is done, it no longer shows current_try + 1. Thus in 1.7.2 we patch this provider to fix try_number references so they no longer adjust for the old, bad behavior.
+
1.7.1
.....
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 25ded6d..03c6005 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -23,6 +23,7 @@
from openlineage.client.serde import Serde
+from airflow import __version__ as airflow_version
from airflow.listeners import hookimpl
from airflow.providers.openlineage.extractors import ExtractorManager
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState
@@ -45,6 +46,16 @@
_openlineage_listener: OpenLineageListener | None = None
+def _get_try_number_success(val):
+ # todo: remove when min airflow version >= 2.10.0
+ from packaging.version import parse
+
+ if parse(parse(airflow_version).base_version) < parse("2.10.0"):
+ return val.try_number - 1
+ else:
+ return val.try_number
+
+
class OpenLineageListener:
"""OpenLineage listener sends events on task instance and dag run starts, completes and failures."""
@@ -165,7 +176,7 @@
dag_id=dag.dag_id,
task_id=task.task_id,
execution_date=task_instance.execution_date,
- try_number=task_instance.try_number - 1,
+ try_number=_get_try_number_success(task_instance),
)
event_type = RunState.COMPLETE.value.lower()
operator_name = task.task_type.lower()
diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py
index 8e13279..5f49d00 100644
--- a/airflow/sensors/base.py
+++ b/airflow/sensors/base.py
@@ -241,11 +241,11 @@
started_at: datetime.datetime | float
if self.reschedule:
- # If reschedule, use the start date of the first try (first try can be either the very
- # first execution of the task, or the first execution after the task was cleared.)
ti = context["ti"]
max_tries: int = ti.max_tries or 0
retries: int = self.retries or 0
+ # If reschedule, use the start date of the first try (first try can be either the very
+ # first execution of the task, or the first execution after the task was cleared.)
first_try_number = max_tries - retries + 1
start_date = _orig_start_date(
dag_id=ti.dag_id,
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 2a1dfd2..043f59d 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -145,8 +145,7 @@
Will raise exception if no TI is found in the database.
"""
- from airflow.models.taskinstance import TaskInstance, _get_private_try_number
- from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
+ from airflow.models.taskinstance import TaskInstance
if isinstance(ti, TaskInstance):
return ti
@@ -162,10 +161,7 @@
)
if not val:
raise AirflowException(f"Could not find TaskInstance for {ti}")
- if isinstance(ti, TaskInstancePydantic):
- val.try_number = _get_private_try_number(task_instance=ti)
- else: # TaskInstanceKey
- val.try_number = ti.try_number
+ val.try_number = ti.try_number
return val
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 513b453..4235ab5 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -147,7 +147,7 @@
"start_date": group_start_date,
"end_date": group_end_date,
"mapped_states": mapped_states,
- "try_number": get_try_count(parent_instance._try_number, parent_instance.state),
+ "try_number": get_try_count(parent_instance.try_number, parent_instance.state),
"execution_date": parent_instance.execution_date,
}
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 682a70c..131b25f 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -314,7 +314,7 @@
TaskInstance.task_id,
TaskInstance.run_id,
TaskInstance.state,
- TaskInstance._try_number,
+ TaskInstance.try_number,
func.min(TaskInstanceNote.content).label("note"),
func.count(func.coalesce(TaskInstance.state, sqla.literal("no_status"))).label("state_count"),
func.min(TaskInstance.queued_dttm).label("queued_dttm"),
@@ -326,7 +326,7 @@
TaskInstance.dag_id == dag.dag_id,
TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]),
)
- .group_by(TaskInstance.task_id, TaskInstance.run_id, TaskInstance.state, TaskInstance._try_number)
+ .group_by(TaskInstance.task_id, TaskInstance.run_id, TaskInstance.state, TaskInstance.try_number)
.order_by(TaskInstance.task_id, TaskInstance.run_id)
)
@@ -409,7 +409,7 @@
"queued_dttm": task_instance.queued_dttm,
"start_date": task_instance.start_date,
"end_date": task_instance.end_date,
- "try_number": wwwutils.get_try_count(task_instance._try_number, task_instance.state),
+ "try_number": wwwutils.get_try_count(task_instance.try_number, task_instance.state),
"note": task_instance.note,
}
for task_instance in grouped_tis[item.task_id]
@@ -1687,7 +1687,7 @@
num_logs = 0
if ti is not None:
- num_logs = wwwutils.get_try_count(ti._try_number, ti.state)
+ num_logs = wwwutils.get_try_count(ti.try_number, ti.state)
logs = [""] * num_logs
root = request.args.get("root", "")
return self.render_template(
@@ -1788,7 +1788,7 @@
warnings.simplefilter("ignore", RemovedInAirflow3Warning)
all_ti_attrs = (
# fetching the value of _try_number to be shown under name try_number in UI
- (name, getattr(ti, "_try_number" if name == "try_number" else name))
+ (name, getattr(ti, name))
for name in dir(ti)
if not name.startswith("_") and name not in ti_attrs_to_skip
)
@@ -5196,7 +5196,7 @@
"pool",
"queued_by_job_id",
]
-
+ # todo: don't use prev_attempted_tries; just use try_number
label_columns = {"dag_run.execution_date": "Logical Date", "prev_attempted_tries": "Try Number"}
search_columns = [
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 775217e..2d10cda 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -62,6 +62,7 @@
task_instance = TaskInstance(task=task)
task_instance.dag_run = dag_run
task_instance.dag_id = dag.dag_id
+ task_instance.try_number = 1
task_instance.xcom_push = mock.Mock() # type: ignore
return Context(
dag=dag,
diff --git a/newsfragments/39336.significant.rst b/newsfragments/39336.significant.rst
new file mode 100644
index 0000000..750a180
--- /dev/null
+++ b/newsfragments/39336.significant.rst
@@ -0,0 +1,7 @@
+``try_number`` is no longer incremented during task execution
+
+Previously, the try number (``try_number``) was incremented at the beginning of task execution on the worker. This was problematic for many reasons. For one it meant that the try number was incremented when it was not supposed to, namely when resuming from reschedule or deferral. And it also resulted in the try number being "wrong" when the task had not yet started. The workarounds for these two issues caused a lot of confusion.
+
+Now, instead, the try number for a task run is determined at the time the task is scheduled, and does not change in flight, and it is never decremented. So after the task runs, the observed try number remains the same as it was when the task was running; only when there is a "new try" will the try number be incremented again.
+
+One consequence of this change is, if users were "manually" running tasks (e.g. by calling ``ti.run()`` directly, or command line ``airflow tasks run``), try number will no longer be incremented. Airflow assumes that tasks are always run after being scheduled by the scheduler, so we do not regard this as a breaking change.
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index e63f2c3..1e7c29c 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -46,7 +46,7 @@
from airflow.operators.bash import BashOperator
from airflow.utils import timezone
from airflow.utils.session import create_session
-from airflow.utils.state import State
+from airflow.utils.state import State, TaskInstanceState
from airflow.utils.types import DagRunType
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_pools, clear_db_runs
@@ -651,6 +651,12 @@
task_command.task_clear(args)
+def _set_state_and_try_num(ti, session):
+ ti.state = TaskInstanceState.QUEUED
+ ti.try_number += 1
+ session.commit()
+
+
class TestLogsfromTaskRunCommand:
def setup_method(self) -> None:
self.dag_id = "test_logging_dag"
@@ -668,7 +674,7 @@
dag = DagBag().get_dag(self.dag_id)
data_interval = dag.timetable.infer_manual_data_interval(run_after=self.execution_date)
- dag.create_dagrun(
+ self.dr = dag.create_dagrun(
run_id=self.run_id,
execution_date=self.execution_date,
data_interval=data_interval,
@@ -676,6 +682,9 @@
state=State.RUNNING,
run_type=DagRunType.MANUAL,
)
+ self.tis = self.dr.get_task_instances()
+ assert len(self.tis) == 1
+ self.ti = self.tis[0]
root = self.root_logger = logging.getLogger()
self.root_handlers = root.handlers.copy()
@@ -757,7 +766,7 @@
@pytest.mark.parametrize(
"is_k8s, is_container_exec", [("true", "true"), ("true", ""), ("", "true"), ("", "")]
)
- def test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s, is_container_exec):
+ def test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s, is_container_exec, session):
"""
When running task --local as k8s executor pod, all logging should make it to stdout.
Otherwise, all logging after "running TI" is redirected to logs (and the actual log
@@ -770,6 +779,9 @@
"""
import subprocess
+ ti = self.dr.get_task_instances(session=session)[0]
+ _set_state_and_try_num(ti, session) # so that try_number is correct
+
with mock.patch.dict(
"os.environ",
AIRFLOW_IS_K8S_EXECUTOR_POD=is_k8s,
@@ -807,7 +819,9 @@
assert len(lines) == 1
@pytest.mark.skipif(not hasattr(os, "fork"), reason="Forking not available")
- def test_logging_with_run_task(self):
+ def test_logging_with_run_task(self, session):
+ ti = self.dr.get_task_instances(session=session)[0]
+ _set_state_and_try_num(ti, session)
with conf_vars({("core", "dags_folder"): self.dag_path}):
task_command.task_run(self.parser.parse_args(self.task_args))
@@ -852,7 +866,10 @@
session.commit()
@mock.patch("airflow.task.task_runner.standard_task_runner.CAN_FORK", False)
- def test_logging_with_run_task_subprocess(self):
+ def test_logging_with_run_task_subprocess(self, session):
+ ti = self.dr.get_task_instances(session=session)[0]
+ _set_state_and_try_num(ti, session)
+
with conf_vars({("core", "dags_folder"): self.dag_path}):
task_command.task_run(self.parser.parse_args(self.task_args))
@@ -874,14 +891,14 @@
f"task_id={self.task_id}, run_id={self.run_id}, execution_date=20170101T000000" in logs
)
- def test_log_file_template_with_run_task(self):
+ def test_log_file_template_with_run_task(self, session):
"""Verify that the taskinstance has the right context for log_filename_template"""
with conf_vars({("core", "dags_folder"): self.dag_path}):
# increment the try_number of the task to be run
with create_session() as session:
ti = session.query(TaskInstance).filter_by(run_id=self.run_id).first()
- ti.try_number = 1
+ ti.try_number = 2
log_file_path = os.path.join(os.path.dirname(self.ti_log_file_path), "attempt=2.log")
diff --git a/tests/core/test_sentry.py b/tests/core/test_sentry.py
index 10c8867..de78912 100644
--- a/tests/core/test_sentry.py
+++ b/tests/core/test_sentry.py
@@ -38,7 +38,7 @@
DAG_ID = "test_dag"
TASK_ID = "test_task"
OPERATOR = "PythonOperator"
-TRY_NUMBER = 1
+TRY_NUMBER = 0
STATE = State.SUCCESS
TEST_SCOPE = {
"dag_id": DAG_ID,
@@ -149,7 +149,7 @@
sentry.add_tagging(task_instance=task_instance)
with configure_scope() as scope:
for key, value in scope._tags.items():
- assert TEST_SCOPE[key] == value
+ assert value == TEST_SCOPE[key]
@pytest.mark.db_test
@time_machine.travel(CRUMB_DATE)
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 108ee97..5b7bc36 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -202,7 +202,6 @@
)
run_job(job=job, execute_callable=job_runner._execute)
-
expected_execution_order = [
("runme_0", DEFAULT_DATE),
("runme_1", DEFAULT_DATE),
@@ -217,11 +216,15 @@
("run_this_last", DEFAULT_DATE),
("run_this_last", end_date),
]
- assert [
- ((dag.dag_id, task_id, f"backfill__{when.isoformat()}", 1, -1), (State.SUCCESS, None))
+ actual = [(tuple(x), y) for x, y in executor.sorted_tasks]
+ expected = [
+ (
+ (dag.dag_id, task_id, f"backfill__{when.isoformat()}", 1, -1),
+ (State.SUCCESS, None),
+ )
for (task_id, when) in expected_execution_order
- ] == executor.sorted_tasks
-
+ ]
+ assert actual == expected
session = settings.Session()
drs = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).order_by(DagRun.execution_date).all()
@@ -907,10 +910,10 @@
dr = dag_maker.create_dagrun(state=None)
executor = MockExecutor(parallelism=16)
- executor.mock_task_results[TaskInstanceKey(dag.dag_id, task1.task_id, dr.run_id, try_number=1)] = (
+ executor.mock_task_results[TaskInstanceKey(dag.dag_id, task1.task_id, dr.run_id, try_number=0)] = (
State.UP_FOR_RETRY
)
- executor.mock_task_fail(dag.dag_id, task1.task_id, dr.run_id, try_number=2)
+ executor.mock_task_fail(dag.dag_id, task1.task_id, dr.run_id, try_number=1)
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
@@ -952,10 +955,14 @@
runid1 = f"backfill__{(DEFAULT_DATE + datetime.timedelta(days=1)).isoformat()}"
runid2 = f"backfill__{(DEFAULT_DATE + datetime.timedelta(days=2)).isoformat()}"
- # test executor history keeps a list
- history = executor.history
-
- assert [sorted(item[-1].key[1:3] for item in batch) for batch in history] == [
+ actual = []
+ for batch in executor.history:
+ this_batch = []
+ for cmd, idx, queue, ti in batch: # noqa: B007
+ key = ti.key
+ this_batch.append((key.task_id, key.run_id))
+ actual.append(sorted(this_batch))
+ assert actual == [
[
("leave1", runid0),
("leave1", runid1),
@@ -964,9 +971,21 @@
("leave2", runid1),
("leave2", runid2),
],
- [("upstream_level_1", runid0), ("upstream_level_1", runid1), ("upstream_level_1", runid2)],
- [("upstream_level_2", runid0), ("upstream_level_2", runid1), ("upstream_level_2", runid2)],
- [("upstream_level_3", runid0), ("upstream_level_3", runid1), ("upstream_level_3", runid2)],
+ [
+ ("upstream_level_1", runid0),
+ ("upstream_level_1", runid1),
+ ("upstream_level_1", runid2),
+ ],
+ [
+ ("upstream_level_2", runid0),
+ ("upstream_level_2", runid1),
+ ("upstream_level_2", runid2),
+ ],
+ [
+ ("upstream_level_3", runid0),
+ ("upstream_level_3", runid1),
+ ("upstream_level_3", runid2),
+ ],
]
def test_backfill_pooled_tasks(self):
@@ -1525,7 +1544,7 @@
# match what's in the in-memory ti_status.running map. This is the same
# for skipped, failed and retry states.
ti_status.running[ti.key] = ti # Task is queued and marked as running
- ti._try_number += 1 # Try number is increased during ti.run()
+ ti.try_number += 1
ti.set_state(State.SUCCESS, session) # Task finishes with success state
job_runner._update_counters(ti_status=ti_status, session=session) # Update counters
assert len(ti_status.running) == 0
@@ -1538,7 +1557,7 @@
# Test for success when DB try_number is off from in-memory expectations
ti_status.running[ti.key] = ti
- ti._try_number += 2
+ ti.try_number += 2
ti.set_state(State.SUCCESS, session)
job_runner._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 0
@@ -1551,7 +1570,7 @@
# Test for skipped
ti_status.running[ti.key] = ti
- ti._try_number += 1
+ ti.try_number += 1
ti.set_state(State.SKIPPED, session)
job_runner._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 0
@@ -1564,7 +1583,7 @@
# Test for failed
ti_status.running[ti.key] = ti
- ti._try_number += 1
+ ti.try_number += 1
ti.set_state(State.FAILED, session)
job_runner._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 0
@@ -1577,7 +1596,7 @@
# Test for retry
ti_status.running[ti.key] = ti
- ti._try_number += 1
+ ti.try_number += 1
ti.set_state(State.UP_FOR_RETRY, session)
job_runner._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 0
@@ -1595,9 +1614,6 @@
# and DB representation of the task try_number the _same_, which is unlike
# the above cases. But this is okay because the in-memory key is used.
ti_status.running[ti.key] = ti # Task queued and marked as running
- # Note: Both the increase and decrease are kept here for context
- ti._try_number += 1 # Try number is increased during ti.run()
- ti._try_number -= 1 # Task is being rescheduled, decrement try_number
ti.set_state(State.UP_FOR_RESCHEDULE, session) # Task finishes with reschedule state
job_runner._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 0
@@ -1610,10 +1626,6 @@
# test for none
ti.set_state(State.NONE, session)
- # Setting ti._try_number = 0 brings us to ti.try_number==1
- # so that the in-memory key access will work fine
- ti._try_number = 0
- assert ti.try_number == 1 # see ti.try_number property in taskinstance module
session.merge(ti)
session.commit()
ti_status.running[ti.key] = ti
@@ -1955,20 +1967,20 @@
)
assert ti_status.failed == set()
assert ti_status.succeeded == {
- TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=1, map_index=0),
- TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=1, map_index=1),
- TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=1, map_index=2),
+ TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=0, map_index=0),
+ TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=0, map_index=1),
+ TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=0, map_index=2),
TaskInstanceKey(
- dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=1, map_index=0
+ dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=0, map_index=0
),
TaskInstanceKey(
- dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=1, map_index=1
+ dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=0, map_index=1
),
TaskInstanceKey(
- dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=1, map_index=2
+ dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=0, map_index=2
),
TaskInstanceKey(
- dag_id=dr.dag_id, task_id="make_arg_lists", run_id="test", try_number=1, map_index=-1
+ dag_id=dr.dag_id, task_id="make_arg_lists", run_id="test", try_number=0, map_index=-1
),
}
@@ -2096,7 +2108,7 @@
run_job(job=job, execute_callable=job_runner._execute)
ti = dag_run.get_task_instance(task_id=task1.task_id)
- assert ti._try_number == try_number
+ assert ti.try_number == try_number
dag_run.refresh_from_db()
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index f122f12..491e345 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3180,7 +3180,7 @@
# executing task.
run_with_error(ti, ignore_ti_state=True)
assert ti.state == State.UP_FOR_RETRY
- assert ti.try_number == 2
+ assert ti.try_number == 1
with create_session() as session:
ti.refresh_from_db(lock_for_update=True, session=session)
@@ -3191,6 +3191,7 @@
executor.do_update = True
do_schedule()
ti.refresh_from_db()
+ assert ti.try_number == 1
assert ti.state == State.SUCCESS
def test_retry_handling_job(self):
@@ -3214,8 +3215,6 @@
.filter(TaskInstance.dag_id == dag.dag_id, TaskInstance.task_id == dag_task1.task_id)
.first()
)
- # make sure the counter has increased
- assert ti.try_number == 2
assert ti.state == State.UP_FOR_RETRY
def test_dag_get_active_runs(self, dag_maker):
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index 9279c97..df96998 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -152,7 +152,7 @@
# give it more time for the trigger event to write the log.
time.sleep(0.5)
- assert "test_dag/test_run/sensitive_arg_task/-1/1 (ID 1) starting" in caplog.text
+ assert "test_dag/test_run/sensitive_arg_task/-1/0 (ID 1) starting" in caplog.text
assert "some_password" not in caplog.text
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index bce9dc4..26e3dcc 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -25,7 +25,7 @@
from airflow import settings
from airflow.models.dag import DAG
from airflow.models.serialized_dag import SerializedDagModel
-from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances
+from airflow.models.taskinstance import TaskInstance, TaskInstance as TI, clear_task_instances
from airflow.models.taskreschedule import TaskReschedule
from airflow.operators.empty import EmptyOperator
from airflow.sensors.python import PythonSensor
@@ -68,6 +68,13 @@
ti1.run()
with create_session() as session:
+ # do the incrementing of try_number ordinarily handled by scheduler
+ ti0.try_number += 1
+ ti1.try_number += 1
+ session.merge(ti0)
+ session.merge(ti1)
+ session.commit()
+
# we use order_by(task_id) here because for the test DAG structure of ours
# this is equivalent to topological sort. It would not work in general case
# but it works for our case because we specifically constructed test DAGS
@@ -79,10 +86,10 @@
ti1.refresh_from_db()
# Next try to run will be try 2
assert ti0.state is None
- assert ti0.try_number == 2
+ assert ti0.try_number == 1
assert ti0.max_tries == 1
assert ti1.state is None
- assert ti1.try_number == 2
+ assert ti1.try_number == 1
assert ti1.max_tries == 3
def test_clear_task_instances_external_executor_id(self, dag_maker):
@@ -279,6 +286,14 @@
ti0.refresh_from_task(task0)
ti1.refresh_from_task(task1)
+ with create_session() as session:
+ # do the incrementing of try_number ordinarily handled by scheduler
+ ti0.try_number += 1
+ ti1.try_number += 1
+ session.merge(ti0)
+ session.merge(ti1)
+ session.commit()
+
ti0.run()
ti1.run()
@@ -298,10 +313,9 @@
# When no task is found, max_tries will be maximum of original max_tries or try_number.
ti0.refresh_from_db()
ti1.refresh_from_db()
- # Next try to run will be try 2
- assert ti0.try_number == 2
+ assert ti0.try_number == 1
assert ti0.max_tries == 1
- assert ti1.try_number == 2
+ assert ti1.try_number == 1
assert ti1.max_tries == 2
def test_clear_task_instances_without_dag(self, dag_maker):
@@ -323,6 +337,14 @@
ti0.refresh_from_task(task0)
ti1.refresh_from_task(task1)
+ with create_session() as session:
+ # do the incrementing of try_number ordinarily handled by scheduler
+ ti0.try_number += 1
+ ti1.try_number += 1
+ session.merge(ti0)
+ session.merge(ti1)
+ session.commit()
+
ti0.run()
ti1.run()
@@ -337,10 +359,9 @@
# When no DAG is found, max_tries will be maximum of original max_tries or try_number.
ti0.refresh_from_db()
ti1.refresh_from_db()
- # Next try to run will be try 2
- assert ti0.try_number == 2
+ assert ti0.try_number == 1
assert ti0.max_tries == 1
- assert ti1.try_number == 2
+ assert ti1.try_number == 1
assert ti1.max_tries == 2
def test_clear_task_instances_without_dag_param(self, dag_maker, session):
@@ -365,6 +386,14 @@
ti0.refresh_from_task(task0)
ti1.refresh_from_task(task1)
+ with create_session() as session:
+ # do the incrementing of try_number ordinarily handled by scheduler
+ ti0.try_number += 1
+ ti1.try_number += 1
+ session.merge(ti0)
+ session.merge(ti1)
+ session.commit()
+
ti0.run(session=session)
ti1.run(session=session)
@@ -377,10 +406,9 @@
ti0.refresh_from_db(session=session)
ti1.refresh_from_db(session=session)
- # Next try to run will be try 2
- assert ti0.try_number == 2
+ assert ti0.try_number == 1
assert ti0.max_tries == 1
- assert ti1.try_number == 2
+ assert ti1.try_number == 1
assert ti1.max_tries == 3
def test_clear_task_instances_in_multiple_dags(self, dag_maker, session):
@@ -418,6 +446,14 @@
ti0.refresh_from_task(task0)
ti1.refresh_from_task(task1)
+ with create_session() as session:
+ # do the incrementing of try_number ordinarily handled by scheduler
+ ti0.try_number += 1
+ ti1.try_number += 1
+ session.merge(ti0)
+ session.merge(ti1)
+ session.commit()
+
ti0.run(session=session)
ti1.run(session=session)
@@ -426,10 +462,9 @@
ti0.refresh_from_db(session=session)
ti1.refresh_from_db(session=session)
- # Next try to run will be try 2
- assert ti0.try_number == 2
+ assert ti0.try_number == 1
assert ti0.max_tries == 1
- assert ti1.try_number == 2
+ assert ti1.try_number == 1
assert ti1.max_tries == 3
def test_clear_task_instances_with_task_reschedule(self, dag_maker):
@@ -451,6 +486,15 @@
ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
ti0.refresh_from_task(task0)
ti1.refresh_from_task(task1)
+
+ with create_session() as session:
+ # do the incrementing of try_number ordinarily handled by scheduler
+ ti0.try_number += 1
+ ti1.try_number += 1
+ session.merge(ti0)
+ session.merge(ti1)
+ session.commit()
+
ti0.run()
ti1.run()
@@ -500,37 +544,35 @@
ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
ti0.refresh_from_task(task0)
ti1.refresh_from_task(task1)
-
+ session.get(TaskInstance, ti0.key.primary).try_number += 1
+ session.commit()
# Next try to run will be try 1
assert ti0.try_number == 1
ti0.run()
- assert ti0.try_number == 2
+ assert ti0.try_number == 1
dag.clear()
ti0.refresh_from_db()
- assert ti0.try_number == 2
+ assert ti0.try_number == 1
assert ti0.state == State.NONE
assert ti0.max_tries == 1
assert ti1.max_tries == 2
- ti1.try_number = 1
- session.merge(ti1)
+ session.get(TaskInstance, ti1.key.primary).try_number += 1
session.commit()
-
- # Next try will be 2
ti1.run()
- assert ti1.try_number == 3
+ assert ti1.try_number == 1
assert ti1.max_tries == 2
dag.clear()
ti0.refresh_from_db()
ti1.refresh_from_db()
- # after clear dag, ti2 should show attempt 3 of 5
- assert ti1.max_tries == 4
- assert ti1.try_number == 3
- # after clear dag, ti1 should show attempt 2 of 2
- assert ti0.try_number == 2
+    # after clearing the dag, we have 2 remaining tries
+ assert ti1.max_tries == 3
+ assert ti1.try_number == 1
+    # after clearing the dag, ti0 has no remaining tries
+ assert ti0.try_number == 1
assert ti0.max_tries == 1
def test_dags_clear(self):
@@ -559,9 +601,11 @@
# test clear all dags
for i in range(num_of_dags):
+ session.get(TaskInstance, tis[i].key.primary).try_number += 1
+ session.commit()
tis[i].run()
assert tis[i].state == State.SUCCESS
- assert tis[i].try_number == 2
+ assert tis[i].try_number == 1
assert tis[i].max_tries == 0
DAG.clear_dags(dags)
@@ -569,14 +613,16 @@
for i in range(num_of_dags):
tis[i].refresh_from_db()
assert tis[i].state == State.NONE
- assert tis[i].try_number == 2
+ assert tis[i].try_number == 1
assert tis[i].max_tries == 1
# test dry_run
for i in range(num_of_dags):
+ session.get(TaskInstance, tis[i].key.primary).try_number += 1
+ session.commit()
tis[i].run()
assert tis[i].state == State.SUCCESS
- assert tis[i].try_number == 3
+ assert tis[i].try_number == 2
assert tis[i].max_tries == 1
DAG.clear_dags(dags, dry_run=True)
@@ -584,7 +630,7 @@
for i in range(num_of_dags):
tis[i].refresh_from_db()
assert tis[i].state == State.SUCCESS
- assert tis[i].try_number == 3
+ assert tis[i].try_number == 2
assert tis[i].max_tries == 1
# test only_failed
@@ -599,14 +645,14 @@
ti.refresh_from_db()
if ti is failed_dag:
assert ti.state == State.NONE
- assert ti.try_number == 3
+ assert ti.try_number == 2
assert ti.max_tries == 2
else:
assert ti.state == State.SUCCESS
- assert ti.try_number == 3
+ assert ti.try_number == 2
assert ti.max_tries == 1
- def test_operator_clear(self, dag_maker):
+ def test_operator_clear(self, dag_maker, session):
with dag_maker(
"test_operator_clear",
start_date=DEFAULT_DATE,
@@ -625,18 +671,27 @@
ti1.task = op1
ti2.task = op2
+ session.get(TaskInstance, ti2.key.primary).try_number += 1
+ session.commit()
ti2.run()
# Dependency not met
assert ti2.try_number == 1
assert ti2.max_tries == 1
op2.clear(upstream=True)
+ # max tries will be set to retries + curr try number == 1 + 1 == 2
+ assert session.get(TaskInstance, ti2.key.primary).max_tries == 2
+
+ session.get(TaskInstance, ti1.key.primary).try_number += 1
+ session.commit()
ti1.run()
+ assert ti1.try_number == 1
+
+ session.get(TaskInstance, ti2.key.primary).try_number += 1
+ session.commit()
ti2.run(ignore_ti_state=True)
- assert ti1.try_number == 2
# max_tries is 0 because there is no task instance in db for ti1
# so clear won't change the max_tries.
assert ti1.max_tries == 0
assert ti2.try_number == 2
- # try_number (0) + retries(1)
- assert ti2.max_tries == 1
+    assert ti2.max_tries == 2  # max tries has not changed since it was updated when op2.clear was called
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index aa6be10..bb134d8 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3257,7 +3257,13 @@
assert not dag._check_schedule_interval_matches_timetable()
-@pytest.mark.parametrize("run_id, execution_date", [(None, datetime_tz(2020, 1, 1)), ("test-run-id", None)])
+@pytest.mark.parametrize(
+ "run_id, execution_date",
+ [
+ (None, datetime_tz(2020, 1, 1)),
+ ("test-run-id", None),
+ ],
+)
def test_set_task_instance_state(run_id, execution_date, session, dag_maker):
"""Test that set_task_instance_state updates the TaskInstance state and clear downstream failed"""
@@ -3321,7 +3327,9 @@
# dagrun should be set to QUEUED
assert dagrun.get_state() == State.QUEUED
- assert {t.key for t in altered} == {("test_set_task_instance_state", "task_1", dagrun.run_id, 1, -1)}
+ assert {tuple(t.key) for t in altered} == {
+ ("test_set_task_instance_state", "task_1", dagrun.run_id, 0, -1)
+ }
def test_set_task_instance_state_mapped(dag_maker, session):
@@ -3472,8 +3480,8 @@
assert dagrun.get_state() == State.QUEUED
assert {t.key for t in altered} == {
- ("test_set_task_group_state", "section_1.task_1", dagrun.run_id, 1, -1),
- ("test_set_task_group_state", "section_1.task_3", dagrun.run_id, 1, -1),
+ ("test_set_task_group_state", "section_1.task_1", dagrun.run_id, 0, -1),
+ ("test_set_task_group_state", "section_1.task_3", dagrun.run_id, 0, -1),
}
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 3ab3ccc..fecde3b 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -29,7 +29,7 @@
from traceback import format_exception
from typing import cast
from unittest import mock
-from unittest.mock import MagicMock, call, mock_open, patch
+from unittest.mock import call, mock_open, patch
from uuid import uuid4
import pendulum
@@ -66,8 +66,6 @@
TaskInstance,
TaskInstance as TI,
TaskInstanceNote,
- _get_private_try_number,
- _get_try_number,
_run_finished_callback,
)
from airflow.models.taskmap import TaskMap
@@ -623,23 +621,33 @@
ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
ti.task = task
- assert ti.try_number == 1
+ with create_session() as session:
+ session.get(TaskInstance, ti.key.primary).try_number += 1
+
# first run -- up for retry
run_with_error(ti)
assert ti.state == State.UP_FOR_RETRY
- assert ti.try_number == 2
+ assert ti.try_number == 1
+
+ with create_session() as session:
+ session.get(TaskInstance, ti.key.primary).try_number += 1
# second run -- still up for retry because retry_delay hasn't expired
time_machine.coordinates.shift(3)
run_with_error(ti)
assert ti.state == State.UP_FOR_RETRY
+ assert ti.try_number == 2
+
+ with create_session() as session:
+ session.get(TaskInstance, ti.key.primary).try_number += 1
# third run -- failed
time_machine.coordinates.shift(datetime.datetime.resolution)
run_with_error(ti)
assert ti.state == State.FAILED
+ assert ti.try_number == 3
- def test_retry_handling(self, dag_maker):
+ def test_retry_handling(self, dag_maker, session):
"""
Test that task retries are handled properly
"""
@@ -663,36 +671,44 @@
ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
ti.task = task
- assert ti.try_number == 1
+ assert ti.try_number == 0
+
+ session.get(TaskInstance, ti.key.primary).try_number += 1
+ session.commit()
# first run -- up for retry
run_with_error(ti)
assert ti.state == State.UP_FOR_RETRY
- assert ti._try_number == 1
- assert ti.try_number == 2
+ assert ti.try_number == 1
+
+ session.get(TaskInstance, ti.key.primary).try_number += 1
+ session.commit()
# second run -- fail
run_with_error(ti)
assert ti.state == State.FAILED
- assert ti._try_number == 2
- assert ti.try_number == 3
+ assert ti.try_number == 2
# Clear the TI state since you can't run a task with a FAILED state without
# clearing it first
dag.clear()
+ session.get(TaskInstance, ti.key.primary).try_number += 1
+ session.commit()
+
# third run -- up for retry
run_with_error(ti)
assert ti.state == State.UP_FOR_RETRY
- assert ti._try_number == 3
- assert ti.try_number == 4
+ assert ti.try_number == 3
+
+ session.get(TaskInstance, ti.key.primary).try_number += 1
+ session.commit()
# fourth run -- fail
run_with_error(ti)
ti.refresh_from_db()
assert ti.state == State.FAILED
- assert ti._try_number == 4
- assert ti.try_number == 5
+ assert ti.try_number == 4
assert RenderedTaskInstanceFields.get_templated_fields(ti) == expected_rendered_ti_fields
def test_next_retry_datetime(self, dag_maker):
@@ -783,8 +799,12 @@
ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
ti.task = task
- assert ti._try_number == 0
- assert ti.try_number == 1
+ assert ti.try_number == 0
+
+ date1 = timezone.utcnow()
+ date2 = date1 + datetime.timedelta(minutes=1)
+ date3 = date2 + datetime.timedelta(minutes=1)
+ date4 = date3 + datetime.timedelta(minutes=1)
def run_ti_and_assert(
run_date,
@@ -796,30 +816,29 @@
expected_task_reschedule_count,
):
with time_machine.travel(run_date, tick=False):
+ exc = None
try:
ti.run()
- except AirflowException:
+ except AirflowException as e:
+ exc = e
if not fail:
raise
+ if exc and not fail:
+ raise RuntimeError("expected to fail")
ti.refresh_from_db()
assert ti.state == expected_state
- assert ti._try_number == expected_try_number
- assert ti.try_number == expected_try_number + 1
+ assert ti.try_number == expected_try_number
assert ti.start_date == expected_start_date
assert ti.end_date == expected_end_date
assert ti.duration == expected_duration
assert len(task_reschedules_for_ti(ti)) == expected_task_reschedule_count
- date1 = timezone.utcnow()
- date2 = date1 + datetime.timedelta(minutes=1)
- date3 = date2 + datetime.timedelta(minutes=1)
- date4 = date3 + datetime.timedelta(minutes=1)
-
# Run with multiple reschedules.
# During reschedule the try number remains the same, but each reschedule is recorded.
# The start date is expected to remain the initial date, hence the duration increases.
- # When finished the try number is incremented and there is no reschedule expected
- # for this try.
+ # When there's a new try (task run following something other than a reschedule), then
+ # the scheduler will increment the try_number. We do that inline here since
+ # we're not using the scheduler.
done, fail = False, False
run_ti_and_assert(date1, date1, date1, 0, State.UP_FOR_RESCHEDULE, 0, 1)
@@ -831,29 +850,37 @@
run_ti_and_assert(date3, date1, date3, 120, State.UP_FOR_RESCHEDULE, 0, 3)
done, fail = True, False
- run_ti_and_assert(date4, date1, date4, 180, State.SUCCESS, 1, 0)
+ run_ti_and_assert(date4, date1, date4, 180, State.SUCCESS, 0, 3)
# Clear the task instance.
dag.clear()
ti.refresh_from_db()
assert ti.state == State.NONE
- assert ti._try_number == 1
+ assert ti.try_number == 0
- # Run again after clearing with reschedules and a retry.
- # The retry increments the try number, and for that try no reschedule is expected.
+ # We will run it again with reschedules and a retry.
+
+ # We increment the try number because that's what the scheduler would do
+ with create_session() as session:
+ session.get(TaskInstance, ti.key.primary).try_number += 1
+
# After the retry the start date is reset, hence the duration is also reset.
done, fail = False, False
run_ti_and_assert(date1, date1, date1, 0, State.UP_FOR_RESCHEDULE, 1, 1)
done, fail = False, True
- run_ti_and_assert(date2, date1, date2, 60, State.UP_FOR_RETRY, 2, 0)
+ run_ti_and_assert(date2, date1, date2, 60, State.UP_FOR_RETRY, 1, 1)
+
+ # scheduler would create a new try here
+ with create_session() as session:
+ session.get(TaskInstance, ti.key.primary).try_number += 1
done, fail = False, False
run_ti_and_assert(date3, date3, date3, 0, State.UP_FOR_RESCHEDULE, 2, 1)
done, fail = True, False
- run_ti_and_assert(date4, date3, date4, 60, State.SUCCESS, 3, 0)
+ run_ti_and_assert(date4, date3, date4, 60, State.SUCCESS, 2, 1)
def test_mapped_reschedule_handling(self, dag_maker, task_reschedules_for_ti):
"""
@@ -880,8 +907,7 @@
ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
ti.task = task
- assert ti._try_number == 0
- assert ti.try_number == 1
+ assert ti.try_number == 0
def run_ti_and_assert(
run_date,
@@ -901,8 +927,7 @@
raise
ti.refresh_from_db()
assert ti.state == expected_state
- assert ti._try_number == expected_try_number
- assert ti.try_number == expected_try_number + 1
+ assert ti.try_number == expected_try_number
assert ti.start_date == expected_start_date
assert ti.end_date == expected_end_date
assert ti.duration == expected_duration
@@ -929,29 +954,36 @@
run_ti_and_assert(date3, date1, date3, 120, State.UP_FOR_RESCHEDULE, 0, 3)
done, fail = True, False
- run_ti_and_assert(date4, date1, date4, 180, State.SUCCESS, 1, 0)
+ run_ti_and_assert(date4, date1, date4, 180, State.SUCCESS, 0, 3)
# Clear the task instance.
dag.clear()
ti.refresh_from_db()
assert ti.state == State.NONE
- assert ti._try_number == 1
+ assert ti.try_number == 0
# Run again after clearing with reschedules and a retry.
- # The retry increments the try number, and for that try no reschedule is expected.
+
+ # We increment the try number because that's what the scheduler would do
+ with create_session() as session:
+ session.get(TaskInstance, ti.key.primary).try_number += 1
+
# After the retry the start date is reset, hence the duration is also reset.
done, fail = False, False
run_ti_and_assert(date1, date1, date1, 0, State.UP_FOR_RESCHEDULE, 1, 1)
done, fail = False, True
- run_ti_and_assert(date2, date1, date2, 60, State.UP_FOR_RETRY, 2, 0)
+ run_ti_and_assert(date2, date1, date2, 60, State.UP_FOR_RETRY, 1, 1)
+
+ with create_session() as session:
+ session.get(TaskInstance, ti.key.primary).try_number += 1
done, fail = False, False
run_ti_and_assert(date3, date3, date3, 0, State.UP_FOR_RESCHEDULE, 2, 1)
done, fail = True, False
- run_ti_and_assert(date4, date3, date4, 60, State.SUCCESS, 3, 0)
+ run_ti_and_assert(date4, date3, date4, 60, State.SUCCESS, 2, 1)
@pytest.mark.usefixtures("test_pool")
def test_mapped_task_reschedule_handling_clear_reschedules(self, dag_maker, task_reschedules_for_ti):
@@ -978,8 +1010,7 @@
).expand(poke_interval=[0])
ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
ti.task = task
- assert ti._try_number == 0
- assert ti.try_number == 1
+ assert ti.try_number == 0
def run_ti_and_assert(
run_date,
@@ -999,8 +1030,7 @@
raise
ti.refresh_from_db()
assert ti.state == expected_state
- assert ti._try_number == expected_try_number
- assert ti.try_number == expected_try_number + 1
+ assert ti.try_number == expected_try_number
assert ti.start_date == expected_start_date
assert ti.end_date == expected_end_date
assert ti.duration == expected_duration
@@ -1015,7 +1045,7 @@
dag.clear()
ti.refresh_from_db()
assert ti.state == State.NONE
- assert ti._try_number == 0
+ assert ti.try_number == 0
# Check that reschedules for ti have also been cleared.
assert not task_reschedules_for_ti(ti)
@@ -1045,8 +1075,7 @@
)
ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
ti.task = task
- assert ti._try_number == 0
- assert ti.try_number == 1
+ assert ti.try_number == 0
def run_ti_and_assert(
run_date,
@@ -1065,8 +1094,7 @@
raise
ti.refresh_from_db()
assert ti.state == expected_state
- assert ti._try_number == expected_try_number
- assert ti.try_number == expected_try_number + 1
+ assert ti.try_number == expected_try_number
assert ti.start_date == expected_start_date
assert ti.end_date == expected_end_date
assert ti.duration == expected_duration
@@ -1081,7 +1109,7 @@
dag.clear()
ti.refresh_from_db()
assert ti.state == State.NONE
- assert ti._try_number == 0
+ assert ti.try_number == 0
# Check that reschedules for ti have also been cleared.
assert not task_reschedules_for_ti(ti)
@@ -1803,12 +1831,12 @@
serialized_dag = SerializedDagModel.get(ti.task.dag.dag_id).dag
ti_from_deserialized_task = TI(task=serialized_dag.get_task(ti.task_id), run_id=ti.run_id)
- assert ti_from_deserialized_task._try_number == 0
+ assert ti_from_deserialized_task.try_number == 0
assert ti_from_deserialized_task.check_and_change_state_before_execution()
# State should be running, and try_number column should be incremented
assert ti_from_deserialized_task.external_executor_id == expected_external_executor_id
assert ti_from_deserialized_task.state == State.RUNNING
- assert ti_from_deserialized_task._try_number == 1
+ assert ti_from_deserialized_task.try_number == 0
def test_check_and_change_state_before_execution_provided_id_overrides(self, create_task_instance):
expected_external_executor_id = "banana"
@@ -1822,14 +1850,14 @@
serialized_dag = SerializedDagModel.get(ti.task.dag.dag_id).dag
ti_from_deserialized_task = TI(task=serialized_dag.get_task(ti.task_id), run_id=ti.run_id)
- assert ti_from_deserialized_task._try_number == 0
+ assert ti_from_deserialized_task.try_number == 0
assert ti_from_deserialized_task.check_and_change_state_before_execution(
external_executor_id=expected_external_executor_id
)
# State should be running, and try_number column should be incremented
assert ti_from_deserialized_task.external_executor_id == expected_external_executor_id
assert ti_from_deserialized_task.state == State.RUNNING
- assert ti_from_deserialized_task._try_number == 1
+ assert ti_from_deserialized_task.try_number == 0
def test_check_and_change_state_before_execution_with_exec_id(self, create_task_instance):
expected_external_executor_id = "minions"
@@ -1840,14 +1868,14 @@
serialized_dag = SerializedDagModel.get(ti.task.dag.dag_id).dag
ti_from_deserialized_task = TI(task=serialized_dag.get_task(ti.task_id), run_id=ti.run_id)
- assert ti_from_deserialized_task._try_number == 0
+ assert ti_from_deserialized_task.try_number == 0
assert ti_from_deserialized_task.check_and_change_state_before_execution(
external_executor_id=expected_external_executor_id
)
- # State should be running, and try_number column should be incremented
+ # State should be running, and try_number column should be unchanged
assert ti_from_deserialized_task.external_executor_id == expected_external_executor_id
assert ti_from_deserialized_task.state == State.RUNNING
- assert ti_from_deserialized_task._try_number == 1
+ assert ti_from_deserialized_task.try_number == 0
def test_check_and_change_state_before_execution_dep_not_met(self, create_task_instance):
ti = create_task_instance(dag_id="test_check_and_change_state_before_execution")
@@ -1895,12 +1923,14 @@
Test the try_number accessor behaves in various running states
"""
ti = create_task_instance(dag_id="test_check_and_change_state_before_execution")
- assert 1 == ti.try_number
+ # TI starts at 0. It's only incremented by the scheduler.
+ assert ti.try_number == 0
ti.try_number = 2
+ assert ti.try_number == 2
ti.state = State.RUNNING
- assert 2 == ti.try_number
+ assert ti.try_number == 2 # unaffected by state
ti.state = State.SUCCESS
- assert 3 == ti.try_number
+ assert ti.try_number == 2 # unaffected by state
def test_get_num_running_task_instances(self, create_task_instance):
session = settings.Session()
@@ -2047,7 +2077,7 @@
assert email == "to"
assert "test_email_alert" in title
assert "test_email_alert" in body
- assert "Try 1" in body
+ assert "Try 0" in body
@conf_vars(
{
@@ -2130,7 +2160,7 @@
(email, title, body), _ = mock_send_email.call_args
assert email == "to"
assert title == f"Airflow alert: <TaskInstance: test_dag.{task_id} test map_index=0 [failed]>"
- assert body.startswith("Try 1")
+ assert body.startswith("Try 0") # try number only incremented by the scheduler
assert "test_email_alert" in body
tf = (
@@ -3071,7 +3101,7 @@
assert "task_instance" in context_arg_3
mock_on_retry_3.assert_not_called()
- def test_handle_failure_updates_queued_task_try_number(self, dag_maker):
+ def test_handle_failure_updates_queued_task_updates_state(self, dag_maker):
session = settings.Session()
with dag_maker():
task = EmptyOperator(task_id="mytask", retries=1)
@@ -3081,13 +3111,8 @@
session.merge(ti)
session.flush()
assert ti.state == State.QUEUED
- assert ti.try_number == 1
ti.handle_failure("test queued ti", test_mode=True)
assert ti.state == State.UP_FOR_RETRY
- # Assert that 'ti._try_number' is bumped from 0 to 1. This is the last/current try
- assert ti._try_number == 1
- # Check 'ti.try_number' is bumped to 2. This is try_number for next run
- assert ti.try_number == 2
@patch.object(Stats, "incr")
def test_handle_failure_no_task(self, Stats_incr, dag_maker):
@@ -3100,6 +3125,7 @@
task = EmptyOperator(task_id="mytask", retries=1)
dr = dag_maker.create_dagrun()
ti = TI(task=task, run_id=dr.run_id)
+ ti.try_number += 1
ti = session.merge(ti)
ti.task = None
ti.state = State.QUEUED
@@ -3113,10 +3139,8 @@
ti.handle_failure("test queued ti", test_mode=False)
assert ti.state == State.UP_FOR_RETRY
- # Assert that 'ti._try_number' is bumped from 0 to 1. This is the last/current try
- assert ti._try_number == 1
- # Check 'ti.try_number' is bumped to 2. This is try_number for next run
- assert ti.try_number == 2
+ # try_number remains at 1
+ assert ti.try_number == 1
Stats_incr.assert_any_call("ti_failures", tags=expected_stats_tags)
Stats_incr.assert_any_call("operator_failures_EmptyOperator", tags=expected_stats_tags)
@@ -3466,7 +3490,7 @@
"end_date": run_date + datetime.timedelta(days=1, seconds=1, milliseconds=234),
"duration": 1.234,
"state": State.SUCCESS,
- "_try_number": 1,
+ "try_number": 1,
"max_tries": 1,
"hostname": "some_unique_hostname",
"unixname": "some_unique_unixname",
@@ -3492,7 +3516,7 @@
"task_display_name": "Test Refresh from DB Task",
}
# Make sure we aren't missing any new value in our expected_values list.
- expected_keys = {f"task_instance.{key.lstrip('_')}" for key in expected_values}
+ expected_keys = {f"task_instance.{key}" for key in expected_values}
assert {str(c) for c in TI.__table__.columns} == expected_keys, (
"Please add all non-foreign values of TaskInstance to this list. "
"This prevents refresh_from_db() from missing a field."
@@ -4740,20 +4764,11 @@
BashOperator(task_id="hello", bash_command="hi")
dag_maker.create_dagrun(state="success")
ti = session.scalar(select(TaskInstance))
+ session.get(TaskInstance, ti.key.primary).try_number += 1
+ session.commit()
assert ti.task_id == "hello" # just to confirm...
- assert ti.try_number == 1  # starts out as 1
+ assert ti.try_number == 1  # 1 because of the explicit increment above
ti.refresh_from_db()
assert ti.try_number == 1 # stays 1
ti.refresh_from_db()
assert ti.try_number == 1 # stays 1
-
-
-@pytest.mark.parametrize("state", list(TaskInstanceState))
-def test_get_private_try_number(state: str):
- mock_ti = MagicMock()
- mock_ti.state = state
- private_try_number = 2
- mock_ti._try_number = private_try_number
- mock_ti.try_number = _get_try_number(task_instance=mock_ti)
- delattr(mock_ti, "_try_number")
- assert _get_private_try_number(task_instance=mock_ti) == private_try_number
diff --git a/tests/plugins/priority_weight_strategy.py b/tests/plugins/priority_weight_strategy.py
index c56ae73..a205536 100644
--- a/tests/plugins/priority_weight_strategy.py
+++ b/tests/plugins/priority_weight_strategy.py
@@ -44,7 +44,7 @@
"""A priority weight strategy that decreases the priority weight with each attempt."""
def get_weight(self, ti: TaskInstance):
- return max(3 - ti._try_number + 1, 1)
+ return max(3 - ti.try_number + 1, 1)
class TestPriorityWeightStrategyPlugin(AirflowPlugin):
diff --git a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
index 524360d..6c6bef5 100644
--- a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
+++ b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
@@ -1179,7 +1179,7 @@
orphaned_tasks[1].external_executor_id = "002" # Matches a running task_arn
orphaned_tasks[2].external_executor_id = None # One orphaned task has no external_executor_id
for task in orphaned_tasks:
- task.prev_attempted_tries = 1
+ task.try_number = 1
not_adopted_tasks = mock_executor.try_adopt_task_instances(orphaned_tasks)
diff --git a/tests/providers/celery/executors/test_celery_executor.py b/tests/providers/celery/executors/test_celery_executor.py
index 4c62a24..e091305 100644
--- a/tests/providers/celery/executors/test_celery_executor.py
+++ b/tests/providers/celery/executors/test_celery_executor.py
@@ -200,8 +200,6 @@
def test_try_adopt_task_instances(self):
start_date = timezone.utcnow() - timedelta(days=2)
- try_number = 1
-
with DAG("test_try_adopt_task_instances_none") as dag:
task_1 = BaseOperator(task_id="task_1", start_date=start_date)
task_2 = BaseOperator(task_id="task_2", start_date=start_date)
@@ -221,8 +219,8 @@
not_adopted_tis = executor.try_adopt_task_instances(tis)
- key_1 = TaskInstanceKey(dag.dag_id, task_1.task_id, None, try_number)
- key_2 = TaskInstanceKey(dag.dag_id, task_2.task_id, None, try_number)
+ key_1 = TaskInstanceKey(dag.dag_id, task_1.task_id, None, 0)
+ key_2 = TaskInstanceKey(dag.dag_id, task_2.task_id, None, 0)
assert executor.running == {key_1, key_2}
assert executor.tasks == {key_1: AsyncResult("231"), key_2: AsyncResult("232")}
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py
index fe7c941..224824e 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -358,7 +358,7 @@
"dag_id": "dag",
"kubernetes_pod_operator": "True",
"task_id": "task",
- "try_number": "1",
+ "try_number": "0",
"airflow_version": mock.ANY,
"run_id": "test",
"airflow_kpo_in_cluster": str(in_cluster),
@@ -374,7 +374,7 @@
"dag_id": "dag",
"kubernetes_pod_operator": "True",
"task_id": "task",
- "try_number": "1",
+ "try_number": "0",
"airflow_version": mock.ANY,
"run_id": "test",
"map_index": "10",
@@ -884,7 +884,7 @@
"dag_id": "dag",
"kubernetes_pod_operator": "True",
"task_id": "task",
- "try_number": "1",
+ "try_number": "0",
"airflow_version": mock.ANY,
"airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
"run_id": "test",
@@ -920,7 +920,7 @@
"dag_id": "dag",
"kubernetes_pod_operator": "True",
"task_id": "task",
- "try_number": "1",
+ "try_number": "0",
"airflow_version": mock.ANY,
"airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
"run_id": "test",
@@ -991,7 +991,7 @@
"dag_id": "dag",
"kubernetes_pod_operator": "True",
"task_id": "task",
- "try_number": "1",
+ "try_number": "0",
"airflow_version": mock.ANY,
"airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
"run_id": "test",
@@ -1061,7 +1061,7 @@
"dag_id": "dag",
"kubernetes_pod_operator": "True",
"task_id": "task",
- "try_number": "1",
+ "try_number": "0",
"airflow_version": mock.ANY,
"airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
"run_id": "test",
@@ -1112,7 +1112,7 @@
"dag_id": "dag",
"kubernetes_pod_operator": "True",
"task_id": "task",
- "try_number": "1",
+ "try_number": "0",
"airflow_version": mock.ANY,
"airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
"run_id": "test",
diff --git a/tests/providers/cncf/kubernetes/test_template_rendering.py b/tests/providers/cncf/kubernetes/test_template_rendering.py
index 0627eb8..f3e6110 100644
--- a/tests/providers/cncf/kubernetes/test_template_rendering.py
+++ b/tests/providers/cncf/kubernetes/test_template_rendering.py
@@ -48,7 +48,7 @@
"dag_id": "test_render_k8s_pod_yaml",
"run_id": "test_run_id",
"task_id": "op1",
- "try_number": "1",
+ "try_number": "0",
},
"labels": {
"airflow-worker": "0",
@@ -57,7 +57,7 @@
"run_id": "test_run_id",
"kubernetes_executor": "True",
"task_id": "op1",
- "try_number": "1",
+ "try_number": "0",
},
"name": mock.ANY,
"namespace": "default",
diff --git a/tests/providers/openlineage/plugins/test_listener.py b/tests/providers/openlineage/plugins/test_listener.py
index c37892c..fa651de 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -297,12 +297,14 @@
listener.on_task_instance_success(None, task_instance, None)
- # This run_id will be different as we did NOT simulate increase of the try_number attribute,
- # which happens in Airflow.
+ # This run_id now matches the one used at task start: try_number is no longer
+ # incremented during task execution, so it stays constant across listener calls.
- listener.adapter.complete_task.assert_called_once_with(
+ calls = listener.adapter.complete_task.call_args_list
+ assert len(calls) == 1
+ assert calls[0][1] == dict(
end_time="2023-01-03T13:01:01",
job_name="job_name",
parent_job_name="dag_id",
parent_run_id="dag_id.dag_run_run_id",
- run_id="dag_id.task_id.execution_date.0",
+ run_id="dag_id.task_id.execution_date.1",
task=listener.extractor_manager.extract_metadata(),
)
@@ -310,12 +312,14 @@
listener.adapter.complete_task.reset_mock()
task_instance.try_number += 1
listener.on_task_instance_success(None, task_instance, None)
- listener.adapter.complete_task.assert_called_once_with(
+ calls = listener.adapter.complete_task.call_args_list
+ assert len(calls) == 1
+ assert calls[0][1] == dict(
end_time="2023-01-03T13:01:01",
job_name="job_name",
parent_job_name="dag_id",
parent_run_id="dag_id.dag_run_run_id",
- run_id="dag_id.task_id.execution_date.1",
+ run_id="dag_id.task_id.execution_date.2",
task=listener.extractor_manager.extract_metadata(),
)
@@ -334,24 +338,23 @@
listener, task_instance = _create_listener_and_task_instance()
mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
-
+ expected_run_id_1 = "dag_id.task_id.execution_date.1"
+ expected_run_id_2 = "dag_id.task_id.execution_date.2"
listener.on_task_instance_running(None, task_instance, None)
- expected_run_id = listener.adapter.start_task.call_args.kwargs["run_id"]
- assert expected_run_id == "dag_id.task_id.execution_date.1"
+ assert listener.adapter.start_task.call_args.kwargs["run_id"] == expected_run_id_1
listener.on_task_instance_failed(None, task_instance, None)
- assert listener.adapter.fail_task.call_args.kwargs["run_id"] == expected_run_id
+ assert listener.adapter.fail_task.call_args.kwargs["run_id"] == expected_run_id_1
- # This run_id will be different as we did NOT simulate increase of the try_number attribute,
- # which happens in Airflow.
+ # This run_id will not be different as we did NOT simulate increase of the try_number attribute.
listener.on_task_instance_success(None, task_instance, None)
- assert listener.adapter.complete_task.call_args.kwargs["run_id"] == "dag_id.task_id.execution_date.0"
+ assert listener.adapter.complete_task.call_args.kwargs["run_id"] == expected_run_id_1
# Now we simulate the increase of try_number, and the run_id should reflect that change.
# This is how airflow works, and that's why we expect the run_id to remain constant across all methods.
task_instance.try_number += 1
listener.on_task_instance_success(None, task_instance, None)
- assert listener.adapter.complete_task.call_args.kwargs["run_id"] == expected_run_id
+ assert listener.adapter.complete_task.call_args.kwargs["run_id"] == expected_run_id_2
def test_running_task_correctly_calls_openlineage_adapter_run_id_method():
@@ -403,7 +406,7 @@
dag_id="dag_id",
task_id="task_id",
execution_date="execution_date",
- try_number=0,
+ try_number=1,
)
@@ -428,16 +431,16 @@
_, task_instance = _create_test_dag_and_task(fail_callable, "failure")
# try_number before execution
- assert task_instance.try_number == 1
+ assert task_instance.try_number == 0
with suppress(CustomError):
task_instance.run()
# try_number at the moment of function being called
- assert captured_try_numbers["running"] == 1
- assert captured_try_numbers["failed"] == 1
+ assert captured_try_numbers["running"] == 0
+ assert captured_try_numbers["failed"] == 0
# try_number after task has been executed
- assert task_instance.try_number == 2
+ assert task_instance.try_number == 0
@mock.patch("airflow.models.taskinstance.get_listener_manager")
@@ -457,15 +460,15 @@
_, task_instance = _create_test_dag_and_task(success_callable, "success")
# try_number before execution
- assert task_instance.try_number == 1
+ assert task_instance.try_number == 0
task_instance.run()
# try_number at the moment of function being called
- assert captured_try_numbers["running"] == 1
- assert captured_try_numbers["success"] == 2
+ assert captured_try_numbers["running"] == 0
+ assert captured_try_numbers["success"] == 0
# try_number after task has been executed
- assert task_instance.try_number == 2
+ assert task_instance.try_number == 0
@mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled")
diff --git a/tests/providers/smtp/notifications/test_smtp.py b/tests/providers/smtp/notifications/test_smtp.py
index f1a71e2..b19cc4b 100644
--- a/tests/providers/smtp/notifications/test_smtp.py
+++ b/tests/providers/smtp/notifications/test_smtp.py
@@ -129,7 +129,7 @@
from_email=conf.get("smtp", "smtp_mail_from"),
to="test_reciver@test.com",
subject="DAG dag - Task op - Run ID test in State None",
- html_content="""<!DOCTYPE html>\n<html>\n <head>\n <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />\n <meta name="viewport" content="width=device-width">\n </head>\n<body>\n <table role="presentation">\n \n <tr>\n <td>Run ID:</td>\n <td>test</td>\n </tr>\n <tr>\n <td>Try:</td>\n <td>1 of 1</td>\n </tr>\n <tr>\n <td>Task State:</td>\n <td>None</td>\n </tr>\n <tr>\n <td>Host:</td>\n <td></td>\n </tr>\n <tr>\n <td>Log Link:</td>\n <td><a href="http://localhost:8080/dags/dag/grid?dag_run_id=test&task_id=op&map_index=-1&tab=logs" style="text-decoration:underline;">http://localhost:8080/dags/dag/grid?dag_run_id=test&task_id=op&map_index=-1&tab=logs</a></td>\n </tr>\n <tr>\n <td>Mark Success Link:</td>\n <td><a href="http://localhost:8080/confirm?task_id=op&dag_id=dag&dag_run_id=test&upstream=false&downstream=false&state=success" style="text-decoration:underline;">http://localhost:8080/confirm?task_id=op&dag_id=dag&dag_run_id=test&upstream=false&downstream=false&state=success</a></td>\n </tr>\n \n </table>\n</body>\n</html>""",
+ html_content="""<!DOCTYPE html>\n<html>\n <head>\n <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />\n <meta name="viewport" content="width=device-width">\n </head>\n<body>\n <table role="presentation">\n \n <tr>\n <td>Run ID:</td>\n <td>test</td>\n </tr>\n <tr>\n <td>Try:</td>\n <td>0 of 1</td>\n </tr>\n <tr>\n <td>Task State:</td>\n <td>None</td>\n </tr>\n <tr>\n <td>Host:</td>\n <td></td>\n </tr>\n <tr>\n <td>Log Link:</td>\n <td><a href="http://localhost:8080/dags/dag/grid?dag_run_id=test&task_id=op&map_index=-1&tab=logs" style="text-decoration:underline;">http://localhost:8080/dags/dag/grid?dag_run_id=test&task_id=op&map_index=-1&tab=logs</a></td>\n </tr>\n <tr>\n <td>Mark Success Link:</td>\n <td><a href="http://localhost:8080/confirm?task_id=op&dag_id=dag&dag_run_id=test&upstream=false&downstream=false&state=success" style="text-decoration:underline;">http://localhost:8080/confirm?task_id=op&dag_id=dag&dag_run_id=test&upstream=false&downstream=false&state=success</a></td>\n </tr>\n \n </table>\n</body>\n</html>""",
smtp_conn_id="smtp_default",
files=None,
cc=None,
diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py
index a41e683..dbf6288 100644
--- a/tests/sensors/test_base.py
+++ b/tests/sensors/test_base.py
@@ -297,7 +297,17 @@
date1 = timezone.utcnow()
time_machine.move_to(date1, tick=False)
+ sensor_ti, dummy_ti = _get_tis()
+ assert dummy_ti.state == State.NONE
+ assert sensor_ti.state == State.NONE
+
+ # ordinarily the scheduler does this
+ sensor_ti.state = State.SCHEDULED
+ sensor_ti.try_number += 1 # first TI run
+ session.commit()
+
self._run(sensor, session=session)
+
sensor_ti, dummy_ti = _get_tis()
assert sensor_ti.state == State.UP_FOR_RESCHEDULE
assert dummy_ti.state == State.NONE
@@ -326,6 +336,9 @@
# first poke returns False and task is re-scheduled
date1 = timezone.utcnow()
time_machine.move_to(date1, tick=False)
+ sensor_ti, dummy_ti = _get_tis()
+ sensor_ti.try_number += 1 # second TI run
+ session.commit()
self._run(sensor)
sensor_ti, dummy_ti = _get_tis()
assert sensor_ti.state == State.UP_FOR_RESCHEDULE
@@ -359,6 +372,9 @@
# first poke returns False and task is re-scheduled
date1 = timezone.utcnow()
time_machine.move_to(date1, tick=False)
+ sensor_ti, dummy_ti = _get_tis()
+ sensor_ti.try_number += 1 # first TI run
+ session.commit()
self._run(sensor)
sensor_ti, dummy_ti = _get_tis()
assert sensor_ti.state == State.UP_FOR_RESCHEDULE
@@ -382,9 +398,12 @@
# Task is cleared
sensor.clear()
sensor_ti, dummy_ti = _get_tis()
- assert sensor_ti.try_number == 2
+ assert sensor_ti.try_number == 1
assert sensor_ti.max_tries == 2
+ sensor_ti, dummy_ti = _get_tis()
+ sensor_ti.try_number += 1 # second TI run
+ session.commit()
# third poke returns False and task is rescheduled again
date3 = date1 + timedelta(seconds=sensor.poke_interval) * 2 + sensor.retry_delay
time_machine.coordinates.shift(sensor.poke_interval + sensor.retry_delay.total_seconds())
@@ -687,9 +706,15 @@
tis = dr.get_task_instances(session=session)
return next(x for x in tis if x.task_id == SENSOR_OP)
+ def _increment_try_number():
+ sensor_ti = _get_sensor_ti()
+ sensor_ti.try_number += 1
+ session.commit()
+
# first poke returns False and task is re-scheduled
date1 = timezone.utcnow()
time_machine.move_to(date1, tick=False)
+ _increment_try_number() # first TI run
self._run(sensor)
sensor_ti = _get_sensor_ti()
assert sensor_ti.try_number == 1
@@ -701,12 +726,13 @@
with pytest.raises(RuntimeError):
self._run(sensor)
sensor_ti = _get_sensor_ti()
- assert sensor_ti.try_number == 2
+ assert sensor_ti.try_number == 1
assert sensor_ti.max_tries == 2
assert sensor_ti.state == State.UP_FOR_RETRY
# third poke returns False and task is rescheduled again
time_machine.coordinates.shift(sensor.retry_delay + timedelta(seconds=1))
+ _increment_try_number() # second TI run
self._run(sensor)
sensor_ti = _get_sensor_ti()
assert sensor_ti.try_number == 2
@@ -718,19 +744,22 @@
with pytest.raises(AirflowSensorTimeout):
self._run(sensor)
sensor_ti = _get_sensor_ti()
- assert sensor_ti.try_number == 3
+ assert sensor_ti.try_number == 2
assert sensor_ti.max_tries == 2
assert sensor_ti.state == State.FAILED
# Clear the failed sensor
sensor.clear()
sensor_ti = _get_sensor_ti()
- assert sensor_ti.try_number == 3
+ # clearing does not change the try_number
+ assert sensor_ti.try_number == 2
+ # but it does change the max_tries
assert sensor_ti.max_tries == 4
assert sensor_ti.state is None
time_machine.coordinates.shift(20)
+ _increment_try_number() # third TI run
for _ in range(3):
time_machine.coordinates.shift(sensor.poke_interval)
self._run(sensor)
@@ -744,7 +773,7 @@
with pytest.raises(AirflowSensorTimeout):
self._run(sensor)
sensor_ti = _get_sensor_ti()
- assert sensor_ti.try_number == 4
+ assert sensor_ti.try_number == 3
assert sensor_ti.max_tries == 4
assert sensor_ti.state == State.FAILED
@@ -794,13 +823,16 @@
# first poke returns False and task is re-scheduled
date1 = timezone.utcnow()
time_machine.move_to(date1, tick=False)
+ sensor_ti = _get_sensor_ti()
+ sensor_ti.try_number += 1 # first TI run
self._run(sensor)
+
sensor_ti = _get_sensor_ti()
assert sensor_ti.try_number == 1
assert sensor_ti.max_tries == 2
assert sensor_ti.state == State.UP_FOR_RESCHEDULE
- # second poke raises RuntimeError and task instance is re-scheduled again
+ # second poke raises reschedule exception and task instance is re-scheduled again
time_machine.coordinates.shift(sensor.poke_interval)
self._run(sensor)
sensor_ti = _get_sensor_ti()
@@ -821,19 +853,21 @@
with pytest.raises(AirflowSensorTimeout):
self._run(sensor)
sensor_ti = _get_sensor_ti()
- assert sensor_ti.try_number == 2
+ assert sensor_ti.try_number == 1
assert sensor_ti.max_tries == 2
assert sensor_ti.state == State.FAILED
# Clear the failed sensor
sensor.clear()
sensor_ti = _get_sensor_ti()
- assert sensor_ti.try_number == 2
+ assert sensor_ti.try_number == 1
assert sensor_ti.max_tries == 3
assert sensor_ti.state == State.NONE
time_machine.coordinates.shift(20)
+ sensor_ti.try_number += 1 # second TI run
+ session.commit()
for _ in range(3):
time_machine.coordinates.shift(sensor.poke_interval)
self._run(sensor)
@@ -847,7 +881,7 @@
with pytest.raises(AirflowSensorTimeout):
self._run(sensor)
sensor_ti = _get_sensor_ti()
- assert sensor_ti.try_number == 3
+ assert sensor_ti.try_number == 2
assert sensor_ti.max_tries == 3
assert sensor_ti.state == State.FAILED
@@ -983,7 +1017,7 @@
(False, AirflowException),
],
)
- def test_fail_after_resuming_deffered_sensor(self, soft_fail, expected_exception):
+ def test_fail_after_resuming_deferred_sensor(self, soft_fail, expected_exception):
async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor", soft_fail=soft_fail)
ti = TaskInstance(task=async_sensor)
ti.next_method = "execute_complete"
diff --git a/tests/test_utils/mock_executor.py b/tests/test_utils/mock_executor.py
index ba555fb..eaf5d32 100644
--- a/tests/test_utils/mock_executor.py
+++ b/tests/test_utils/mock_executor.py
@@ -71,7 +71,6 @@
sorted_queue = sorted(self.queued_tasks.items(), key=sort_by)
for key, (_, _, _, ti) in sorted_queue[:open_slots]:
self.queued_tasks.pop(key)
- ti._try_number += 1
state = self.mock_task_results[key]
ti.set_state(state, session=session)
self.change_state(key, state)
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 0772704..4f7eb94 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -122,7 +122,7 @@
# We expect set_context generates a file locally.
log_filename = file_handler.handler.baseFilename
assert os.path.isfile(log_filename)
- assert log_filename.endswith("1.log"), log_filename
+ assert log_filename.endswith("0.log"), log_filename
ti.run(ignore_ti_state=True)
@@ -161,7 +161,7 @@
python_callable=task_callable,
)
ti = TaskInstance(task=task, run_id=dagrun.run_id)
-
+ ti.try_number += 1
logger = ti.log
ti.log.disabled = False
@@ -498,7 +498,7 @@
t.task_instance = ti
h = FileTaskHandler(base_log_folder=os.fspath(tmp_path))
h.set_context(ti)
- expected = "dag_id=test_fth/run_id=test/task_id=dummy/attempt=1.log"
+ expected = "dag_id=test_fth/run_id=test/task_id=dummy/attempt=0.log"
if is_a_trigger:
expected += f".trigger.{job.id}.log"
actual = h.handler.baseFilename
diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py
index 3d3248f..e32eb66 100644
--- a/tests/www/views/test_views_log.py
+++ b/tests/www/views/test_views_log.py
@@ -185,7 +185,7 @@
handler = FileTaskHandler(log_path)
def create_expected_log_file(try_number):
- ti.try_number = try_number - 1
+ ti.try_number = 1
handler.set_context(ti)
handler.emit(logging.makeLogRecord({"msg": "Log for testing."}))
handler.flush()
@@ -271,8 +271,9 @@
in content_disposition
)
assert 200 == response.status_code
- assert "Log for testing." in response.data.decode("utf-8")
- assert "localhost\n" in response.data.decode("utf-8")
+ content = response.data.decode("utf-8")
+ assert "Log for testing." in content
+ assert "localhost\n" in content
DIFFERENT_LOG_FILENAME = "{{ ti.dag_id }}/{{ ti.run_id }}/{{ ti.task_id }}/{{ try_number }}.log"
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 2b893c4..233542e 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -1089,7 +1089,7 @@
"task_id": "also_run_this",
"trigger_id": None,
"trigger_timeout": None,
- "try_number": 1,
+ "try_number": 0,
"unixname": getuser(),
"updated_at": DEFAULT_DATE.isoformat(),
},
@@ -1124,7 +1124,7 @@
"task_id": "run_after_loop",
"trigger_id": None,
"trigger_timeout": None,
- "try_number": 1,
+ "try_number": 0,
"unixname": getuser(),
"updated_at": DEFAULT_DATE.isoformat(),
},
@@ -1159,7 +1159,7 @@
"task_id": "run_this_last",
"trigger_id": None,
"trigger_timeout": None,
- "try_number": 1,
+ "try_number": 0,
"unixname": getuser(),
"updated_at": DEFAULT_DATE.isoformat(),
},
@@ -1194,7 +1194,7 @@
"task_id": "runme_0",
"trigger_id": None,
"trigger_timeout": None,
- "try_number": 1,
+ "try_number": 0,
"unixname": getuser(),
"updated_at": DEFAULT_DATE.isoformat(),
},
@@ -1229,7 +1229,7 @@
"task_id": "runme_1",
"trigger_id": None,
"trigger_timeout": None,
- "try_number": 1,
+ "try_number": 0,
"unixname": getuser(),
"updated_at": DEFAULT_DATE.isoformat(),
},
@@ -1264,7 +1264,7 @@
"task_id": "runme_2",
"trigger_id": None,
"trigger_timeout": None,
- "try_number": 1,
+ "try_number": 0,
"unixname": getuser(),
"updated_at": DEFAULT_DATE.isoformat(),
},
@@ -1299,7 +1299,7 @@
"task_id": "this_will_skip",
"trigger_id": None,
"trigger_timeout": None,
- "try_number": 1,
+ "try_number": 0,
"unixname": getuser(),
"updated_at": DEFAULT_DATE.isoformat(),
},