Scheduler to handle incrementing of try_number (#39336)

Previously, there was a lot of bad stuff happening around try_number.

We incremented it when task started running. And because of that, we had this logic to return "_try_number + 1" when task not running. But this gave the "right" try number before it ran, and the wrong number after it ran. And, since it was naively incremented when task starts running -- i.e. without regard to why it is running -- we decremented it when deferring or exiting on a reschedule.

What I do here is try to remove all of that stuff:

no more private _try_number attr
no more getter logic
no more decrementing
no more incrementing as part of task execution
Now what we do is increment only when the task is set to scheduled and only when it's not coming out of deferral or "up_for_reschedule". So the try_number will be more stable. It will not change throughout the course of task execution. The only time it will be incremented is when there's legitimately a new try.

One consequence of this is that try number will no longer be incremented if you run either airlfow tasks run or ti.run() in isolation. But because airflow assumes that all tasks runs are scheduled by the scheduler, I do not regard this to be a breaking change.

If user code or provider code has implemented hacks to get the "right" try_number when looking at it at the wrong time (because previously it gave the wrong answer), unfortunately that code will just have to be patched. There are only two cases I know of in the providers codebase -- openlineage listener, and dbt openlineage.

As a courtesy for backcompat we also add property _try_number which is just a proxy for try_number, so you'll still be able to access this attr. But, it will not behave the same as it did before.

---------

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py
index a9d8325..76f279b 100644
--- a/airflow/api/common/mark_tasks.py
+++ b/airflow/api/common/mark_tasks.py
@@ -158,10 +158,6 @@
             qry_sub_dag = all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates)
             tis_altered += session.scalars(qry_sub_dag.with_for_update()).all()
         for task_instance in tis_altered:
-            # The try_number was decremented when setting to up_for_reschedule and deferred.
-            # Increment it back when changing the state again
-            if task_instance.state in (TaskInstanceState.DEFERRED, TaskInstanceState.UP_FOR_RESCHEDULE):
-                task_instance._try_number += 1
             task_instance.set_state(state, session=session)
         session.flush()
     else:
diff --git a/airflow/api_connexion/schemas/task_instance_schema.py b/airflow/api_connexion/schemas/task_instance_schema.py
index 4777b8b..f4ea4bd 100644
--- a/airflow/api_connexion/schemas/task_instance_schema.py
+++ b/airflow/api_connexion/schemas/task_instance_schema.py
@@ -53,7 +53,7 @@
     end_date = auto_field()
     duration = auto_field()
     state = TaskInstanceStateField()
-    _try_number = auto_field(data_key="try_number")
+    try_number = auto_field()
     max_tries = auto_field()
     task_display_name = fields.String(attribute="task_display_name", dump_only=True)
     hostname = auto_field()
diff --git a/airflow/example_dags/plugins/decreasing_priority_weight_strategy.py b/airflow/example_dags/plugins/decreasing_priority_weight_strategy.py
index 44e6bad..3335b7d 100644
--- a/airflow/example_dags/plugins/decreasing_priority_weight_strategy.py
+++ b/airflow/example_dags/plugins/decreasing_priority_weight_strategy.py
@@ -30,7 +30,7 @@
     """A priority weight strategy that decreases the priority weight with each attempt of the DAG task."""
 
     def get_weight(self, ti: TaskInstance):
-        return max(3 - ti._try_number + 1, 1)
+        return max(3 - ti.try_number + 1, 1)
 
 
 class DecreasingPriorityWeightStrategyPlugin(AirflowPlugin):
diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py
index ace6a00..4fd6b24 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -22,7 +22,7 @@
 
 import attr
 import pendulum
-from sqlalchemy import select, tuple_, update
+from sqlalchemy import case, or_, select, tuple_, update
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.session import make_transient
 from tabulate import tabulate
@@ -245,7 +245,16 @@
             session.execute(
                 update(TI)
                 .where(filter_for_tis)
-                .values(state=TaskInstanceState.SCHEDULED)
+                .values(
+                    state=TaskInstanceState.SCHEDULED,
+                    try_number=case(
+                        (
+                            or_(TI.state.is_(None), TI.state != TaskInstanceState.UP_FOR_RESCHEDULE),
+                            TI.try_number + 1,
+                        ),
+                        else_=TI.try_number,
+                    ),
+                )
                 .execution_options(synchronize_session=False)
             )
             session.flush()
@@ -425,6 +434,8 @@
         try:
             for ti in dag_run.get_task_instances(session=session):
                 if ti in schedulable_tis:
+                    if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE:
+                        ti.try_number += 1
                     ti.set_state(TaskInstanceState.SCHEDULED)
                 if ti.state != TaskInstanceState.REMOVED:
                     tasks_to_run[ti.key] = ti
@@ -515,6 +526,7 @@
                         if key in ti_status.running:
                             ti_status.running.pop(key)
                         # Reset the failed task in backfill to scheduled state
+                        ti.try_number += 1
                         ti.set_state(TaskInstanceState.SCHEDULED, session=session)
                         if ti.dag_run not in ti_status.active_runs:
                             ti_status.active_runs.add(ti.dag_run)
@@ -552,6 +564,14 @@
                     else:
                         self.log.debug("Sending %s to executor", ti)
                         # Skip scheduled state, we are executing immediately
+                        if ti.state in (TaskInstanceState.UP_FOR_RETRY, None):
+                            # i am not sure why this is necessary.
+                            # seemingly a quirk of backfill runner.
+                            # it should be handled elsewhere i think.
+                            # seems the leaf tasks are set SCHEDULED but others not.
+                            # but i am not going to look too closely since we need
+                            # to nuke the current backfill approach anyway.
+                            ti.try_number += 1
                         ti.state = TaskInstanceState.QUEUED
                         ti.queued_by_job_id = self.job.id
                         ti.queued_dttm = timezone.utcnow()
@@ -695,7 +715,9 @@
                 self.log.debug(e)
 
             perform_heartbeat(
-                job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True
+                job=self.job,
+                heartbeat_callback=self.heartbeat_callback,
+                only_if_necessary=True,
             )
             # execute the tasks in the queue
             executor.heartbeat()
@@ -725,6 +747,7 @@
                 ti_status.to_run.update({ti.key: ti for ti in new_mapped_tis})
 
                 for new_ti in new_mapped_tis:
+                    new_ti.try_number += 1
                     new_ti.set_state(TaskInstanceState.SCHEDULED, session=session)
 
             # Set state to failed for running TIs that are set up for retry if disable-retry flag is set
@@ -930,7 +953,6 @@
                 "combination. Please adjust backfill dates or wait for this DagRun to finish.",
             )
             return
-        # picklin'
         pickle_id = None
 
         executor_class, _ = ExecutorLoader.import_default_executor_cls()
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 9db0de4..c9a8424 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2948,6 +2948,8 @@
                 session.expire_all()
                 schedulable_tis, _ = dr.update_state(session=session)
                 for s in schedulable_tis:
+                    if s.state != TaskInstanceState.UP_FOR_RESCHEDULE:
+                        s.try_number += 1
                     s.state = TaskInstanceState.SCHEDULED
                 session.commit()
                 # triggerer may mark tasks scheduled so we read from DB
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 117dd59..84076ee 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -45,7 +45,7 @@
 from sqlalchemy.exc import IntegrityError
 from sqlalchemy.ext.associationproxy import association_proxy
 from sqlalchemy.orm import declared_attr, joinedload, relationship, synonym, validates
-from sqlalchemy.sql.expression import false, select, true
+from sqlalchemy.sql.expression import case, false, select, true
 
 from airflow import settings
 from airflow.api_internal.internal_api_call import internal_api_call
@@ -1545,7 +1545,8 @@
                 and not ti.task.on_success_callback
                 and not ti.task.outlets
             ):
-                ti._try_number += 1
+                if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE:
+                    ti.try_number += 1
                 ti.defer_task(
                     defer=TaskDeferred(trigger=ti.task.start_trigger, method_name=ti.task.next_method),
                     session=session,
@@ -1567,7 +1568,16 @@
                         TI.run_id == self.run_id,
                         tuple_in_condition((TI.task_id, TI.map_index), schedulable_ti_ids_chunk),
                     )
-                    .values(state=TaskInstanceState.SCHEDULED)
+                    .values(
+                        state=TaskInstanceState.SCHEDULED,
+                        try_number=case(
+                            (
+                                or_(TI.state.is_(None), TI.state != TaskInstanceState.UP_FOR_RESCHEDULE),
+                                TI.try_number + 1,
+                            ),
+                            else_=TI.try_number,
+                        ),
+                    )
                     .execution_options(synchronize_session=False)
                 ).rowcount
 
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 1a9d1e0..3bc96ec 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -38,6 +38,7 @@
 import jinja2
 import lazy_object_proxy
 import pendulum
+from deprecated import deprecated
 from jinja2 import TemplateAssertionError, UndefinedError
 from sqlalchemy import (
     Column,
@@ -281,14 +282,13 @@
                 ti.refresh_from_task(task)
                 if TYPE_CHECKING:
                     assert ti.task
-                task_retries = task.retries
-                ti.max_tries = ti.try_number + task_retries - 1
+                ti.max_tries = ti.try_number + task.retries
             else:
                 # Ignore errors when updating max_tries if the DAG or
                 # task are not found since database records could be
                 # outdated. We make max_tries the maximum value of its
                 # original max_tries or the last attempted try number.
-                ti.max_tries = max(ti.max_tries, ti.prev_attempted_tries)
+                ti.max_tries = max(ti.max_tries, ti.try_number)
             ti.state = None
             ti.external_executor_id = None
             ti.clear_next_method_args()
@@ -539,7 +539,7 @@
         task_instance.end_date = ti.end_date
         task_instance.duration = ti.duration
         task_instance.state = ti.state
-        task_instance.try_number = _get_private_try_number(task_instance=ti)
+        task_instance.try_number = ti.try_number
         task_instance.max_tries = ti.max_tries
         task_instance.hostname = ti.hostname
         task_instance.unixname = ti.unixname
@@ -928,53 +928,6 @@
         TaskInstance.save_to_db(failure_context["ti"], session)
 
 
-def _get_try_number(*, task_instance: TaskInstance):
-    """
-    Return the try number that a task number will be when it is actually run.
-
-    If the TaskInstance is currently running, this will match the column in the
-    database, in all other cases this will be incremented.
-
-    This is designed so that task logs end up in the right file.
-
-    :param task_instance: the task instance
-
-    :meta private:
-    """
-    if task_instance.state == TaskInstanceState.RUNNING:
-        return task_instance._try_number
-    return task_instance._try_number + 1
-
-
-def _get_private_try_number(*, task_instance: TaskInstance | TaskInstancePydantic):
-    """
-    Opposite of _get_try_number.
-
-    Given the value returned by try_number, return the value of _try_number that
-    should produce the same result.
-    This is needed for setting _try_number on TaskInstance from the value on PydanticTaskInstance, which has no private attrs.
-
-    :param task_instance: the task instance
-
-    :meta private:
-    """
-    if task_instance.state == TaskInstanceState.RUNNING:
-        return task_instance.try_number
-    return task_instance.try_number - 1
-
-
-def _set_try_number(*, task_instance: TaskInstance | TaskInstancePydantic, value: int) -> None:
-    """
-    Set a task try number.
-
-    :param task_instance: the task instance
-    :param value: the try number
-
-    :meta private:
-    """
-    task_instance._try_number = value  # type: ignore[union-attr]
-
-
 def _refresh_from_task(
     *, task_instance: TaskInstance | TaskInstancePydantic, task: Operator, pool_override: str | None = None
 ) -> None:
@@ -1164,13 +1117,10 @@
         'Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>'
     )
 
-    # This function is called after changing the state from RUNNING,
-    # so we need to subtract 1 from self.try_number here.
-    current_try_number = task_instance.try_number - 1
     additional_context: dict[str, Any] = {
         "exception": exception,
         "exception_html": exception_html,
-        "try_number": current_try_number,
+        "try_number": task_instance.try_number,
         "max_tries": task_instance.max_tries,
     }
 
@@ -1343,7 +1293,7 @@
     end_date = Column(UtcDateTime)
     duration = Column(Float)
     state = Column(String(20))
-    _try_number = Column("try_number", Integer, default=0)
+    try_number = Column(Integer, default=0)
     max_tries = Column(Integer, server_default=text("-1"))
     hostname = Column(String(1000))
     unixname = Column(String(1000))
@@ -1509,6 +1459,26 @@
         return hash((self.task_id, self.dag_id, self.run_id, self.map_index))
 
     @property
+    @deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning)
+    def _try_number(self):
+        """
+        Do not use. For semblance of backcompat.
+
+        :meta private:
+        """
+        return self.try_number
+
+    @_try_number.setter
+    @deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning)
+    def _try_number(self, val):
+        """
+        Do not use. For semblance of backcompat.
+
+        :meta private:
+        """
+        self.try_number = val
+
+    @property
     def stats_tags(self) -> dict[str, str]:
         """Returns task instance tags."""
         return _stats_tags(task_instance=self)
@@ -1527,7 +1497,7 @@
             "dag_id": task.dag_id,
             "task_id": task.task_id,
             "run_id": run_id,
-            "_try_number": 0,
+            "try_number": 0,
             "hostname": "",
             "unixname": getuser(),
             "queue": task.queue,
@@ -1549,53 +1519,22 @@
         """Initialize the attributes that aren't stored in the DB."""
         self.test_mode = False  # can be changed when calling 'run'
 
-    @hybrid_property
-    def try_number(self):
+    @property
+    @deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning)
+    def prev_attempted_tries(self) -> int:
         """
-        Return the try number that a task number will be when it is actually run.
+        Calculate the total number of attempted tries, defaulting to 0.
 
-        If the TaskInstance is currently running, this will match the column in the
-        database, in all other cases this will be incremented.
-
-        This is designed so that task logs end up in the right file.
-        """
-        return _get_try_number(task_instance=self)
-
-    @try_number.expression
-    def try_number(cls):
-        """
-        Return the expression to be used by SQLAlchemy when filtering on try_number.
-
-        This is required because the override in the get_try_number function causes
-        try_number values to be off by one when listing tasks in the UI.
+        This used to be necessary because try_number did not always tell the truth.
 
         :meta private:
         """
-        return cls._try_number
-
-    @try_number.setter
-    def try_number(self, value: int) -> None:
-        """
-        Set a task try number.
-
-        :param value: the try number
-        """
-        _set_try_number(task_instance=self, value=value)
-
-    @property
-    def prev_attempted_tries(self) -> int:
-        """
-        Calculate the number of previously attempted tries, defaulting to 0.
-
-        Expose this for the Task Tries and Gantt graph views.
-        Using `try_number` throws off the counts for non-running tasks.
-        Also useful in error logging contexts to get the try number for the last try that was attempted.
-        """
-        return self._try_number
+        return self.try_number
 
     @property
     def next_try_number(self) -> int:
-        return self._try_number + 1
+        # todo (dstandish): deprecate this property; we don't need a property that is just + 1
+        return self.try_number + 1
 
     @property
     def operator_name(self) -> str | None:
@@ -2178,7 +2117,9 @@
             # If the min_backoff calculation is below 1, it will be converted to 0 via int. Thus,
             # we must round up prior to converting to an int, otherwise a divide by zero error
             # will occur in the modded_hash calculation.
-            min_backoff = math.ceil(delay.total_seconds() * (2 ** (self.try_number - 2)))
+            # this probably gives unexpected results if a task instance has previously been cleared,
+            # because try_number can increase without bound
+            min_backoff = math.ceil(delay.total_seconds() * (2 ** (self.try_number - 1)))
 
             # In the case when delay.total_seconds() is 0, min_backoff will not be rounded up to 1.
             # To address this, we impose a lower bound of 1 on min_backoff. This effectively makes
@@ -2372,7 +2313,6 @@
             cls.logger().info("Resuming after deferral")
         else:
             cls.logger().info("Starting attempt %s of %s", ti.try_number, ti.max_tries + 1)
-        ti._try_number += 1
 
         if not test_mode:
             session.add(Log(TaskInstanceState.RUNNING.value, ti))
@@ -2791,9 +2731,6 @@
         self.next_method = defer.method_name
         self.next_kwargs = defer.kwargs or {}
 
-        # Decrement try number so the next one is the same try
-        self._try_number -= 1
-
         # Calculate timeout too if it was passed
         if defer.timeout is not None:
             self.trigger_timeout = timezone.utcnow() + defer.timeout
@@ -2910,7 +2847,7 @@
                 self.task_id,
                 self.dag_id,
                 self.run_id,
-                self._try_number,
+                self.try_number,
                 actual_start_date,
                 self.end_date,
                 reschedule_exception.reschedule_date,
@@ -2921,10 +2858,6 @@
         # set state
         self.state = TaskInstanceState.UP_FOR_RESCHEDULE
 
-        # Decrement try_number so subsequent runs will use the same try number and write
-        # to same log file.
-        self._try_number -= 1
-
         self.clear_next_method_args()
 
         session.merge(self)
@@ -3040,7 +2973,6 @@
                     #  e.g. we could make refresh_from_db return a TI and replace ti with that
                     raise RuntimeError("Expected TaskInstance here. Further AIP-44 work required.")
                 # We increase the try_number to fail the task if it fails to start after sometime
-                ti._try_number += 1
             ti.state = State.UP_FOR_RETRY
             email_for_state = operator.attrgetter("email_on_retry")
             callbacks = task.on_retry_callback if task else None
diff --git a/airflow/models/taskinstancekey.py b/airflow/models/taskinstancekey.py
index 50906e4..b705ecb 100644
--- a/airflow/models/taskinstancekey.py
+++ b/airflow/models/taskinstancekey.py
@@ -37,6 +37,7 @@
     @property
     def reduced(self) -> TaskInstanceKey:
         """Remake the key by subtracting 1 from try number to match in memory information."""
+        # todo (dstandish): remove this property
         return TaskInstanceKey(
             self.dag_id, self.task_id, self.run_id, max(1, self.try_number - 1), self.map_index
         )
diff --git a/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py b/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
index c5e7e3d..ca7047b 100644
--- a/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
+++ b/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
@@ -527,7 +527,7 @@
                         ti.queue,
                         ti.command_as_list(),
                         ti.executor_config,
-                        ti.prev_attempted_tries,
+                        ti.try_number,
                     )
                     adopted_tis.append(ti)
 
diff --git a/airflow/providers/dbt/cloud/CHANGELOG.rst b/airflow/providers/dbt/cloud/CHANGELOG.rst
index cae2a00..bd5e64d 100644
--- a/airflow/providers/dbt/cloud/CHANGELOG.rst
+++ b/airflow/providers/dbt/cloud/CHANGELOG.rst
@@ -28,6 +28,11 @@
 Changelog
 ---------
 
+main
+.....
+
+In Airflow 2.10.0, we fix the way try_number works, so that it no longer returns different values depending on task instance state.  Importantly, after the task is done, it no longer shows current_try + 1. Thus in 3.8.1 we patch this provider to fix try_number references so they no longer adjust for the old, bad behavior.
+
 3.8.0
 .....
 
diff --git a/airflow/providers/dbt/cloud/utils/openlineage.py b/airflow/providers/dbt/cloud/utils/openlineage.py
index 5e4550b..ad50552 100644
--- a/airflow/providers/dbt/cloud/utils/openlineage.py
+++ b/airflow/providers/dbt/cloud/utils/openlineage.py
@@ -21,6 +21,8 @@
 from contextlib import suppress
 from typing import TYPE_CHECKING
 
+from airflow import __version__ as airflow_version
+
 if TYPE_CHECKING:
     from airflow.models.taskinstance import TaskInstance
     from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
@@ -28,6 +30,16 @@
     from airflow.providers.openlineage.extractors.base import OperatorLineage
 
 
+def _get_try_number(val):
+    # todo: remove when min airflow version >= 2.10.0
+    from packaging.version import parse
+
+    if parse(parse(airflow_version).base_version) < parse("2.10.0"):
+        return val.try_number - 1
+    else:
+        return val.try_number
+
+
 def generate_openlineage_events_from_dbt_cloud_run(
     operator: DbtCloudRunJobOperator | DbtCloudJobRunSensor, task_instance: TaskInstance
 ) -> OperatorLineage:
@@ -131,7 +143,7 @@
             dag_id=task_instance.dag_id,
             task_id=operator.task_id,
             execution_date=task_instance.execution_date,
-            try_number=task_instance.try_number - 1,
+            try_number=_get_try_number(task_instance),
         )
 
         parent_job = ParentRunMetadata(
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index cfc7b49..82cc887 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -108,7 +108,7 @@
         .one_or_none()
     )
     if isinstance(val, TaskInstance):
-        val._try_number = ti.try_number
+        val.try_number = ti.try_number
         return val
     else:
         raise AirflowException(f"Could not find TaskInstance for {ti}")
diff --git a/airflow/providers/openlineage/CHANGELOG.rst b/airflow/providers/openlineage/CHANGELOG.rst
index 4e1f6ff..882c121 100644
--- a/airflow/providers/openlineage/CHANGELOG.rst
+++ b/airflow/providers/openlineage/CHANGELOG.rst
@@ -26,6 +26,11 @@
 Changelog
 ---------
 
+main
+.....
+
+In Airflow 2.10.0, we fix the way try_number works, so that it no longer returns different values depending on task instance state.  Importantly, after the task is done, it no longer shows current_try + 1. Thus in 1.7.2 we patch this provider to fix try_number references so they no longer adjust for the old, bad behavior.
+
 1.7.1
 .....
 
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 25ded6d..03c6005 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -23,6 +23,7 @@
 
 from openlineage.client.serde import Serde
 
+from airflow import __version__ as airflow_version
 from airflow.listeners import hookimpl
 from airflow.providers.openlineage.extractors import ExtractorManager
 from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState
@@ -45,6 +46,16 @@
 _openlineage_listener: OpenLineageListener | None = None
 
 
+def _get_try_number_success(val):
+    # todo: remove when min airflow version >= 2.10.0
+    from packaging.version import parse
+
+    if parse(parse(airflow_version).base_version) < parse("2.10.0"):
+        return val.try_number - 1
+    else:
+        return val.try_number
+
+
 class OpenLineageListener:
     """OpenLineage listener sends events on task instance and dag run starts, completes and failures."""
 
@@ -165,7 +176,7 @@
                 dag_id=dag.dag_id,
                 task_id=task.task_id,
                 execution_date=task_instance.execution_date,
-                try_number=task_instance.try_number - 1,
+                try_number=_get_try_number_success(task_instance),
             )
             event_type = RunState.COMPLETE.value.lower()
             operator_name = task.task_type.lower()
diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py
index 8e13279..5f49d00 100644
--- a/airflow/sensors/base.py
+++ b/airflow/sensors/base.py
@@ -241,11 +241,11 @@
         started_at: datetime.datetime | float
 
         if self.reschedule:
-            # If reschedule, use the start date of the first try (first try can be either the very
-            # first execution of the task, or the first execution after the task was cleared.)
             ti = context["ti"]
             max_tries: int = ti.max_tries or 0
             retries: int = self.retries or 0
+            # If reschedule, use the start date of the first try (first try can be either the very
+            # first execution of the task, or the first execution after the task was cleared.)
             first_try_number = max_tries - retries + 1
             start_date = _orig_start_date(
                 dag_id=ti.dag_id,
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 2a1dfd2..043f59d 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -145,8 +145,7 @@
 
     Will raise exception if no TI is found in the database.
     """
-    from airflow.models.taskinstance import TaskInstance, _get_private_try_number
-    from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
+    from airflow.models.taskinstance import TaskInstance
 
     if isinstance(ti, TaskInstance):
         return ti
@@ -162,10 +161,7 @@
     )
     if not val:
         raise AirflowException(f"Could not find TaskInstance for {ti}")
-    if isinstance(ti, TaskInstancePydantic):
-        val.try_number = _get_private_try_number(task_instance=ti)
-    else:  # TaskInstanceKey
-        val.try_number = ti.try_number
+    val.try_number = ti.try_number
     return val
 
 
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 513b453..4235ab5 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -147,7 +147,7 @@
         "start_date": group_start_date,
         "end_date": group_end_date,
         "mapped_states": mapped_states,
-        "try_number": get_try_count(parent_instance._try_number, parent_instance.state),
+        "try_number": get_try_count(parent_instance.try_number, parent_instance.state),
         "execution_date": parent_instance.execution_date,
     }
 
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 682a70c..131b25f 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -314,7 +314,7 @@
             TaskInstance.task_id,
             TaskInstance.run_id,
             TaskInstance.state,
-            TaskInstance._try_number,
+            TaskInstance.try_number,
             func.min(TaskInstanceNote.content).label("note"),
             func.count(func.coalesce(TaskInstance.state, sqla.literal("no_status"))).label("state_count"),
             func.min(TaskInstance.queued_dttm).label("queued_dttm"),
@@ -326,7 +326,7 @@
             TaskInstance.dag_id == dag.dag_id,
             TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]),
         )
-        .group_by(TaskInstance.task_id, TaskInstance.run_id, TaskInstance.state, TaskInstance._try_number)
+        .group_by(TaskInstance.task_id, TaskInstance.run_id, TaskInstance.state, TaskInstance.try_number)
         .order_by(TaskInstance.task_id, TaskInstance.run_id)
     )
 
@@ -409,7 +409,7 @@
                         "queued_dttm": task_instance.queued_dttm,
                         "start_date": task_instance.start_date,
                         "end_date": task_instance.end_date,
-                        "try_number": wwwutils.get_try_count(task_instance._try_number, task_instance.state),
+                        "try_number": wwwutils.get_try_count(task_instance.try_number, task_instance.state),
                         "note": task_instance.note,
                     }
                     for task_instance in grouped_tis[item.task_id]
@@ -1687,7 +1687,7 @@
 
         num_logs = 0
         if ti is not None:
-            num_logs = wwwutils.get_try_count(ti._try_number, ti.state)
+            num_logs = wwwutils.get_try_count(ti.try_number, ti.state)
         logs = [""] * num_logs
         root = request.args.get("root", "")
         return self.render_template(
@@ -1788,7 +1788,7 @@
                 warnings.simplefilter("ignore", RemovedInAirflow3Warning)
                 all_ti_attrs = (
                     # fetching the value of _try_number to be shown under name try_number in UI
-                    (name, getattr(ti, "_try_number" if name == "try_number" else name))
+                    (name, getattr(ti, name))
                     for name in dir(ti)
                     if not name.startswith("_") and name not in ti_attrs_to_skip
                 )
@@ -5196,7 +5196,7 @@
         "pool",
         "queued_by_job_id",
     ]
-
+    # todo: don't use prev_attempted_tries; just use try_number
     label_columns = {"dag_run.execution_date": "Logical Date", "prev_attempted_tries": "Try Number"}
 
     search_columns = [
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 775217e..2d10cda 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -62,6 +62,7 @@
     task_instance = TaskInstance(task=task)
     task_instance.dag_run = dag_run
     task_instance.dag_id = dag.dag_id
+    task_instance.try_number = 1
     task_instance.xcom_push = mock.Mock()  # type: ignore
     return Context(
         dag=dag,
diff --git a/newsfragments/39336.significant.rst b/newsfragments/39336.significant.rst
new file mode 100644
index 0000000..750a180
--- /dev/null
+++ b/newsfragments/39336.significant.rst
@@ -0,0 +1,7 @@
+``try_number`` is no longer incremented during task execution
+
+Previously, the try number (``try_number``) was incremented at the beginning of task execution on the worker. This was problematic for many reasons. For one it meant that the try number was incremented when it was not supposed to, namely when resuming from reschedule or deferral. And it also resulted in the try number being "wrong" when the task had not yet started. The workarounds for these two issues caused a lot of confusion.
+
+Now, instead, the try number for a task run is determined at the time the task is scheduled, and does not change in flight, and it is never decremented. So after the task runs, the observed try number remains the same as it was when the task was running; only when there is a "new try" will the try number be incremented again.
+
+One consequence of this change is, if users were "manually" running tasks (e.g. by calling ``ti.run()`` directly, or command line ``airflow tasks run``), try number will no longer be incremented. Airflow assumes that tasks are always run after being scheduled by the scheduler, so we do not regard this as a breaking change.
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index e63f2c3..1e7c29c 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -46,7 +46,7 @@
 from airflow.operators.bash import BashOperator
 from airflow.utils import timezone
 from airflow.utils.session import create_session
-from airflow.utils.state import State
+from airflow.utils.state import State, TaskInstanceState
 from airflow.utils.types import DagRunType
 from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_pools, clear_db_runs
@@ -651,6 +651,12 @@
         task_command.task_clear(args)
 
 
+def _set_state_and_try_num(ti, session):
+    ti.state = TaskInstanceState.QUEUED
+    ti.try_number += 1
+    session.commit()
+
+
 class TestLogsfromTaskRunCommand:
     def setup_method(self) -> None:
         self.dag_id = "test_logging_dag"
@@ -668,7 +674,7 @@
 
         dag = DagBag().get_dag(self.dag_id)
         data_interval = dag.timetable.infer_manual_data_interval(run_after=self.execution_date)
-        dag.create_dagrun(
+        self.dr = dag.create_dagrun(
             run_id=self.run_id,
             execution_date=self.execution_date,
             data_interval=data_interval,
@@ -676,6 +682,9 @@
             state=State.RUNNING,
             run_type=DagRunType.MANUAL,
         )
+        self.tis = self.dr.get_task_instances()
+        assert len(self.tis) == 1
+        self.ti = self.tis[0]
 
         root = self.root_logger = logging.getLogger()
         self.root_handlers = root.handlers.copy()
@@ -757,7 +766,7 @@
     @pytest.mark.parametrize(
         "is_k8s, is_container_exec", [("true", "true"), ("true", ""), ("", "true"), ("", "")]
     )
-    def test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s, is_container_exec):
+    def test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s, is_container_exec, session):
         """
         When running task --local as k8s executor pod, all logging should make it to stdout.
         Otherwise, all logging after "running TI" is redirected to logs (and the actual log
@@ -770,6 +779,9 @@
         """
         import subprocess
 
+        ti = self.dr.get_task_instances(session=session)[0]
+        _set_state_and_try_num(ti, session)  # so that try_number is correct
+
         with mock.patch.dict(
             "os.environ",
             AIRFLOW_IS_K8S_EXECUTOR_POD=is_k8s,
@@ -807,7 +819,9 @@
             assert len(lines) == 1
 
     @pytest.mark.skipif(not hasattr(os, "fork"), reason="Forking not available")
-    def test_logging_with_run_task(self):
+    def test_logging_with_run_task(self, session):
+        ti = self.dr.get_task_instances(session=session)[0]
+        _set_state_and_try_num(ti, session)
         with conf_vars({("core", "dags_folder"): self.dag_path}):
             task_command.task_run(self.parser.parse_args(self.task_args))
 
@@ -852,7 +866,10 @@
             session.commit()
 
     @mock.patch("airflow.task.task_runner.standard_task_runner.CAN_FORK", False)
-    def test_logging_with_run_task_subprocess(self):
+    def test_logging_with_run_task_subprocess(self, session):
+        ti = self.dr.get_task_instances(session=session)[0]
+        _set_state_and_try_num(ti, session)
+
         with conf_vars({("core", "dags_folder"): self.dag_path}):
             task_command.task_run(self.parser.parse_args(self.task_args))
 
@@ -874,14 +891,14 @@
             f"task_id={self.task_id}, run_id={self.run_id}, execution_date=20170101T000000" in logs
         )
 
-    def test_log_file_template_with_run_task(self):
+    def test_log_file_template_with_run_task(self, session):
         """Verify that the taskinstance has the right context for log_filename_template"""
 
         with conf_vars({("core", "dags_folder"): self.dag_path}):
             # increment the try_number of the task to be run
             with create_session() as session:
                 ti = session.query(TaskInstance).filter_by(run_id=self.run_id).first()
-                ti.try_number = 1
+                ti.try_number = 2
 
             log_file_path = os.path.join(os.path.dirname(self.ti_log_file_path), "attempt=2.log")
 
diff --git a/tests/core/test_sentry.py b/tests/core/test_sentry.py
index 10c8867..de78912 100644
--- a/tests/core/test_sentry.py
+++ b/tests/core/test_sentry.py
@@ -38,7 +38,7 @@
 DAG_ID = "test_dag"
 TASK_ID = "test_task"
 OPERATOR = "PythonOperator"
-TRY_NUMBER = 1
+TRY_NUMBER = 0
 STATE = State.SUCCESS
 TEST_SCOPE = {
     "dag_id": DAG_ID,
@@ -149,7 +149,7 @@
         sentry.add_tagging(task_instance=task_instance)
         with configure_scope() as scope:
             for key, value in scope._tags.items():
-                assert TEST_SCOPE[key] == value
+                assert value == TEST_SCOPE[key]
 
     @pytest.mark.db_test
     @time_machine.travel(CRUMB_DATE)
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 108ee97..5b7bc36 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -202,7 +202,6 @@
         )
 
         run_job(job=job, execute_callable=job_runner._execute)
-
         expected_execution_order = [
             ("runme_0", DEFAULT_DATE),
             ("runme_1", DEFAULT_DATE),
@@ -217,11 +216,15 @@
             ("run_this_last", DEFAULT_DATE),
             ("run_this_last", end_date),
         ]
-        assert [
-            ((dag.dag_id, task_id, f"backfill__{when.isoformat()}", 1, -1), (State.SUCCESS, None))
+        actual = [(tuple(x), y) for x, y in executor.sorted_tasks]
+        expected = [
+            (
+                (dag.dag_id, task_id, f"backfill__{when.isoformat()}", 1, -1),
+                (State.SUCCESS, None),
+            )
             for (task_id, when) in expected_execution_order
-        ] == executor.sorted_tasks
-
+        ]
+        assert actual == expected
         session = settings.Session()
         drs = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).order_by(DagRun.execution_date).all()
 
@@ -907,10 +910,10 @@
         dr = dag_maker.create_dagrun(state=None)
 
         executor = MockExecutor(parallelism=16)
-        executor.mock_task_results[TaskInstanceKey(dag.dag_id, task1.task_id, dr.run_id, try_number=1)] = (
+        executor.mock_task_results[TaskInstanceKey(dag.dag_id, task1.task_id, dr.run_id, try_number=0)] = (
             State.UP_FOR_RETRY
         )
-        executor.mock_task_fail(dag.dag_id, task1.task_id, dr.run_id, try_number=2)
+        executor.mock_task_fail(dag.dag_id, task1.task_id, dr.run_id, try_number=1)
         job = Job(executor=executor)
         job_runner = BackfillJobRunner(
             job=job,
@@ -952,10 +955,14 @@
         runid1 = f"backfill__{(DEFAULT_DATE + datetime.timedelta(days=1)).isoformat()}"
         runid2 = f"backfill__{(DEFAULT_DATE + datetime.timedelta(days=2)).isoformat()}"
 
-        # test executor history keeps a list
-        history = executor.history
-
-        assert [sorted(item[-1].key[1:3] for item in batch) for batch in history] == [
+        actual = []
+        for batch in executor.history:
+            this_batch = []
+            for cmd, idx, queue, ti in batch:  # noqa: B007
+                key = ti.key
+                this_batch.append((key.task_id, key.run_id))
+            actual.append(sorted(this_batch))
+        assert actual == [
             [
                 ("leave1", runid0),
                 ("leave1", runid1),
@@ -964,9 +971,21 @@
                 ("leave2", runid1),
                 ("leave2", runid2),
             ],
-            [("upstream_level_1", runid0), ("upstream_level_1", runid1), ("upstream_level_1", runid2)],
-            [("upstream_level_2", runid0), ("upstream_level_2", runid1), ("upstream_level_2", runid2)],
-            [("upstream_level_3", runid0), ("upstream_level_3", runid1), ("upstream_level_3", runid2)],
+            [
+                ("upstream_level_1", runid0),
+                ("upstream_level_1", runid1),
+                ("upstream_level_1", runid2),
+            ],
+            [
+                ("upstream_level_2", runid0),
+                ("upstream_level_2", runid1),
+                ("upstream_level_2", runid2),
+            ],
+            [
+                ("upstream_level_3", runid0),
+                ("upstream_level_3", runid1),
+                ("upstream_level_3", runid2),
+            ],
         ]
 
     def test_backfill_pooled_tasks(self):
@@ -1525,7 +1544,7 @@
         # match what's in the in-memory ti_status.running map. This is the same
         # for skipped, failed and retry states.
         ti_status.running[ti.key] = ti  # Task is queued and marked as running
-        ti._try_number += 1  # Try number is increased during ti.run()
+        ti.try_number += 1
         ti.set_state(State.SUCCESS, session)  # Task finishes with success state
         job_runner._update_counters(ti_status=ti_status, session=session)  # Update counters
         assert len(ti_status.running) == 0
@@ -1538,7 +1557,7 @@
 
         # Test for success when DB try_number is off from in-memory expectations
         ti_status.running[ti.key] = ti
-        ti._try_number += 2
+        ti.try_number += 2
         ti.set_state(State.SUCCESS, session)
         job_runner._update_counters(ti_status=ti_status, session=session)
         assert len(ti_status.running) == 0
@@ -1551,7 +1570,7 @@
 
         # Test for skipped
         ti_status.running[ti.key] = ti
-        ti._try_number += 1
+        ti.try_number += 1
         ti.set_state(State.SKIPPED, session)
         job_runner._update_counters(ti_status=ti_status, session=session)
         assert len(ti_status.running) == 0
@@ -1564,7 +1583,7 @@
 
         # Test for failed
         ti_status.running[ti.key] = ti
-        ti._try_number += 1
+        ti.try_number += 1
         ti.set_state(State.FAILED, session)
         job_runner._update_counters(ti_status=ti_status, session=session)
         assert len(ti_status.running) == 0
@@ -1577,7 +1596,7 @@
 
         # Test for retry
         ti_status.running[ti.key] = ti
-        ti._try_number += 1
+        ti.try_number += 1
         ti.set_state(State.UP_FOR_RETRY, session)
         job_runner._update_counters(ti_status=ti_status, session=session)
         assert len(ti_status.running) == 0
@@ -1595,9 +1614,6 @@
         # and DB representation of the task try_number the _same_, which is unlike
         # the above cases. But this is okay because the in-memory key is used.
         ti_status.running[ti.key] = ti  # Task queued and marked as running
-        # Note: Both the increase and decrease are kept here for context
-        ti._try_number += 1  # Try number is increased during ti.run()
-        ti._try_number -= 1  # Task is being rescheduled, decrement try_number
         ti.set_state(State.UP_FOR_RESCHEDULE, session)  # Task finishes with reschedule state
         job_runner._update_counters(ti_status=ti_status, session=session)
         assert len(ti_status.running) == 0
@@ -1610,10 +1626,6 @@
 
         # test for none
         ti.set_state(State.NONE, session)
-        # Setting ti._try_number = 0 brings us to ti.try_number==1
-        # so that the in-memory key access will work fine
-        ti._try_number = 0
-        assert ti.try_number == 1  # see ti.try_number property in taskinstance module
         session.merge(ti)
         session.commit()
         ti_status.running[ti.key] = ti
@@ -1955,20 +1967,20 @@
             )
         assert ti_status.failed == set()
         assert ti_status.succeeded == {
-            TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=1, map_index=0),
-            TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=1, map_index=1),
-            TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=1, map_index=2),
+            TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=0, map_index=0),
+            TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=0, map_index=1),
+            TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=0, map_index=2),
             TaskInstanceKey(
-                dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=1, map_index=0
+                dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=0, map_index=0
             ),
             TaskInstanceKey(
-                dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=1, map_index=1
+                dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=0, map_index=1
             ),
             TaskInstanceKey(
-                dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=1, map_index=2
+                dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=0, map_index=2
             ),
             TaskInstanceKey(
-                dag_id=dr.dag_id, task_id="make_arg_lists", run_id="test", try_number=1, map_index=-1
+                dag_id=dr.dag_id, task_id="make_arg_lists", run_id="test", try_number=0, map_index=-1
             ),
         }
 
@@ -2096,7 +2108,7 @@
             run_job(job=job, execute_callable=job_runner._execute)
         ti = dag_run.get_task_instance(task_id=task1.task_id)
 
-        assert ti._try_number == try_number
+        assert ti.try_number == try_number
 
         dag_run.refresh_from_db()
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index f122f12..491e345 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3180,7 +3180,7 @@
         # executing task.
         run_with_error(ti, ignore_ti_state=True)
         assert ti.state == State.UP_FOR_RETRY
-        assert ti.try_number == 2
+        assert ti.try_number == 1
 
         with create_session() as session:
             ti.refresh_from_db(lock_for_update=True, session=session)
@@ -3191,6 +3191,7 @@
         executor.do_update = True
         do_schedule()
         ti.refresh_from_db()
+        assert ti.try_number == 1
         assert ti.state == State.SUCCESS
 
     def test_retry_handling_job(self):
@@ -3214,8 +3215,6 @@
             .filter(TaskInstance.dag_id == dag.dag_id, TaskInstance.task_id == dag_task1.task_id)
             .first()
         )
-        # make sure the counter has increased
-        assert ti.try_number == 2
         assert ti.state == State.UP_FOR_RETRY
 
     def test_dag_get_active_runs(self, dag_maker):
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index 9279c97..df96998 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -152,7 +152,7 @@
     # give it more time for the trigger event to write the log.
     time.sleep(0.5)
 
-    assert "test_dag/test_run/sensitive_arg_task/-1/1 (ID 1) starting" in caplog.text
+    assert "test_dag/test_run/sensitive_arg_task/-1/0 (ID 1) starting" in caplog.text
     assert "some_password" not in caplog.text
 
 
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index bce9dc4..26e3dcc 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -25,7 +25,7 @@
 from airflow import settings
 from airflow.models.dag import DAG
 from airflow.models.serialized_dag import SerializedDagModel
-from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances
+from airflow.models.taskinstance import TaskInstance, TaskInstance as TI, clear_task_instances
 from airflow.models.taskreschedule import TaskReschedule
 from airflow.operators.empty import EmptyOperator
 from airflow.sensors.python import PythonSensor
@@ -68,6 +68,13 @@
         ti1.run()
 
         with create_session() as session:
+            # do the incrementing of try_number ordinarily handled by scheduler
+            ti0.try_number += 1
+            ti1.try_number += 1
+            session.merge(ti0)
+            session.merge(ti1)
+            session.commit()
+
             # we use order_by(task_id) here because for the test DAG structure of ours
             # this is equivalent to topological sort. It would not work in general case
             # but it works for our case because we specifically constructed test DAGS
@@ -79,10 +86,10 @@
         ti1.refresh_from_db()
         # Next try to run will be try 2
         assert ti0.state is None
-        assert ti0.try_number == 2
+        assert ti0.try_number == 1
         assert ti0.max_tries == 1
         assert ti1.state is None
-        assert ti1.try_number == 2
+        assert ti1.try_number == 1
         assert ti1.max_tries == 3
 
     def test_clear_task_instances_external_executor_id(self, dag_maker):
@@ -279,6 +286,14 @@
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
 
+        with create_session() as session:
+            # do the incrementing of try_number ordinarily handled by scheduler
+            ti0.try_number += 1
+            ti1.try_number += 1
+            session.merge(ti0)
+            session.merge(ti1)
+            session.commit()
+
         ti0.run()
         ti1.run()
 
@@ -298,10 +313,9 @@
         # When no task is found, max_tries will be maximum of original max_tries or try_number.
         ti0.refresh_from_db()
         ti1.refresh_from_db()
-        # Next try to run will be try 2
-        assert ti0.try_number == 2
+        assert ti0.try_number == 1
         assert ti0.max_tries == 1
-        assert ti1.try_number == 2
+        assert ti1.try_number == 1
         assert ti1.max_tries == 2
 
     def test_clear_task_instances_without_dag(self, dag_maker):
@@ -323,6 +337,14 @@
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
 
+        with create_session() as session:
+            # do the incrementing of try_number ordinarily handled by scheduler
+            ti0.try_number += 1
+            ti1.try_number += 1
+            session.merge(ti0)
+            session.merge(ti1)
+            session.commit()
+
         ti0.run()
         ti1.run()
 
@@ -337,10 +359,9 @@
         # When no DAG is found, max_tries will be maximum of original max_tries or try_number.
         ti0.refresh_from_db()
         ti1.refresh_from_db()
-        # Next try to run will be try 2
-        assert ti0.try_number == 2
+        assert ti0.try_number == 1
         assert ti0.max_tries == 1
-        assert ti1.try_number == 2
+        assert ti1.try_number == 1
         assert ti1.max_tries == 2
 
     def test_clear_task_instances_without_dag_param(self, dag_maker, session):
@@ -365,6 +386,14 @@
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
 
+        with create_session() as session:
+            # do the incrementing of try_number ordinarily handled by scheduler
+            ti0.try_number += 1
+            ti1.try_number += 1
+            session.merge(ti0)
+            session.merge(ti1)
+            session.commit()
+
         ti0.run(session=session)
         ti1.run(session=session)
 
@@ -377,10 +406,9 @@
 
         ti0.refresh_from_db(session=session)
         ti1.refresh_from_db(session=session)
-        # Next try to run will be try 2
-        assert ti0.try_number == 2
+        assert ti0.try_number == 1
         assert ti0.max_tries == 1
-        assert ti1.try_number == 2
+        assert ti1.try_number == 1
         assert ti1.max_tries == 3
 
     def test_clear_task_instances_in_multiple_dags(self, dag_maker, session):
@@ -418,6 +446,14 @@
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
 
+        with create_session() as session:
+            # do the incrementing of try_number ordinarily handled by scheduler
+            ti0.try_number += 1
+            ti1.try_number += 1
+            session.merge(ti0)
+            session.merge(ti1)
+            session.commit()
+
         ti0.run(session=session)
         ti1.run(session=session)
 
@@ -426,10 +462,9 @@
 
         ti0.refresh_from_db(session=session)
         ti1.refresh_from_db(session=session)
-        # Next try to run will be try 2
-        assert ti0.try_number == 2
+        assert ti0.try_number == 1
         assert ti0.max_tries == 1
-        assert ti1.try_number == 2
+        assert ti1.try_number == 1
         assert ti1.max_tries == 3
 
     def test_clear_task_instances_with_task_reschedule(self, dag_maker):
@@ -451,6 +486,15 @@
         ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
+
+        with create_session() as session:
+            # do the incrementing of try_number ordinarily handled by scheduler
+            ti0.try_number += 1
+            ti1.try_number += 1
+            session.merge(ti0)
+            session.merge(ti1)
+            session.commit()
+
         ti0.run()
         ti1.run()
 
@@ -500,37 +544,35 @@
         ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
-
+        session.get(TaskInstance, ti0.key.primary).try_number += 1
+        session.commit()
         # Next try to run will be try 1
         assert ti0.try_number == 1
         ti0.run()
 
-        assert ti0.try_number == 2
+        assert ti0.try_number == 1
         dag.clear()
         ti0.refresh_from_db()
-        assert ti0.try_number == 2
+        assert ti0.try_number == 1
         assert ti0.state == State.NONE
         assert ti0.max_tries == 1
 
         assert ti1.max_tries == 2
-        ti1.try_number = 1
-        session.merge(ti1)
+        session.get(TaskInstance, ti1.key.primary).try_number += 1
         session.commit()
-
-        # Next try will be 2
         ti1.run()
 
-        assert ti1.try_number == 3
+        assert ti1.try_number == 1
         assert ti1.max_tries == 2
 
         dag.clear()
         ti0.refresh_from_db()
         ti1.refresh_from_db()
-        # after clear dag, ti2 should show attempt 3 of 5
-        assert ti1.max_tries == 4
-        assert ti1.try_number == 3
-        # after clear dag, ti1 should show attempt 2 of 2
-        assert ti0.try_number == 2
+        # after clear dag, we have 2 remaining tries
+        assert ti1.max_tries == 3
+        assert ti1.try_number == 1
+        # after clear dag, ti0 has no remaining tries
+        assert ti0.try_number == 1
         assert ti0.max_tries == 1
 
     def test_dags_clear(self):
@@ -559,9 +601,11 @@
 
         # test clear all dags
         for i in range(num_of_dags):
+            session.get(TaskInstance, tis[i].key.primary).try_number += 1
+            session.commit()
             tis[i].run()
             assert tis[i].state == State.SUCCESS
-            assert tis[i].try_number == 2
+            assert tis[i].try_number == 1
             assert tis[i].max_tries == 0
 
         DAG.clear_dags(dags)
@@ -569,14 +613,16 @@
         for i in range(num_of_dags):
             tis[i].refresh_from_db()
             assert tis[i].state == State.NONE
-            assert tis[i].try_number == 2
+            assert tis[i].try_number == 1
             assert tis[i].max_tries == 1
 
         # test dry_run
         for i in range(num_of_dags):
+            session.get(TaskInstance, tis[i].key.primary).try_number += 1
+            session.commit()
             tis[i].run()
             assert tis[i].state == State.SUCCESS
-            assert tis[i].try_number == 3
+            assert tis[i].try_number == 2
             assert tis[i].max_tries == 1
 
         DAG.clear_dags(dags, dry_run=True)
@@ -584,7 +630,7 @@
         for i in range(num_of_dags):
             tis[i].refresh_from_db()
             assert tis[i].state == State.SUCCESS
-            assert tis[i].try_number == 3
+            assert tis[i].try_number == 2
             assert tis[i].max_tries == 1
 
         # test only_failed
@@ -599,14 +645,14 @@
             ti.refresh_from_db()
             if ti is failed_dag:
                 assert ti.state == State.NONE
-                assert ti.try_number == 3
+                assert ti.try_number == 2
                 assert ti.max_tries == 2
             else:
                 assert ti.state == State.SUCCESS
-                assert ti.try_number == 3
+                assert ti.try_number == 2
                 assert ti.max_tries == 1
 
-    def test_operator_clear(self, dag_maker):
+    def test_operator_clear(self, dag_maker, session):
         with dag_maker(
             "test_operator_clear",
             start_date=DEFAULT_DATE,
@@ -625,18 +671,27 @@
         ti1.task = op1
         ti2.task = op2
 
+        session.get(TaskInstance, ti2.key.primary).try_number += 1
+        session.commit()
         ti2.run()
         # Dependency not met
         assert ti2.try_number == 1
         assert ti2.max_tries == 1
 
         op2.clear(upstream=True)
+        # max tries will be set to retries + curr try number == 1 + 1 == 2
+        assert session.get(TaskInstance, ti2.key.primary).max_tries == 2
+
+        session.get(TaskInstance, ti1.key.primary).try_number += 1
+        session.commit()
         ti1.run()
+        assert ti1.try_number == 1
+
+        session.get(TaskInstance, ti2.key.primary).try_number += 1
+        session.commit()
         ti2.run(ignore_ti_state=True)
-        assert ti1.try_number == 2
         # max_tries is 0 because there is no task instance in db for ti1
         # so clear won't change the max_tries.
         assert ti1.max_tries == 0
         assert ti2.try_number == 2
-        # try_number (0) + retries(1)
-        assert ti2.max_tries == 1
+        assert ti2.max_tries == 2  # max tries has not changed since it was updated when op2.clear called
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index aa6be10..bb134d8 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3257,7 +3257,13 @@
     assert not dag._check_schedule_interval_matches_timetable()
 
 
-@pytest.mark.parametrize("run_id, execution_date", [(None, datetime_tz(2020, 1, 1)), ("test-run-id", None)])
+@pytest.mark.parametrize(
+    "run_id, execution_date",
+    [
+        (None, datetime_tz(2020, 1, 1)),
+        ("test-run-id", None),
+    ],
+)
 def test_set_task_instance_state(run_id, execution_date, session, dag_maker):
     """Test that set_task_instance_state updates the TaskInstance state and clear downstream failed"""
 
@@ -3321,7 +3327,9 @@
     # dagrun should be set to QUEUED
     assert dagrun.get_state() == State.QUEUED
 
-    assert {t.key for t in altered} == {("test_set_task_instance_state", "task_1", dagrun.run_id, 1, -1)}
+    assert {tuple(t.key) for t in altered} == {
+        ("test_set_task_instance_state", "task_1", dagrun.run_id, 0, -1)
+    }
 
 
 def test_set_task_instance_state_mapped(dag_maker, session):
@@ -3472,8 +3480,8 @@
     assert dagrun.get_state() == State.QUEUED
 
     assert {t.key for t in altered} == {
-        ("test_set_task_group_state", "section_1.task_1", dagrun.run_id, 1, -1),
-        ("test_set_task_group_state", "section_1.task_3", dagrun.run_id, 1, -1),
+        ("test_set_task_group_state", "section_1.task_1", dagrun.run_id, 0, -1),
+        ("test_set_task_group_state", "section_1.task_3", dagrun.run_id, 0, -1),
     }
 
 
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 3ab3ccc..fecde3b 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -29,7 +29,7 @@
 from traceback import format_exception
 from typing import cast
 from unittest import mock
-from unittest.mock import MagicMock, call, mock_open, patch
+from unittest.mock import call, mock_open, patch
 from uuid import uuid4
 
 import pendulum
@@ -66,8 +66,6 @@
     TaskInstance,
     TaskInstance as TI,
     TaskInstanceNote,
-    _get_private_try_number,
-    _get_try_number,
     _run_finished_callback,
 )
 from airflow.models.taskmap import TaskMap
@@ -623,23 +621,33 @@
         ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
         ti.task = task
 
-        assert ti.try_number == 1
+        with create_session() as session:
+            session.get(TaskInstance, ti.key.primary).try_number += 1
+
         # first run -- up for retry
         run_with_error(ti)
         assert ti.state == State.UP_FOR_RETRY
-        assert ti.try_number == 2
+        assert ti.try_number == 1
+
+        with create_session() as session:
+            session.get(TaskInstance, ti.key.primary).try_number += 1
 
         # second run -- still up for retry because retry_delay hasn't expired
         time_machine.coordinates.shift(3)
         run_with_error(ti)
         assert ti.state == State.UP_FOR_RETRY
+        assert ti.try_number == 2
+
+        with create_session() as session:
+            session.get(TaskInstance, ti.key.primary).try_number += 1
 
         # third run -- failed
         time_machine.coordinates.shift(datetime.datetime.resolution)
         run_with_error(ti)
         assert ti.state == State.FAILED
+        assert ti.try_number == 3
 
-    def test_retry_handling(self, dag_maker):
+    def test_retry_handling(self, dag_maker, session):
         """
         Test that task retries are handled properly
         """
@@ -663,36 +671,44 @@
 
         ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
         ti.task = task
-        assert ti.try_number == 1
+        assert ti.try_number == 0
+
+        session.get(TaskInstance, ti.key.primary).try_number += 1
+        session.commit()
 
         # first run -- up for retry
         run_with_error(ti)
         assert ti.state == State.UP_FOR_RETRY
-        assert ti._try_number == 1
-        assert ti.try_number == 2
+        assert ti.try_number == 1
+
+        session.get(TaskInstance, ti.key.primary).try_number += 1
+        session.commit()
 
         # second run -- fail
         run_with_error(ti)
         assert ti.state == State.FAILED
-        assert ti._try_number == 2
-        assert ti.try_number == 3
+        assert ti.try_number == 2
 
         # Clear the TI state since you can't run a task with a FAILED state without
         # clearing it first
         dag.clear()
 
+        session.get(TaskInstance, ti.key.primary).try_number += 1
+        session.commit()
+
         # third run -- up for retry
         run_with_error(ti)
         assert ti.state == State.UP_FOR_RETRY
-        assert ti._try_number == 3
-        assert ti.try_number == 4
+        assert ti.try_number == 3
+
+        session.get(TaskInstance, ti.key.primary).try_number += 1
+        session.commit()
 
         # fourth run -- fail
         run_with_error(ti)
         ti.refresh_from_db()
         assert ti.state == State.FAILED
-        assert ti._try_number == 4
-        assert ti.try_number == 5
+        assert ti.try_number == 4
         assert RenderedTaskInstanceFields.get_templated_fields(ti) == expected_rendered_ti_fields
 
     def test_next_retry_datetime(self, dag_maker):
@@ -783,8 +799,12 @@
 
         ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
         ti.task = task
-        assert ti._try_number == 0
-        assert ti.try_number == 1
+        assert ti.try_number == 0
+
+        date1 = timezone.utcnow()
+        date2 = date1 + datetime.timedelta(minutes=1)
+        date3 = date2 + datetime.timedelta(minutes=1)
+        date4 = date3 + datetime.timedelta(minutes=1)
 
         def run_ti_and_assert(
             run_date,
@@ -796,30 +816,29 @@
             expected_task_reschedule_count,
         ):
             with time_machine.travel(run_date, tick=False):
+                exc = None
                 try:
                     ti.run()
-                except AirflowException:
+                except AirflowException as e:
+                    exc = e
                     if not fail:
                         raise
+                if exc and not fail:
+                    raise RuntimeError("expected to fail")
             ti.refresh_from_db()
             assert ti.state == expected_state
-            assert ti._try_number == expected_try_number
-            assert ti.try_number == expected_try_number + 1
+            assert ti.try_number == expected_try_number
             assert ti.start_date == expected_start_date
             assert ti.end_date == expected_end_date
             assert ti.duration == expected_duration
             assert len(task_reschedules_for_ti(ti)) == expected_task_reschedule_count
 
-        date1 = timezone.utcnow()
-        date2 = date1 + datetime.timedelta(minutes=1)
-        date3 = date2 + datetime.timedelta(minutes=1)
-        date4 = date3 + datetime.timedelta(minutes=1)
-
         # Run with multiple reschedules.
         # During reschedule the try number remains the same, but each reschedule is recorded.
         # The start date is expected to remain the initial date, hence the duration increases.
-        # When finished the try number is incremented and there is no reschedule expected
-        # for this try.
+        # When there's a new try (task run following something other than a reschedule), then
+        # the scheduler will increment the try_number. We do that inline here since
+        # we're not using the scheduler.
 
         done, fail = False, False
         run_ti_and_assert(date1, date1, date1, 0, State.UP_FOR_RESCHEDULE, 0, 1)
@@ -831,29 +850,37 @@
         run_ti_and_assert(date3, date1, date3, 120, State.UP_FOR_RESCHEDULE, 0, 3)
 
         done, fail = True, False
-        run_ti_and_assert(date4, date1, date4, 180, State.SUCCESS, 1, 0)
+        run_ti_and_assert(date4, date1, date4, 180, State.SUCCESS, 0, 3)
 
         # Clear the task instance.
         dag.clear()
         ti.refresh_from_db()
         assert ti.state == State.NONE
-        assert ti._try_number == 1
+        assert ti.try_number == 0
 
-        # Run again after clearing with reschedules and a retry.
-        # The retry increments the try number, and for that try no reschedule is expected.
+        # We will run it again with reschedules and a retry.
+
+        # We increment the try number because that's what the scheduler would do
+        with create_session() as session:
+            session.get(TaskInstance, ti.key.primary).try_number += 1
+
         # After the retry the start date is reset, hence the duration is also reset.
 
         done, fail = False, False
         run_ti_and_assert(date1, date1, date1, 0, State.UP_FOR_RESCHEDULE, 1, 1)
 
         done, fail = False, True
-        run_ti_and_assert(date2, date1, date2, 60, State.UP_FOR_RETRY, 2, 0)
+        run_ti_and_assert(date2, date1, date2, 60, State.UP_FOR_RETRY, 1, 1)
+
+        # scheduler would create a new try here
+        with create_session() as session:
+            session.get(TaskInstance, ti.key.primary).try_number += 1
 
         done, fail = False, False
         run_ti_and_assert(date3, date3, date3, 0, State.UP_FOR_RESCHEDULE, 2, 1)
 
         done, fail = True, False
-        run_ti_and_assert(date4, date3, date4, 60, State.SUCCESS, 3, 0)
+        run_ti_and_assert(date4, date3, date4, 60, State.SUCCESS, 2, 1)
 
     def test_mapped_reschedule_handling(self, dag_maker, task_reschedules_for_ti):
         """
@@ -880,8 +907,7 @@
         ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
 
         ti.task = task
-        assert ti._try_number == 0
-        assert ti.try_number == 1
+        assert ti.try_number == 0
 
         def run_ti_and_assert(
             run_date,
@@ -901,8 +927,7 @@
                         raise
             ti.refresh_from_db()
             assert ti.state == expected_state
-            assert ti._try_number == expected_try_number
-            assert ti.try_number == expected_try_number + 1
+            assert ti.try_number == expected_try_number
             assert ti.start_date == expected_start_date
             assert ti.end_date == expected_end_date
             assert ti.duration == expected_duration
@@ -929,29 +954,36 @@
         run_ti_and_assert(date3, date1, date3, 120, State.UP_FOR_RESCHEDULE, 0, 3)
 
         done, fail = True, False
-        run_ti_and_assert(date4, date1, date4, 180, State.SUCCESS, 1, 0)
+        run_ti_and_assert(date4, date1, date4, 180, State.SUCCESS, 0, 3)
 
         # Clear the task instance.
         dag.clear()
         ti.refresh_from_db()
         assert ti.state == State.NONE
-        assert ti._try_number == 1
+        assert ti.try_number == 0
 
         # Run again after clearing with reschedules and a retry.
-        # The retry increments the try number, and for that try no reschedule is expected.
+
+        # We increment the try number because that's what the scheduler would do
+        with create_session() as session:
+            session.get(TaskInstance, ti.key.primary).try_number += 1
+
         # After the retry the start date is reset, hence the duration is also reset.
 
         done, fail = False, False
         run_ti_and_assert(date1, date1, date1, 0, State.UP_FOR_RESCHEDULE, 1, 1)
 
         done, fail = False, True
-        run_ti_and_assert(date2, date1, date2, 60, State.UP_FOR_RETRY, 2, 0)
+        run_ti_and_assert(date2, date1, date2, 60, State.UP_FOR_RETRY, 1, 1)
+
+        with create_session() as session:
+            session.get(TaskInstance, ti.key.primary).try_number += 1
 
         done, fail = False, False
         run_ti_and_assert(date3, date3, date3, 0, State.UP_FOR_RESCHEDULE, 2, 1)
 
         done, fail = True, False
-        run_ti_and_assert(date4, date3, date4, 60, State.SUCCESS, 3, 0)
+        run_ti_and_assert(date4, date3, date4, 60, State.SUCCESS, 2, 1)
 
     @pytest.mark.usefixtures("test_pool")
     def test_mapped_task_reschedule_handling_clear_reschedules(self, dag_maker, task_reschedules_for_ti):
@@ -978,8 +1010,7 @@
             ).expand(poke_interval=[0])
         ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
         ti.task = task
-        assert ti._try_number == 0
-        assert ti.try_number == 1
+        assert ti.try_number == 0
 
         def run_ti_and_assert(
             run_date,
@@ -999,8 +1030,7 @@
                         raise
             ti.refresh_from_db()
             assert ti.state == expected_state
-            assert ti._try_number == expected_try_number
-            assert ti.try_number == expected_try_number + 1
+            assert ti.try_number == expected_try_number
             assert ti.start_date == expected_start_date
             assert ti.end_date == expected_end_date
             assert ti.duration == expected_duration
@@ -1015,7 +1045,7 @@
         dag.clear()
         ti.refresh_from_db()
         assert ti.state == State.NONE
-        assert ti._try_number == 0
+        assert ti.try_number == 0
         # Check that reschedules for ti have also been cleared.
         assert not task_reschedules_for_ti(ti)
 
@@ -1045,8 +1075,7 @@
             )
         ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
         ti.task = task
-        assert ti._try_number == 0
-        assert ti.try_number == 1
+        assert ti.try_number == 0
 
         def run_ti_and_assert(
             run_date,
@@ -1065,8 +1094,7 @@
                         raise
             ti.refresh_from_db()
             assert ti.state == expected_state
-            assert ti._try_number == expected_try_number
-            assert ti.try_number == expected_try_number + 1
+            assert ti.try_number == expected_try_number
             assert ti.start_date == expected_start_date
             assert ti.end_date == expected_end_date
             assert ti.duration == expected_duration
@@ -1081,7 +1109,7 @@
         dag.clear()
         ti.refresh_from_db()
         assert ti.state == State.NONE
-        assert ti._try_number == 0
+        assert ti.try_number == 0
         # Check that reschedules for ti have also been cleared.
         assert not task_reschedules_for_ti(ti)
 
@@ -1803,12 +1831,12 @@
         serialized_dag = SerializedDagModel.get(ti.task.dag.dag_id).dag
         ti_from_deserialized_task = TI(task=serialized_dag.get_task(ti.task_id), run_id=ti.run_id)
 
-        assert ti_from_deserialized_task._try_number == 0
+        assert ti_from_deserialized_task.try_number == 0
         assert ti_from_deserialized_task.check_and_change_state_before_execution()
         # State should be running, and try_number column should be incremented
         assert ti_from_deserialized_task.external_executor_id == expected_external_executor_id
         assert ti_from_deserialized_task.state == State.RUNNING
-        assert ti_from_deserialized_task._try_number == 1
+        assert ti_from_deserialized_task.try_number == 0
 
     def test_check_and_change_state_before_execution_provided_id_overrides(self, create_task_instance):
         expected_external_executor_id = "banana"
@@ -1822,14 +1850,14 @@
         serialized_dag = SerializedDagModel.get(ti.task.dag.dag_id).dag
         ti_from_deserialized_task = TI(task=serialized_dag.get_task(ti.task_id), run_id=ti.run_id)
 
-        assert ti_from_deserialized_task._try_number == 0
+        assert ti_from_deserialized_task.try_number == 0
         assert ti_from_deserialized_task.check_and_change_state_before_execution(
             external_executor_id=expected_external_executor_id
         )
         # State should be running, and try_number column should be incremented
         assert ti_from_deserialized_task.external_executor_id == expected_external_executor_id
         assert ti_from_deserialized_task.state == State.RUNNING
-        assert ti_from_deserialized_task._try_number == 1
+        assert ti_from_deserialized_task.try_number == 0
 
     def test_check_and_change_state_before_execution_with_exec_id(self, create_task_instance):
         expected_external_executor_id = "minions"
@@ -1840,14 +1868,14 @@
         serialized_dag = SerializedDagModel.get(ti.task.dag.dag_id).dag
         ti_from_deserialized_task = TI(task=serialized_dag.get_task(ti.task_id), run_id=ti.run_id)
 
-        assert ti_from_deserialized_task._try_number == 0
+        assert ti_from_deserialized_task.try_number == 0
         assert ti_from_deserialized_task.check_and_change_state_before_execution(
             external_executor_id=expected_external_executor_id
         )
-        # State should be running, and try_number column should be incremented
+        # State should be running, and try_number column should be unchanged
         assert ti_from_deserialized_task.external_executor_id == expected_external_executor_id
         assert ti_from_deserialized_task.state == State.RUNNING
-        assert ti_from_deserialized_task._try_number == 1
+        assert ti_from_deserialized_task.try_number == 0
 
     def test_check_and_change_state_before_execution_dep_not_met(self, create_task_instance):
         ti = create_task_instance(dag_id="test_check_and_change_state_before_execution")
@@ -1895,12 +1923,14 @@
         Test the try_number accessor behaves in various running states
         """
         ti = create_task_instance(dag_id="test_check_and_change_state_before_execution")
-        assert 1 == ti.try_number
+        # TI starts at 0.  It's only incremented by the scheduler.
+        assert ti.try_number == 0
         ti.try_number = 2
+        assert ti.try_number == 2
         ti.state = State.RUNNING
-        assert 2 == ti.try_number
+        assert ti.try_number == 2  # unaffected by state
         ti.state = State.SUCCESS
-        assert 3 == ti.try_number
+        assert ti.try_number == 2  # unaffected by state
 
     def test_get_num_running_task_instances(self, create_task_instance):
         session = settings.Session()
@@ -2047,7 +2077,7 @@
         assert email == "to"
         assert "test_email_alert" in title
         assert "test_email_alert" in body
-        assert "Try 1" in body
+        assert "Try 0" in body
 
     @conf_vars(
         {
@@ -2130,7 +2160,7 @@
         (email, title, body), _ = mock_send_email.call_args
         assert email == "to"
         assert title == f"Airflow alert: <TaskInstance: test_dag.{task_id} test map_index=0 [failed]>"
-        assert body.startswith("Try 1")
+        assert body.startswith("Try 0")  # try number only incremented by the scheduler
         assert "test_email_alert" in body
 
         tf = (
@@ -3071,7 +3101,7 @@
         assert "task_instance" in context_arg_3
         mock_on_retry_3.assert_not_called()
 
-    def test_handle_failure_updates_queued_task_try_number(self, dag_maker):
+    def test_handle_failure_updates_queued_task_updates_state(self, dag_maker):
         session = settings.Session()
         with dag_maker():
             task = EmptyOperator(task_id="mytask", retries=1)
@@ -3081,13 +3111,8 @@
         session.merge(ti)
         session.flush()
         assert ti.state == State.QUEUED
-        assert ti.try_number == 1
         ti.handle_failure("test queued ti", test_mode=True)
         assert ti.state == State.UP_FOR_RETRY
-        # Assert that 'ti._try_number' is bumped from 0 to 1. This is the last/current try
-        assert ti._try_number == 1
-        # Check 'ti.try_number' is bumped to 2. This is try_number for next run
-        assert ti.try_number == 2
 
     @patch.object(Stats, "incr")
     def test_handle_failure_no_task(self, Stats_incr, dag_maker):
@@ -3100,6 +3125,7 @@
             task = EmptyOperator(task_id="mytask", retries=1)
         dr = dag_maker.create_dagrun()
         ti = TI(task=task, run_id=dr.run_id)
+        ti.try_number += 1
         ti = session.merge(ti)
         ti.task = None
         ti.state = State.QUEUED
@@ -3113,10 +3139,8 @@
 
         ti.handle_failure("test queued ti", test_mode=False)
         assert ti.state == State.UP_FOR_RETRY
-        # Assert that 'ti._try_number' is bumped from 0 to 1. This is the last/current try
-        assert ti._try_number == 1
-        # Check 'ti.try_number' is bumped to 2. This is try_number for next run
-        assert ti.try_number == 2
+        # try_number remains at 1
+        assert ti.try_number == 1
 
         Stats_incr.assert_any_call("ti_failures", tags=expected_stats_tags)
         Stats_incr.assert_any_call("operator_failures_EmptyOperator", tags=expected_stats_tags)
@@ -3466,7 +3490,7 @@
             "end_date": run_date + datetime.timedelta(days=1, seconds=1, milliseconds=234),
             "duration": 1.234,
             "state": State.SUCCESS,
-            "_try_number": 1,
+            "try_number": 1,
             "max_tries": 1,
             "hostname": "some_unique_hostname",
             "unixname": "some_unique_unixname",
@@ -3492,7 +3516,7 @@
             "task_display_name": "Test Refresh from DB Task",
         }
         # Make sure we aren't missing any new value in our expected_values list.
-        expected_keys = {f"task_instance.{key.lstrip('_')}" for key in expected_values}
+        expected_keys = {f"task_instance.{key}" for key in expected_values}
         assert {str(c) for c in TI.__table__.columns} == expected_keys, (
             "Please add all non-foreign values of TaskInstance to this list. "
             "This prevents refresh_from_db() from missing a field."
@@ -4740,20 +4764,11 @@
         BashOperator(task_id="hello", bash_command="hi")
     dag_maker.create_dagrun(state="success")
     ti = session.scalar(select(TaskInstance))
+    session.get(TaskInstance, ti.key.primary).try_number += 1
+    session.commit()
     assert ti.task_id == "hello"  # just to confirm...
     assert ti.try_number == 1  # starts out as 1
     ti.refresh_from_db()
     assert ti.try_number == 1  # stays 1
     ti.refresh_from_db()
     assert ti.try_number == 1  # stays 1
-
-
-@pytest.mark.parametrize("state", list(TaskInstanceState))
-def test_get_private_try_number(state: str):
-    mock_ti = MagicMock()
-    mock_ti.state = state
-    private_try_number = 2
-    mock_ti._try_number = private_try_number
-    mock_ti.try_number = _get_try_number(task_instance=mock_ti)
-    delattr(mock_ti, "_try_number")
-    assert _get_private_try_number(task_instance=mock_ti) == private_try_number
diff --git a/tests/plugins/priority_weight_strategy.py b/tests/plugins/priority_weight_strategy.py
index c56ae73..a205536 100644
--- a/tests/plugins/priority_weight_strategy.py
+++ b/tests/plugins/priority_weight_strategy.py
@@ -44,7 +44,7 @@
     """A priority weight strategy that decreases the priority weight with each attempt."""
 
     def get_weight(self, ti: TaskInstance):
-        return max(3 - ti._try_number + 1, 1)
+        return max(3 - ti.try_number + 1, 1)
 
 
 class TestPriorityWeightStrategyPlugin(AirflowPlugin):
diff --git a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
index 524360d..6c6bef5 100644
--- a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
+++ b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
@@ -1179,7 +1179,7 @@
         orphaned_tasks[1].external_executor_id = "002"  # Matches a running task_arn
         orphaned_tasks[2].external_executor_id = None  # One orphaned task has no external_executor_id
         for task in orphaned_tasks:
-            task.prev_attempted_tries = 1
+            task.try_number = 1
 
         not_adopted_tasks = mock_executor.try_adopt_task_instances(orphaned_tasks)
 
diff --git a/tests/providers/celery/executors/test_celery_executor.py b/tests/providers/celery/executors/test_celery_executor.py
index 4c62a24..e091305 100644
--- a/tests/providers/celery/executors/test_celery_executor.py
+++ b/tests/providers/celery/executors/test_celery_executor.py
@@ -200,8 +200,6 @@
     def test_try_adopt_task_instances(self):
         start_date = timezone.utcnow() - timedelta(days=2)
 
-        try_number = 1
-
         with DAG("test_try_adopt_task_instances_none") as dag:
             task_1 = BaseOperator(task_id="task_1", start_date=start_date)
             task_2 = BaseOperator(task_id="task_2", start_date=start_date)
@@ -221,8 +219,8 @@
 
         not_adopted_tis = executor.try_adopt_task_instances(tis)
 
-        key_1 = TaskInstanceKey(dag.dag_id, task_1.task_id, None, try_number)
-        key_2 = TaskInstanceKey(dag.dag_id, task_2.task_id, None, try_number)
+        key_1 = TaskInstanceKey(dag.dag_id, task_1.task_id, None, 0)
+        key_2 = TaskInstanceKey(dag.dag_id, task_2.task_id, None, 0)
         assert executor.running == {key_1, key_2}
 
         assert executor.tasks == {key_1: AsyncResult("231"), key_2: AsyncResult("232")}
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py
index fe7c941..224824e 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -358,7 +358,7 @@
             "dag_id": "dag",
             "kubernetes_pod_operator": "True",
             "task_id": "task",
-            "try_number": "1",
+            "try_number": "0",
             "airflow_version": mock.ANY,
             "run_id": "test",
             "airflow_kpo_in_cluster": str(in_cluster),
@@ -374,7 +374,7 @@
             "dag_id": "dag",
             "kubernetes_pod_operator": "True",
             "task_id": "task",
-            "try_number": "1",
+            "try_number": "0",
             "airflow_version": mock.ANY,
             "run_id": "test",
             "map_index": "10",
@@ -884,7 +884,7 @@
             "dag_id": "dag",
             "kubernetes_pod_operator": "True",
             "task_id": "task",
-            "try_number": "1",
+            "try_number": "0",
             "airflow_version": mock.ANY,
             "airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
             "run_id": "test",
@@ -920,7 +920,7 @@
             "dag_id": "dag",
             "kubernetes_pod_operator": "True",
             "task_id": "task",
-            "try_number": "1",
+            "try_number": "0",
             "airflow_version": mock.ANY,
             "airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
             "run_id": "test",
@@ -991,7 +991,7 @@
             "dag_id": "dag",
             "kubernetes_pod_operator": "True",
             "task_id": "task",
-            "try_number": "1",
+            "try_number": "0",
             "airflow_version": mock.ANY,
             "airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
             "run_id": "test",
@@ -1061,7 +1061,7 @@
             "dag_id": "dag",
             "kubernetes_pod_operator": "True",
             "task_id": "task",
-            "try_number": "1",
+            "try_number": "0",
             "airflow_version": mock.ANY,
             "airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
             "run_id": "test",
@@ -1112,7 +1112,7 @@
             "dag_id": "dag",
             "kubernetes_pod_operator": "True",
             "task_id": "task",
-            "try_number": "1",
+            "try_number": "0",
             "airflow_version": mock.ANY,
             "airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
             "run_id": "test",
diff --git a/tests/providers/cncf/kubernetes/test_template_rendering.py b/tests/providers/cncf/kubernetes/test_template_rendering.py
index 0627eb8..f3e6110 100644
--- a/tests/providers/cncf/kubernetes/test_template_rendering.py
+++ b/tests/providers/cncf/kubernetes/test_template_rendering.py
@@ -48,7 +48,7 @@
                 "dag_id": "test_render_k8s_pod_yaml",
                 "run_id": "test_run_id",
                 "task_id": "op1",
-                "try_number": "1",
+                "try_number": "0",
             },
             "labels": {
                 "airflow-worker": "0",
@@ -57,7 +57,7 @@
                 "run_id": "test_run_id",
                 "kubernetes_executor": "True",
                 "task_id": "op1",
-                "try_number": "1",
+                "try_number": "0",
             },
             "name": mock.ANY,
             "namespace": "default",
diff --git a/tests/providers/openlineage/plugins/test_listener.py b/tests/providers/openlineage/plugins/test_listener.py
index c37892c..fa651de 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -297,12 +297,14 @@
     listener.on_task_instance_success(None, task_instance, None)
     # This run_id will be different as we did NOT simulate increase of the try_number attribute,
     # which happens in Airflow.
-    listener.adapter.complete_task.assert_called_once_with(
+    calls = listener.adapter.complete_task.call_args_list
+    assert len(calls) == 1
+    assert calls[0][1] == dict(
         end_time="2023-01-03T13:01:01",
         job_name="job_name",
         parent_job_name="dag_id",
         parent_run_id="dag_id.dag_run_run_id",
-        run_id="dag_id.task_id.execution_date.0",
+        run_id="dag_id.task_id.execution_date.1",
         task=listener.extractor_manager.extract_metadata(),
     )
 
@@ -310,12 +312,14 @@
     listener.adapter.complete_task.reset_mock()
     task_instance.try_number += 1
     listener.on_task_instance_success(None, task_instance, None)
-    listener.adapter.complete_task.assert_called_once_with(
+    calls = listener.adapter.complete_task.call_args_list
+    assert len(calls) == 1
+    assert calls[0][1] == dict(
         end_time="2023-01-03T13:01:01",
         job_name="job_name",
         parent_job_name="dag_id",
         parent_run_id="dag_id.dag_run_run_id",
-        run_id="dag_id.task_id.execution_date.1",
+        run_id="dag_id.task_id.execution_date.2",
         task=listener.extractor_manager.extract_metadata(),
     )
 
@@ -334,24 +338,23 @@
 
     listener, task_instance = _create_listener_and_task_instance()
     mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
-
+    expected_run_id_1 = "dag_id.task_id.execution_date.1"
+    expected_run_id_2 = "dag_id.task_id.execution_date.2"
     listener.on_task_instance_running(None, task_instance, None)
-    expected_run_id = listener.adapter.start_task.call_args.kwargs["run_id"]
-    assert expected_run_id == "dag_id.task_id.execution_date.1"
+    assert listener.adapter.start_task.call_args.kwargs["run_id"] == expected_run_id_1
 
     listener.on_task_instance_failed(None, task_instance, None)
-    assert listener.adapter.fail_task.call_args.kwargs["run_id"] == expected_run_id
+    assert listener.adapter.fail_task.call_args.kwargs["run_id"] == expected_run_id_1
 
-    # This run_id will be different as we did NOT simulate increase of the try_number attribute,
-    # which happens in Airflow.
+    # This run_id will not be different as we did NOT simulate increase of the try_number attribute,
     listener.on_task_instance_success(None, task_instance, None)
-    assert listener.adapter.complete_task.call_args.kwargs["run_id"] == "dag_id.task_id.execution_date.0"
+    assert listener.adapter.complete_task.call_args.kwargs["run_id"] == expected_run_id_1
 
     # Now we simulate the increase of try_number, and the run_id should reflect that change.
     # This is how airflow works, and that's why we expect the run_id to remain constant across all methods.
     task_instance.try_number += 1
     listener.on_task_instance_success(None, task_instance, None)
-    assert listener.adapter.complete_task.call_args.kwargs["run_id"] == expected_run_id
+    assert listener.adapter.complete_task.call_args.kwargs["run_id"] == expected_run_id_2
 
 
 def test_running_task_correctly_calls_openlineage_adapter_run_id_method():
@@ -403,7 +406,7 @@
         dag_id="dag_id",
         task_id="task_id",
         execution_date="execution_date",
-        try_number=0,
+        try_number=1,
     )
 
 
@@ -428,16 +431,16 @@
 
     _, task_instance = _create_test_dag_and_task(fail_callable, "failure")
     # try_number before execution
-    assert task_instance.try_number == 1
+    assert task_instance.try_number == 0
     with suppress(CustomError):
         task_instance.run()
 
     # try_number at the moment of function being called
-    assert captured_try_numbers["running"] == 1
-    assert captured_try_numbers["failed"] == 1
+    assert captured_try_numbers["running"] == 0
+    assert captured_try_numbers["failed"] == 0
 
     # try_number after task has been executed
-    assert task_instance.try_number == 2
+    assert task_instance.try_number == 0
 
 
 @mock.patch("airflow.models.taskinstance.get_listener_manager")
@@ -457,15 +460,15 @@
 
     _, task_instance = _create_test_dag_and_task(success_callable, "success")
     # try_number before execution
-    assert task_instance.try_number == 1
+    assert task_instance.try_number == 0
     task_instance.run()
 
     # try_number at the moment of function being called
-    assert captured_try_numbers["running"] == 1
-    assert captured_try_numbers["success"] == 2
+    assert captured_try_numbers["running"] == 0
+    assert captured_try_numbers["success"] == 0
 
     # try_number after task has been executed
-    assert task_instance.try_number == 2
+    assert task_instance.try_number == 0
 
 
 @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled")
diff --git a/tests/providers/smtp/notifications/test_smtp.py b/tests/providers/smtp/notifications/test_smtp.py
index f1a71e2..b19cc4b 100644
--- a/tests/providers/smtp/notifications/test_smtp.py
+++ b/tests/providers/smtp/notifications/test_smtp.py
@@ -129,7 +129,7 @@
             from_email=conf.get("smtp", "smtp_mail_from"),
             to="test_reciver@test.com",
             subject="DAG dag - Task op - Run ID test in State None",
-            html_content="""<!DOCTYPE html>\n<html>\n    <head>\n        <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />\n        <meta name="viewport" content="width=device-width">\n    </head>\n<body>\n    <table role="presentation">\n        \n        <tr>\n            <td>Run ID:</td>\n            <td>test</td>\n        </tr>\n        <tr>\n            <td>Try:</td>\n            <td>1 of 1</td>\n        </tr>\n        <tr>\n            <td>Task State:</td>\n            <td>None</td>\n        </tr>\n        <tr>\n            <td>Host:</td>\n            <td></td>\n        </tr>\n        <tr>\n            <td>Log Link:</td>\n            <td><a href="http://localhost:8080/dags/dag/grid?dag_run_id=test&task_id=op&map_index=-1&tab=logs" style="text-decoration:underline;">http://localhost:8080/dags/dag/grid?dag_run_id=test&task_id=op&map_index=-1&tab=logs</a></td>\n        </tr>\n        <tr>\n            <td>Mark Success Link:</td>\n            <td><a href="http://localhost:8080/confirm?task_id=op&dag_id=dag&dag_run_id=test&upstream=false&downstream=false&state=success" style="text-decoration:underline;">http://localhost:8080/confirm?task_id=op&dag_id=dag&dag_run_id=test&upstream=false&downstream=false&state=success</a></td>\n        </tr>\n        \n    </table>\n</body>\n</html>""",
+            html_content="""<!DOCTYPE html>\n<html>\n    <head>\n        <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />\n        <meta name="viewport" content="width=device-width">\n    </head>\n<body>\n    <table role="presentation">\n        \n        <tr>\n            <td>Run ID:</td>\n            <td>test</td>\n        </tr>\n        <tr>\n            <td>Try:</td>\n            <td>0 of 1</td>\n        </tr>\n        <tr>\n            <td>Task State:</td>\n            <td>None</td>\n        </tr>\n        <tr>\n            <td>Host:</td>\n            <td></td>\n        </tr>\n        <tr>\n            <td>Log Link:</td>\n            <td><a href="http://localhost:8080/dags/dag/grid?dag_run_id=test&task_id=op&map_index=-1&tab=logs" style="text-decoration:underline;">http://localhost:8080/dags/dag/grid?dag_run_id=test&task_id=op&map_index=-1&tab=logs</a></td>\n        </tr>\n        <tr>\n            <td>Mark Success Link:</td>\n            <td><a href="http://localhost:8080/confirm?task_id=op&dag_id=dag&dag_run_id=test&upstream=false&downstream=false&state=success" style="text-decoration:underline;">http://localhost:8080/confirm?task_id=op&dag_id=dag&dag_run_id=test&upstream=false&downstream=false&state=success</a></td>\n        </tr>\n        \n    </table>\n</body>\n</html>""",
             smtp_conn_id="smtp_default",
             files=None,
             cc=None,
diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py
index a41e683..dbf6288 100644
--- a/tests/sensors/test_base.py
+++ b/tests/sensors/test_base.py
@@ -297,7 +297,17 @@
         date1 = timezone.utcnow()
         time_machine.move_to(date1, tick=False)
 
+        sensor_ti, dummy_ti = _get_tis()
+        assert dummy_ti.state == State.NONE
+        assert sensor_ti.state == State.NONE
+
+        # ordinarily the scheduler does this
+        sensor_ti.state = State.SCHEDULED
+        sensor_ti.try_number += 1  # first TI run
+        session.commit()
+
         self._run(sensor, session=session)
+
         sensor_ti, dummy_ti = _get_tis()
         assert sensor_ti.state == State.UP_FOR_RESCHEDULE
         assert dummy_ti.state == State.NONE
@@ -326,6 +336,9 @@
         # first poke returns False and task is re-scheduled
         date1 = timezone.utcnow()
         time_machine.move_to(date1, tick=False)
+        sensor_ti, dummy_ti = _get_tis()
+        sensor_ti.try_number += 1  # second TI run
+        session.commit()
         self._run(sensor)
         sensor_ti, dummy_ti = _get_tis()
         assert sensor_ti.state == State.UP_FOR_RESCHEDULE
@@ -359,6 +372,9 @@
         # first poke returns False and task is re-scheduled
         date1 = timezone.utcnow()
         time_machine.move_to(date1, tick=False)
+        sensor_ti, dummy_ti = _get_tis()
+        sensor_ti.try_number += 1  # first TI run
+        session.commit()
         self._run(sensor)
         sensor_ti, dummy_ti = _get_tis()
         assert sensor_ti.state == State.UP_FOR_RESCHEDULE
@@ -382,9 +398,12 @@
         # Task is cleared
         sensor.clear()
         sensor_ti, dummy_ti = _get_tis()
-        assert sensor_ti.try_number == 2
+        assert sensor_ti.try_number == 1
         assert sensor_ti.max_tries == 2
 
+        sensor_ti, dummy_ti = _get_tis()
+        sensor_ti.try_number += 1  # second TI run
+        session.commit()
         # third poke returns False and task is rescheduled again
         date3 = date1 + timedelta(seconds=sensor.poke_interval) * 2 + sensor.retry_delay
         time_machine.coordinates.shift(sensor.poke_interval + sensor.retry_delay.total_seconds())
@@ -687,9 +706,15 @@
             tis = dr.get_task_instances(session=session)
             return next(x for x in tis if x.task_id == SENSOR_OP)
 
+        def _increment_try_number():
+            sensor_ti = _get_sensor_ti()
+            sensor_ti.try_number += 1
+            session.commit()
+
         # first poke returns False and task is re-scheduled
         date1 = timezone.utcnow()
         time_machine.move_to(date1, tick=False)
+        _increment_try_number()  # first TI run
         self._run(sensor)
         sensor_ti = _get_sensor_ti()
         assert sensor_ti.try_number == 1
@@ -701,12 +726,13 @@
         with pytest.raises(RuntimeError):
             self._run(sensor)
         sensor_ti = _get_sensor_ti()
-        assert sensor_ti.try_number == 2
+        assert sensor_ti.try_number == 1
         assert sensor_ti.max_tries == 2
         assert sensor_ti.state == State.UP_FOR_RETRY
 
         # third poke returns False and task is rescheduled again
         time_machine.coordinates.shift(sensor.retry_delay + timedelta(seconds=1))
+        _increment_try_number()  # second TI run
         self._run(sensor)
         sensor_ti = _get_sensor_ti()
         assert sensor_ti.try_number == 2
@@ -718,19 +744,22 @@
         with pytest.raises(AirflowSensorTimeout):
             self._run(sensor)
         sensor_ti = _get_sensor_ti()
-        assert sensor_ti.try_number == 3
+        assert sensor_ti.try_number == 2
         assert sensor_ti.max_tries == 2
         assert sensor_ti.state == State.FAILED
 
         # Clear the failed sensor
         sensor.clear()
         sensor_ti = _get_sensor_ti()
-        assert sensor_ti.try_number == 3
+        # clearing does not change the try_number
+        assert sensor_ti.try_number == 2
+        # but it does change the max_tries
         assert sensor_ti.max_tries == 4
         assert sensor_ti.state is None
 
         time_machine.coordinates.shift(20)
 
+        _increment_try_number()  # third TI run
         for _ in range(3):
             time_machine.coordinates.shift(sensor.poke_interval)
             self._run(sensor)
@@ -744,7 +773,7 @@
         with pytest.raises(AirflowSensorTimeout):
             self._run(sensor)
         sensor_ti = _get_sensor_ti()
-        assert sensor_ti.try_number == 4
+        assert sensor_ti.try_number == 3
         assert sensor_ti.max_tries == 4
         assert sensor_ti.state == State.FAILED
 
@@ -794,13 +823,16 @@
         # first poke returns False and task is re-scheduled
         date1 = timezone.utcnow()
         time_machine.move_to(date1, tick=False)
+        sensor_ti = _get_sensor_ti()
+        sensor_ti.try_number += 1  # first TI run
         self._run(sensor)
+
         sensor_ti = _get_sensor_ti()
         assert sensor_ti.try_number == 1
         assert sensor_ti.max_tries == 2
         assert sensor_ti.state == State.UP_FOR_RESCHEDULE
 
-        # second poke raises RuntimeError and task instance is re-scheduled again
+        # second poke raises reschedule exception and task instance is re-scheduled again
         time_machine.coordinates.shift(sensor.poke_interval)
         self._run(sensor)
         sensor_ti = _get_sensor_ti()
@@ -821,19 +853,21 @@
         with pytest.raises(AirflowSensorTimeout):
             self._run(sensor)
         sensor_ti = _get_sensor_ti()
-        assert sensor_ti.try_number == 2
+        assert sensor_ti.try_number == 1
         assert sensor_ti.max_tries == 2
         assert sensor_ti.state == State.FAILED
 
         # Clear the failed sensor
         sensor.clear()
         sensor_ti = _get_sensor_ti()
-        assert sensor_ti.try_number == 2
+        assert sensor_ti.try_number == 1
         assert sensor_ti.max_tries == 3
         assert sensor_ti.state == State.NONE
 
         time_machine.coordinates.shift(20)
 
+        sensor_ti.try_number += 1  # second TI run
+        session.commit()
         for _ in range(3):
             time_machine.coordinates.shift(sensor.poke_interval)
             self._run(sensor)
@@ -847,7 +881,7 @@
         with pytest.raises(AirflowSensorTimeout):
             self._run(sensor)
         sensor_ti = _get_sensor_ti()
-        assert sensor_ti.try_number == 3
+        assert sensor_ti.try_number == 2
         assert sensor_ti.max_tries == 3
         assert sensor_ti.state == State.FAILED
 
@@ -983,7 +1017,7 @@
             (False, AirflowException),
         ],
     )
-    def test_fail_after_resuming_deffered_sensor(self, soft_fail, expected_exception):
+    def test_fail_after_resuming_deferred_sensor(self, soft_fail, expected_exception):
         async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor", soft_fail=soft_fail)
         ti = TaskInstance(task=async_sensor)
         ti.next_method = "execute_complete"
diff --git a/tests/test_utils/mock_executor.py b/tests/test_utils/mock_executor.py
index ba555fb..eaf5d32 100644
--- a/tests/test_utils/mock_executor.py
+++ b/tests/test_utils/mock_executor.py
@@ -71,7 +71,6 @@
             sorted_queue = sorted(self.queued_tasks.items(), key=sort_by)
             for key, (_, _, _, ti) in sorted_queue[:open_slots]:
                 self.queued_tasks.pop(key)
-                ti._try_number += 1
                 state = self.mock_task_results[key]
                 ti.set_state(state, session=session)
                 self.change_state(key, state)
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 0772704..4f7eb94 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -122,7 +122,7 @@
         # We expect set_context generates a file locally.
         log_filename = file_handler.handler.baseFilename
         assert os.path.isfile(log_filename)
-        assert log_filename.endswith("1.log"), log_filename
+        assert log_filename.endswith("0.log"), log_filename
 
         ti.run(ignore_ti_state=True)
 
@@ -161,7 +161,7 @@
             python_callable=task_callable,
         )
         ti = TaskInstance(task=task, run_id=dagrun.run_id)
-
+        ti.try_number += 1
         logger = ti.log
         ti.log.disabled = False
 
@@ -498,7 +498,7 @@
             t.task_instance = ti
         h = FileTaskHandler(base_log_folder=os.fspath(tmp_path))
         h.set_context(ti)
-        expected = "dag_id=test_fth/run_id=test/task_id=dummy/attempt=1.log"
+        expected = "dag_id=test_fth/run_id=test/task_id=dummy/attempt=0.log"
         if is_a_trigger:
             expected += f".trigger.{job.id}.log"
         actual = h.handler.baseFilename
diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py
index 3d3248f..e32eb66 100644
--- a/tests/www/views/test_views_log.py
+++ b/tests/www/views/test_views_log.py
@@ -185,7 +185,7 @@
     handler = FileTaskHandler(log_path)
 
     def create_expected_log_file(try_number):
-        ti.try_number = try_number - 1
+        ti.try_number = 1
         handler.set_context(ti)
         handler.emit(logging.makeLogRecord({"msg": "Log for testing."}))
         handler.flush()
@@ -271,8 +271,9 @@
         in content_disposition
     )
     assert 200 == response.status_code
-    assert "Log for testing." in response.data.decode("utf-8")
-    assert "localhost\n" in response.data.decode("utf-8")
+    content = response.data.decode("utf-8")
+    assert "Log for testing." in content
+    assert "localhost\n" in content
 
 
 DIFFERENT_LOG_FILENAME = "{{ ti.dag_id }}/{{ ti.run_id }}/{{ ti.task_id }}/{{ try_number }}.log"
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 2b893c4..233542e 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -1089,7 +1089,7 @@
             "task_id": "also_run_this",
             "trigger_id": None,
             "trigger_timeout": None,
-            "try_number": 1,
+            "try_number": 0,
             "unixname": getuser(),
             "updated_at": DEFAULT_DATE.isoformat(),
         },
@@ -1124,7 +1124,7 @@
             "task_id": "run_after_loop",
             "trigger_id": None,
             "trigger_timeout": None,
-            "try_number": 1,
+            "try_number": 0,
             "unixname": getuser(),
             "updated_at": DEFAULT_DATE.isoformat(),
         },
@@ -1159,7 +1159,7 @@
             "task_id": "run_this_last",
             "trigger_id": None,
             "trigger_timeout": None,
-            "try_number": 1,
+            "try_number": 0,
             "unixname": getuser(),
             "updated_at": DEFAULT_DATE.isoformat(),
         },
@@ -1194,7 +1194,7 @@
             "task_id": "runme_0",
             "trigger_id": None,
             "trigger_timeout": None,
-            "try_number": 1,
+            "try_number": 0,
             "unixname": getuser(),
             "updated_at": DEFAULT_DATE.isoformat(),
         },
@@ -1229,7 +1229,7 @@
             "task_id": "runme_1",
             "trigger_id": None,
             "trigger_timeout": None,
-            "try_number": 1,
+            "try_number": 0,
             "unixname": getuser(),
             "updated_at": DEFAULT_DATE.isoformat(),
         },
@@ -1264,7 +1264,7 @@
             "task_id": "runme_2",
             "trigger_id": None,
             "trigger_timeout": None,
-            "try_number": 1,
+            "try_number": 0,
             "unixname": getuser(),
             "updated_at": DEFAULT_DATE.isoformat(),
         },
@@ -1299,7 +1299,7 @@
             "task_id": "this_will_skip",
             "trigger_id": None,
             "trigger_timeout": None,
-            "try_number": 1,
+            "try_number": 0,
             "unixname": getuser(),
             "updated_at": DEFAULT_DATE.isoformat(),
         },