#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from time import sleep
from typing import Callable, NoReturn

from sqlalchemy import Column, Index, Integer, String, case
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import backref, foreign, relationship
from sqlalchemy.orm.session import Session, make_transient

from airflow.compat.functools import cached_property
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.executor_loader import ExecutorLoader
from airflow.listeners.listener import get_listener_manager
from airflow.models.base import ID_LEN, Base
from airflow.serialization.pydantic.job import JobPydantic
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.platform import getuser
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.sqlalchemy import UtcDateTime
from airflow.utils.state import State


def _resolve_dagrun_model():
    """Import DagRun lazily, to avoid a circular import with airflow.models.dagrun."""
    from airflow.models.dagrun import DagRun

    return DagRun


class Job(Base, LoggingMixin):
    """
    The ORM class representing a Job stored in the database.

    Jobs are processing items with state and duration that aren't task instances.
    For instance, a BackfillJob is a collection of task instance runs,
    but should have its own state, start and end time.
    """

__tablename__ = "job"
id = Column(Integer, primary_key=True)
dag_id = Column(
String(ID_LEN),
)
state = Column(String(20))
job_type = Column(String(30))
start_date = Column(UtcDateTime())
end_date = Column(UtcDateTime())
latest_heartbeat = Column(UtcDateTime())
executor_class = Column(String(500))
hostname = Column(String(500))
unixname = Column(String(1000))
__table_args__ = (
Index("job_type_heart", job_type, latest_heartbeat),
Index("idx_job_state_heartbeat", state, latest_heartbeat),
Index("idx_job_dag_id", dag_id),
)

    task_instances_enqueued = relationship(
        "TaskInstance",
        primaryjoin="Job.id == foreign(TaskInstance.queued_by_job_id)",
        backref=backref("queued_by_job", uselist=False),
    )
    """
    TaskInstances which have been enqueued by this Job.

    Only makes sense for SchedulerJob and BackfillJob instances.
    """

    dag_runs = relationship(
        "DagRun",
        primaryjoin=lambda: Job.id == foreign(_resolve_dagrun_model().creating_job_id),
        backref="creating_job",
    )
    """DagRuns created by this job."""

heartrate = conf.getfloat("scheduler", "JOB_HEARTBEAT_SEC")

    def __init__(self, executor=None, heartrate=None, **kwargs):
# Save init parameters as DB fields
self.hostname = get_hostname()
if executor:
self.executor = executor
self.executor_class = executor.__class__.__name__
else:
self.executor_class = conf.get("core", "EXECUTOR")
self.start_date = timezone.utcnow()
self.latest_heartbeat = timezone.utcnow()
if heartrate is not None:
self.heartrate = heartrate
self.unixname = getuser()
self.max_tis_per_query: int = conf.getint("scheduler", "max_tis_per_query")
get_listener_manager().hook.on_starting(component=self)
super().__init__(**kwargs)

    @cached_property
    def executor(self):
        """Executor used by this job; falls back to the default executor from configuration."""
        return ExecutorLoader.get_default_executor()

    def is_alive(self, grace_multiplier=2.1):
        """
        Is this job currently alive.

        We define alive as in a state of RUNNING, and having sent a heartbeat
        within a multiple of the heartrate (default of 2.1).

        :param grace_multiplier: multiplier of heartrate within which a
            heartbeat must have been received
        """
        if self.job_type == "SchedulerJob":
            health_check_threshold: float = conf.getint("scheduler", "scheduler_health_check_threshold")
        else:
            health_check_threshold = self.heartrate * grace_multiplier
        return (
            self.state == State.RUNNING
            and (timezone.utcnow() - self.latest_heartbeat).total_seconds() < health_check_threshold
        )
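
    # A minimal liveness-check sketch (hypothetical; ``session`` is assumed to
    # be an open SQLAlchemy session). With a heartrate of 5 and the default
    # grace_multiplier of 2.1, a job whose last heartbeat is older than
    # 5 * 2.1 = 10.5 seconds is reported as not alive:
    #
    #     job = most_recent_job("LocalTaskJob", session=session)
    #     if job is not None and not job.is_alive():
    #         print(f"job {job.id} missed its heartbeat window")
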
@provide_session
def kill(self, session: Session = NEW_SESSION) -> NoReturn:
"""Handles on_kill callback and updates state in database."""
job = session.query(Job).filter(Job.id == self.id).first()
job.end_date = timezone.utcnow()
try:
self.on_kill()
except Exception as e:
self.log.error("on_kill() method failed: %s", str(e))
session.merge(job)
session.commit()
raise AirflowException("Job shut down externally.")

    def on_kill(self):
        """Will be called when an external kill command is received."""

    @provide_session
    def heartbeat(
        self, heartbeat_callback: Callable[[Session], None], session: Session = NEW_SESSION
    ) -> None:
"""
Heartbeats update the job's entry in the database with a timestamp
for the latest_heartbeat and allows for the job to be killed
externally. This allows at the system level to monitor what is
actually active.
For instance, an old heartbeat for SchedulerJob would mean something
is wrong.
This also allows for any job to be killed externally, regardless
of who is running it or on which machine it is running.
Note that if your heart rate is set to 60 seconds and you call this
method after 10 seconds of processing since the last heartbeat, it
will sleep 50 seconds to complete the 60 seconds and keep a steady
heart rate. If you go over 60 seconds before calling it, it won't
sleep at all.
:param heartbeat_callback: Callback that will be run when the heartbeat is recorded in the Job
:param session to use for saving the job
"""
previous_heartbeat = self.latest_heartbeat
try:
# This will cause it to load from the db
session.merge(self)
previous_heartbeat = self.latest_heartbeat
if self.state in State.terminating_states:
# TODO: Make sure it is AIP-44 compliant
self.kill()
# Figure out how long to sleep for
sleep_for = 0
if self.latest_heartbeat:
seconds_remaining = (
self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
)
sleep_for = max(0, seconds_remaining)
sleep(sleep_for)
# Update last heartbeat time
with create_session() as session:
# Make the session aware of this object
session.merge(self)
self.latest_heartbeat = timezone.utcnow()
session.commit()
# At this point, the DB has updated.
previous_heartbeat = self.latest_heartbeat
heartbeat_callback(session)
self.log.debug("[heartbeat]")
except OperationalError:
Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
self.log.exception("%s heartbeat got an exception", self.__class__.__name__)
# We didn't manage to heartbeat, so make sure that the timestamp isn't updated
self.latest_heartbeat = previous_heartbeat
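
    # A minimal heartbeat-loop sketch (hypothetical; ``my_job`` is assumed to be
    # a running Job and the callback takes the Session used for the update).
    # Because heartbeat() sleeps out the remainder of the heartrate interval,
    # a tight loop still produces only one DB update per heartrate period:
    #
    #     while my_job.state == State.RUNNING:
    #         my_job.heartbeat(heartbeat_callback=lambda session: None)
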
@provide_session
def prepare_for_execution(self, session: Session = NEW_SESSION):
"""Prepares the job for execution."""
Stats.incr(self.__class__.__name__.lower() + "_start", 1, 1)
self.state = State.RUNNING
self.start_date = timezone.utcnow()
session.add(self)
session.commit()
make_transient(self)

    @provide_session
    def complete_execution(self, session: Session = NEW_SESSION):
        """Notify listeners, set the job's end date, and persist it to the database."""
        get_listener_manager().hook.before_stopping(component=self)
        self.end_date = timezone.utcnow()
        session.merge(self)
        session.commit()
        Stats.incr(self.__class__.__name__.lower() + "_end", 1, 1)

    @provide_session
    def most_recent_job(self, session: Session = NEW_SESSION) -> Job | None:
        """Return the most recent job of this type, if any, based on last heartbeat received."""
        return most_recent_job(self.job_type, session=session)


@provide_session
def most_recent_job(job_type: str, session: Session = NEW_SESSION) -> Job | None:
    """
    Return the most recent job of this type, if any, based on last heartbeat received.

    Jobs in "running" state take precedence over others to make sure an
    alive job is returned if one is available.

    :param job_type: job type to query for to get the most recent job for
    :param session: Database session
    """
return (
session.query(Job)
.filter(Job.job_type == job_type)
.order_by(
# Put "running" jobs at the front.
case({State.RUNNING: 0}, value=Job.state, else_=1),
Job.latest_heartbeat.desc(),
)
.first()
)
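

# A minimal lookup sketch (hypothetical): fetch the newest SchedulerJob and
# report whether it is still heartbeating.
#
#     with create_session() as session:
#         scheduler = most_recent_job("SchedulerJob", session=session)
#         if scheduler is None:
#             print("no scheduler job has ever run")
#         else:
#             print(f"scheduler alive: {scheduler.is_alive()}")
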
@provide_session
def run_job(
job: Job | JobPydantic, execute_callable: Callable[[], int | None], session: Session = NEW_SESSION
) -> int | None:
"""
Runs the job.
The Job is always an ORM object and setting the state is happening within the
same DB session and the session is kept open throughout the whole execution.
:meta private:
TODO: Maybe we should not keep the session during job execution ?.
"""
# The below assert is a temporary one, to make MyPy happy with partial AIP-44 work - we will remove it
# once final AIP-44 changes are completed.
assert not isinstance(job, JobPydantic), "Job should be ORM object not Pydantic one here (AIP-44 WIP)"
job.prepare_for_execution(session=session)
try:
return execute_job(job, execute_callable=execute_callable)
finally:
job.complete_execution(session=session)
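

# A minimal invocation sketch (hypothetical; ``my_job`` is assumed to be a
# freshly constructed ORM Job). run_job() brackets the callable with
# prepare_for_execution() and complete_execution():
#
#     def do_work() -> int | None:
#         print("working...")
#         return 0
#
#     exit_code = run_job(my_job, execute_callable=do_work)
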
def execute_job(job: Job | JobPydantic, execute_callable: Callable[[], int | None]) -> int | None:
"""
Executes the job.
Job execution requires no session as generally executing session does not require an
active database connection. The session might be temporary acquired and used if the job
runs heartbeat during execution, but this connection is only acquired for the time of heartbeat
and in case of AIP-44 implementation it happens over the Internal API rather than directly via
the database.
After the job is completed, state of the Job is updated and it should be updated in the database,
which happens in the "complete_execution" step (which again can be executed locally in case of
database operations or over the Internal API call.
:param job: Job to execute - it can be either DB job or it's Pydantic serialized version. It does
not really matter, because except of running the heartbeat and state setting,
the runner should not modify the job state.
:param execute_callable: callable to execute when running the job.
:meta private:
"""
ret = None
try:
ret = execute_callable()
# In case of max runs or max duration
job.state = State.SUCCESS
except SystemExit:
# In case of ^C or SIGTERM
job.state = State.SUCCESS
except Exception:
job.state = State.FAILED
raise
return ret
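

# State-semantics sketch (hypothetical ``my_job``): a callable that raises
# SystemExit (e.g. on ^C or SIGTERM) still leaves the job in SUCCESS, while
# any other exception marks it FAILED and re-raises:
#
#     def stop() -> int | None:
#         raise SystemExit(0)
#
#     execute_job(my_job, execute_callable=stop)
#     assert my_job.state == State.SUCCESS
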
def perform_heartbeat(
job: Job | JobPydantic, heartbeat_callback: Callable[[Session], None], only_if_necessary: bool
) -> None:
"""
Performs heartbeat for the Job passed to it,optionally checking if it is necessary.
:param job: job to perform heartbeat for
:param heartbeat_callback: callback to run by the heartbeat
:param only_if_necessary: only heartbeat if it is necessary (i.e. if there are things to run for
triggerer for example)
"""
# The below assert is a temporary one, to make MyPy happy with partial AIP-44 work - we will remove it
# once final AIP-44 changes are completed.
assert not isinstance(job, JobPydantic), "Job should be ORM object not Pydantic one here (AIP-44 WIP)"
seconds_remaining: float = 0.0
if job.latest_heartbeat and job.heartrate:
seconds_remaining = job.heartrate - (timezone.utcnow() - job.latest_heartbeat).total_seconds()
if seconds_remaining > 0 and only_if_necessary:
return
with create_session() as session:
job.heartbeat(heartbeat_callback=heartbeat_callback, session=session)
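

# A minimal usage sketch (hypothetical ``my_job``). With only_if_necessary=True
# the call returns immediately while the previous heartbeat is still fresh, so
# it is cheap to invoke from a busy loop:
#
#     perform_heartbeat(
#         my_job, heartbeat_callback=lambda session: None, only_if_necessary=True
#     )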