blob: e7fff3114fc01e31a31f41cb41e7222e63acd96e [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import getpass
import logging
import multiprocessing
import os
import psutil
import signal
import six
import socket
import sys
import threading
import time
from collections import defaultdict
from datetime import datetime
from past.builtins import basestring
from sqlalchemy import (
Column, Integer, String, DateTime, func, Index, or_, and_, not_)
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import make_transient
from tabulate import tabulate
from time import sleep
from airflow import configuration as conf
from airflow import executors, models, settings
from airflow.exceptions import AirflowException
from airflow.logging_config import configure_logging
from airflow.models import DAG, DagRun
from airflow.settings import Stats
from airflow.task_runner import get_task_runner
from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
from airflow.utils import asciiart
from airflow.utils.dag_processing import (AbstractDagFileProcessor,
DagFileProcessorManager,
SimpleDag,
SimpleDagBag,
list_py_file_paths)
from airflow.utils.db import provide_session, pessimistic_connection_handling
from airflow.utils.email import send_email
from airflow.utils.log.logging_mixin import LoggingMixin, set_context, StreamLogWriter
from airflow.utils.state import State
# Base class taken from airflow.models; BaseJob below derives from it and
# declares its table and columns on top of it.
Base = models.Base
# Length used for identifier string columns (see BaseJob.dag_id).
ID_LEN = models.ID_LEN
class BaseJob(Base, LoggingMixin):
    """
    Abstract class to be derived for jobs. Jobs are processing items with state
    and duration that aren't task instances. For instance a BackfillJob is
    a collection of task instance runs, but should have its own state, start
    and end time.

    Subclasses implement ``_execute()`` and may override ``on_kill()`` and
    ``heartbeat_callback()``.
    """

    __tablename__ = "job"

    id = Column(Integer, primary_key=True)
    dag_id = Column(String(ID_LEN),)
    state = Column(String(20))
    job_type = Column(String(30))
    start_date = Column(DateTime())
    end_date = Column(DateTime())
    latest_heartbeat = Column(DateTime())
    executor_class = Column(String(500))
    hostname = Column(String(500))
    unixname = Column(String(1000))

    # job_type is the discriminator column for the job subclasses.
    __mapper_args__ = {
        'polymorphic_on': job_type,
        'polymorphic_identity': 'BaseJob'
    }

    __table_args__ = (
        Index('job_type_heart', job_type, latest_heartbeat),
    )

    def __init__(
            self,
            executor=executors.GetDefaultExecutor(),
            heartrate=conf.getfloat('scheduler', 'JOB_HEARTBEAT_SEC'),
            *args, **kwargs):
        """
        Record where and when the job was created.

        :param executor: executor instance the job uses; its class name is
            stored in ``executor_class``
        :param heartrate: target number of seconds between two heartbeats
        """
        self.hostname = socket.getfqdn()
        self.executor = executor
        self.executor_class = executor.__class__.__name__
        self.start_date = datetime.utcnow()
        self.latest_heartbeat = datetime.utcnow()
        self.heartrate = heartrate
        self.unixname = getpass.getuser()
        super(BaseJob, self).__init__(*args, **kwargs)

    def is_alive(self):
        """
        Tell whether the job heartbeated recently enough to be considered alive.

        :return: True if the latest heartbeat is younger than 2.1 times the
            configured ``JOB_HEARTBEAT_SEC``
        :rtype: bool
        """
        # total_seconds() must be used here: timedelta.seconds only holds the
        # sub-day remainder, so a heartbeat older than a day could otherwise
        # look recent.
        since_heartbeat = (
            datetime.utcnow() - self.latest_heartbeat).total_seconds()
        return since_heartbeat < (
            conf.getint('scheduler', 'JOB_HEARTBEAT_SEC') * 2.1)

    def kill(self):
        """
        Record the end date of the job, run ``on_kill()`` and abort.

        :raises AirflowException: always, once the end date has been saved
        """
        session = settings.Session()
        try:
            job = session.query(BaseJob).filter(BaseJob.id == self.id).first()
            job.end_date = datetime.utcnow()
            try:
                self.on_kill()
            except Exception:
                # Best effort: a failing hook must not prevent the end date
                # from being recorded. SystemExit / KeyboardInterrupt are
                # deliberately not swallowed.
                self.log.exception('on_kill() method failed')
            session.merge(job)
            session.commit()
        finally:
            session.close()
        raise AirflowException("Job shut down externally.")

    def on_kill(self):
        """
        Will be called when an external kill command is received
        """
        pass

    def heartbeat_callback(self, session=None):
        """
        Hook called by ``heartbeat()`` right after the heartbeat is saved.

        :param session: the session ``heartbeat()`` used for the update
        """
        pass

    def heartbeat(self):
        """
        Heartbeats update the job's entry in the database with a timestamp
        for the latest_heartbeat and allows for the job to be killed
        externally. This allows at the system level to monitor what is
        actually active.

        For instance, an old heartbeat for SchedulerJob would mean something
        is wrong.

        This also allows for any job to be killed externally, regardless
        of who is running it or on which machine it is running.

        Note that if your heartbeat is set to 60 seconds and you call this
        method after 10 seconds of processing since the last heartbeat, it
        will sleep 50 seconds to complete the 60 seconds and keep a steady
        heart rate. If you go over 60 seconds before calling it, it won't
        sleep at all.
        """
        session = settings.Session()
        try:
            job = session.query(BaseJob).filter_by(id=self.id).one()
            # Detach the row so it stays readable once the session is closed.
            make_transient(job)
            session.commit()
        finally:
            # Don't keep the session open while sleeping as it leaves a
            # connection open.
            session.close()

        if job.state == State.SHUTDOWN:
            self.kill()

        # Figure out how long to sleep for
        sleep_for = 0
        if job.latest_heartbeat:
            sleep_for = max(
                0,
                self.heartrate -
                (datetime.utcnow() - job.latest_heartbeat).total_seconds())
        sleep(sleep_for)

        # Update last heartbeat time
        session = settings.Session()
        try:
            job = session.query(BaseJob).filter(BaseJob.id == self.id).first()
            job.latest_heartbeat = datetime.utcnow()
            session.merge(job)
            session.commit()

            self.heartbeat_callback(session=session)
        finally:
            session.close()
        self.log.debug('[heart] Boom.')

    def run(self):
        """
        Register the job as RUNNING, call ``_execute()`` and mark it SUCCESS.

        An exception raised by ``_execute()`` propagates to the caller; the
        session is closed either way.
        """
        Stats.incr(self.__class__.__name__.lower() + '_start', 1, 1)
        session = settings.Session()
        try:
            # Adding an entry in the DB
            self.state = State.RUNNING
            session.add(self)
            session.commit()
            # Detach from the session but keep the generated primary key.
            id_ = self.id
            make_transient(self)
            self.id = id_

            # Run
            self._execute()

            # Marking the success in the DB
            self.end_date = datetime.utcnow()
            self.state = State.SUCCESS
            session.merge(self)
            session.commit()
        finally:
            session.close()

        Stats.incr(self.__class__.__name__.lower() + '_end', 1, 1)

    def _execute(self):
        """Do the actual work of the job; must be provided by subclasses."""
        raise NotImplementedError("This method needs to be overridden")

    @provide_session
    def reset_state_for_orphaned_tasks(self, filter_by_dag_run=None, session=None):
        """
        This function checks if there are any tasks in the dagrun (or all)
        that have a scheduled state but are not known by the
        executor. If it finds those it will reset the state to None
        so they will get picked up again.
        The batch option is for performance reasons as the queries are made in
        sequence.

        :param filter_by_dag_run: the dag_run we want to process, None if all
        :type filter_by_dag_run: models.DagRun
        :param session: session for ORM operations
        :return: the TIs reset (in expired SQLAlchemy state)
        :rtype: List(TaskInstance)
        """
        queued_tis = self.executor.queued_tasks
        # also consider running as the state might not have changed in the db yet
        running_tis = self.executor.running

        resettable_states = [State.SCHEDULED, State.QUEUED]
        TI = models.TaskInstance
        DR = models.DagRun
        if filter_by_dag_run is None:
            resettable_tis = (
                session
                .query(TI)
                .join(
                    DR,
                    and_(
                        TI.dag_id == DR.dag_id,
                        TI.execution_date == DR.execution_date))
                .filter(
                    DR.state == State.RUNNING,
                    DR.external_trigger == False,  # noqa: E712 (SQL expression)
                    DR.run_id.notlike(BackfillJob.ID_PREFIX + '%'),
                    TI.state.in_(resettable_states))).all()
        else:
            resettable_tis = filter_by_dag_run.get_task_instances(
                state=resettable_states, session=session)

        # Can't use an update here since it doesn't support joins
        tis_to_reset = [
            ti for ti in resettable_tis
            if ti.key not in queued_tis and ti.key not in running_tis]

        if not tis_to_reset:
            return []

        filter_for_tis = [
            and_(TI.dag_id == ti.dag_id,
                 TI.task_id == ti.task_id,
                 TI.execution_date == ti.execution_date)
            for ti in tis_to_reset]

        # Re-select the candidates with a row lock and re-check their state,
        # as it may have changed since the first query.
        reset_tis = (
            session
            .query(TI)
            .filter(or_(*filter_for_tis), TI.state.in_(resettable_states))
            .with_for_update()
            .all())
        for ti in reset_tis:
            ti.state = State.NONE
            session.merge(ti)

        # Format before committing, while the instances are still loaded.
        task_instance_str = '\n\t'.join(
            ["{}".format(x) for x in reset_tis])
        session.commit()

        self.log.info(
            "Reset the following %s TaskInstances:\n\t%s",
            len(reset_tis), task_instance_str
        )
        return reset_tis
class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
    """Runs SchedulerJob.process_file() for one file in a child process."""

    # Incremented for every instance created; used to give each processor
    # (and the process it launches) a distinct name.
    class_creation_counter = 0

    def __init__(self, file_path, pickle_dags, dag_id_white_list):
        """
        :param file_path: a Python file containing Airflow DAG definitions
        :type file_path: unicode
        :param pickle_dags: whether to serialize the DAG objects to the DB
        :type pickle_dags: bool
        :param dag_id_white_list: If specified, only look at these DAG ID's
        :type dag_id_white_list: list[unicode]
        """
        self._file_path = file_path
        self._pickle_dags = pickle_dags
        self._dag_id_white_list = dag_id_white_list
        # Carries the result back from the child process.
        self._result_queue = multiprocessing.Queue()
        # The child process working on the file; None until start() is called.
        self._process = None
        # What the child put on the queue, once it has been collected.
        self._result = None
        # Set once the child has been observed to finish.
        self._done = False
        # Time at which start() launched the child.
        self._start_time = None
        # Unique per instance; becomes part of the process / thread name.
        self._instance_id = DagFileProcessor.class_creation_counter
        DagFileProcessor.class_creation_counter += 1

    @property
    def file_path(self):
        """
        :return: the path of the file this processor works on
        """
        return self._file_path

    @staticmethod
    def _launch_process(result_queue,
                        file_path,
                        pickle_dags,
                        dag_id_white_list,
                        thread_name):
        """
        Start a child process that processes ``file_path``.

        :param result_queue: the queue to use for passing back the result
        :type result_queue: multiprocessing.Queue
        :param file_path: the file to process
        :type file_path: unicode
        :param pickle_dags: whether to pickle the DAGs found in the file and
            save them to the DB
        :type pickle_dags: bool
        :param dag_id_white_list: if specified, only examine DAG ID's that are
            in this list
        :type dag_id_white_list: list[unicode]
        :param thread_name: the name to use for the process that is launched
        :type thread_name: unicode
        :return: the process that was launched
        :rtype: multiprocessing.Process
        """
        def child_main():
            # Everything in here executes inside the child process.
            child_log = logging.getLogger("airflow.processor")

            out_writer = StreamLogWriter(child_log, logging.INFO)
            err_writer = StreamLogWriter(child_log, logging.WARN)

            set_context(child_log, file_path)

            try:
                # Send anything printed by the DAG file to the log.
                sys.stdout = out_writer
                sys.stderr = err_writer

                # The ORM engine has to be set up again in the new process.
                settings.configure_orm()

                # Renaming the process itself doesn't work, so the thread is
                # renamed instead to tell the log lines apart.
                threading.current_thread().name = thread_name
                started = time.time()

                child_log.info("Started process (PID=%s) to work on %s",
                               os.getpid(), file_path)
                job = SchedulerJob(dag_ids=dag_id_white_list, log=child_log)
                outcome = job.process_file(file_path, pickle_dags)
                result_queue.put(outcome)
                finished = time.time()
                child_log.info(
                    "Processing %s took %.3f seconds",
                    file_path, finished - started
                )
            except:
                # Make sure the failure reaches the log, then let it propagate.
                child_log.exception("Got an exception! Propagating...")
                raise
            finally:
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__

        process_name = "{}-Process".format(thread_name)
        child = multiprocessing.Process(target=child_main,
                                        args=(),
                                        name=process_name)
        child.start()
        return child

    def start(self):
        """
        Launch the child process and remember when it was started.
        """
        name_for_child = "DagFileProcessor{}".format(self._instance_id)
        self._process = DagFileProcessor._launch_process(
            self._result_queue,
            self.file_path,
            self._pickle_dags,
            self._dag_id_white_list,
            name_for_child)
        self._start_time = datetime.utcnow()

    def terminate(self, sigkill=False):
        """
        Terminate (and then kill) the process launched to process the file.

        :param sigkill: whether to issue a SIGKILL if SIGTERM doesn't work.
        :type sigkill: bool
        :raises AirflowException: if the process was never started
        """
        if self._process is None:
            raise AirflowException("Tried to call stop before starting!")
        # The queue will likely get corrupted, so drop our reference to it.
        self._result_queue = None
        self._process.terminate()
        # Give the process an (arbitrary) 5 seconds to exit.
        self._process.join(5)
        still_running = self._process.is_alive()
        if sigkill and still_running:
            self.log.warning("Killing PID %s", self._process.pid)
            os.kill(self._process.pid, signal.SIGKILL)

    @property
    def pid(self):
        """
        :return: the PID of the process launched to process the given file
        :rtype: int
        :raises AirflowException: if the process was never started
        """
        if self._process is None:
            raise AirflowException("Tried to get PID before starting!")
        return self._process.pid

    @property
    def exit_code(self):
        """
        After the process is finished, this can be called to get the return code

        :return: the exit code of the process
        :rtype: int
        :raises AirflowException: if the process has not been seen to finish
        """
        if not self._done:
            raise AirflowException("Tried to call retcode before process was finished!")
        return self._process.exitcode

    @property
    def done(self):
        """
        Check if the process launched to process this file is done.

        A pending result is taken off the queue before the process is joined.

        :return: whether the process is finished running
        :rtype: bool
        :raises AirflowException: if the process was never started
        """
        if self._process is None:
            raise AirflowException("Tried to see if it's done before starting!")

        if self._done:
            return True

        # Normal case: the child has delivered its result.
        if not self._result_queue.empty():
            self._result = self._result_queue.get_nowait()
            self._done = True
            self.log.debug("Waiting for %s", self._process)
            self._process.join()
            return True

        if self._process.is_alive():
            return False

        # The child is gone without a result having been seen above
        # (potential error case).
        self._done = True
        # Get the object from the queue or else join() can hang.
        if not self._result_queue.empty():
            self._result = self._result_queue.get_nowait()
        self.log.debug("Waiting for %s", self._process)
        self._process.join()
        return True

    @property
    def result(self):
        """
        :return: what the child put on the result queue (the return value of
            SchedulerJob.process_file()), or None if nothing was collected
        :raises AirflowException: if the process is not done yet
        """
        if not self.done:
            raise AirflowException("Tried to get the result before it's done!")
        return self._result

    @property
    def start_time(self):
        """
        :return: when this started to process the file
        :rtype: datetime
        :raises AirflowException: if the process was never started
        """
        if self._start_time is None:
            raise AirflowException("Tried to get start time before it started!")
        return self._start_time
class SchedulerJob(BaseJob):
"""
This SchedulerJob runs for a specific time interval and schedules the jobs
that are ready to run. It figures out the latest runs for each
task and sees if the dependencies for the next schedules are met.
If so, it creates appropriate TaskInstances and sends run commands to the
executor. It does this for each task in each DAG and repeats.
"""
__mapper_args__ = {
'polymorphic_identity': 'SchedulerJob'
}
def __init__(
        self,
        dag_id=None,
        dag_ids=None,
        subdir=settings.DAGS_FOLDER,
        num_runs=-1,
        file_process_interval=conf.getint('scheduler',
                                          'min_file_process_interval'),
        processor_poll_interval=1.0,
        run_duration=None,
        do_pickle=False,
        log=None,
        *args, **kwargs):
    """
    :param dag_id: if specified, only schedule tasks with this DAG ID
    :type dag_id: unicode
    :param dag_ids: if specified, only schedule tasks with these DAG IDs
    :type dag_ids: list[unicode]
    :param subdir: directory containing Python files with Airflow DAG
        definitions, or a specific path to a file
    :type subdir: unicode
    :param num_runs: The number of times to try to schedule each DAG file.
        -1 for unlimited within the run_duration.
    :param file_process_interval: parse and schedule each file no faster
        than this interval
    :param processor_poll_interval: The number of seconds to wait between
        polls of running processors
    :param run_duration: how long to run (in seconds) before exiting; read
        from the ``[scheduler] run_duration`` setting when None
    :type run_duration: int
    :param do_pickle: once a DAG object is obtained by executing the Python
        file, whether to serialize the DAG object to the DB
    :type do_pickle: bool
    :param log: if given, used as this job's logger
    """
    # for BaseJob compatibility
    self.dag_id = dag_id

    # dag_id (when given) comes first, followed by any extra dag_ids.
    wanted_dag_ids = [dag_id] if dag_id else []
    if dag_ids:
        wanted_dag_ids.extend(dag_ids)
    self.dag_ids = wanted_dag_ids

    self.subdir = subdir
    self.num_runs = num_runs
    self.run_duration = run_duration
    self._processor_poll_interval = processor_poll_interval
    self.do_pickle = do_pickle

    super(SchedulerJob, self).__init__(*args, **kwargs)

    # The scheduler has its own heart rate, replacing the one BaseJob set.
    self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC')
    self.max_threads = conf.getint('scheduler', 'max_threads')

    if log:
        self._log = log

    # With a sqlite backend only a single thread may be used.
    self.using_sqlite = False
    if 'sqlite' in conf.get('core', 'sql_alchemy_conn'):
        if self.max_threads > 1:
            self.log.error("Cannot use more than 1 thread when using sqlite. Setting max_threads to 1")
        self.max_threads = 1
        self.using_sqlite = True

    # How often to scan the DAGs directory for new files.
    self.dag_dir_list_interval = conf.getint(
        'scheduler', 'dag_dir_list_interval')
    # How often to print out DAG file processing stats to the log.
    self.print_stats_interval = conf.getint(
        'scheduler', 'print_stats_interval')
    # Parse and schedule each file no faster than this interval.
    self.file_process_interval = file_process_interval

    self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')

    if run_duration is None:
        self.run_duration = conf.getint('scheduler', 'run_duration')
@provide_session
def manage_slas(self, dag, session=None):
    """
    Finding all tasks that have SLAs defined, and sending alert emails
    where needed. New SLA misses are also recorded in the database.

    We are assuming that the scheduler runs often, so we only check for
    tasks that should have succeeded in the past hour.

    :param dag: the DAG whose tasks are checked
    :param session: session for ORM operations
    """
    if not any(t.sla for t in dag.tasks):
        self.log.info(
            "Skipping SLA check for %s because no tasks in DAG have SLAs",
            dag
        )
        return

    TI = models.TaskInstance
    # Latest successful execution date of every task of the DAG.
    sq = (
        session
        .query(
            TI.task_id,
            func.max(TI.execution_date).label('max_ti'))
        .with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql')
        .filter(TI.dag_id == dag.dag_id)
        .filter(TI.state == State.SUCCESS)
        .filter(TI.task_id.in_(dag.task_ids))
        .group_by(TI.task_id).subquery('sq')
    )

    max_tis = session.query(TI).filter(
        TI.dag_id == dag.dag_id,
        TI.task_id == sq.c.task_id,
        TI.execution_date == sq.c.max_ti,
    ).all()

    ts = datetime.utcnow()
    SlaMiss = models.SlaMiss
    for ti in max_tis:
        task = dag.get_task(ti.task_id)
        dttm = ti.execution_date
        if task.sla:
            # Walk the schedule forward from the last success and record a
            # miss for every period whose end + sla is already in the past.
            dttm = dag.following_schedule(dttm)
            while dttm < datetime.utcnow():
                following_schedule = dag.following_schedule(dttm)
                if following_schedule + task.sla < datetime.utcnow():
                    session.merge(SlaMiss(
                        task_id=ti.task_id,
                        dag_id=ti.dag_id,
                        execution_date=dttm,
                        timestamp=ts))
                dttm = dag.following_schedule(dttm)
    session.commit()

    slas = (
        session
        .query(SlaMiss)
        .filter(SlaMiss.notification_sent == False)  # noqa: E712 (SQL expression)
        .filter(SlaMiss.dag_id == dag.dag_id)
        .all()
    )

    if slas:
        sla_dates = [sla.execution_date for sla in slas]
        qry = (
            session
            .query(TI)
            .filter(TI.state != State.SUCCESS)
            .filter(TI.execution_date.in_(sla_dates))
            .filter(TI.dag_id == dag.dag_id)
            .all()
        )
        blocking_tis = []
        for ti in qry:
            if ti.task_id in dag.task_ids:
                ti.task = dag.get_task(ti.task_id)
                blocking_tis.append(ti)
            else:
                # The task is no longer part of the DAG: drop its instance.
                session.delete(ti)
                session.commit()

        task_list = "\n".join([
            sla.task_id + ' on ' + sla.execution_date.isoformat()
            for sla in slas])
        blocking_task_list = "\n".join([
            ti.task_id + ' on ' + ti.execution_date.isoformat()
            for ti in blocking_tis])

        # Track whether email or any alert notification sent
        # We consider email or the alert callback as notifications
        email_sent = False
        notification_sent = False
        if dag.sla_miss_callback:
            # Execute the alert callback
            self.log.info(' --------------> ABOUT TO CALL SLA MISS CALL BACK ')
            dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis)
            notification_sent = True

        # The text lines sit at column 0 so the message carries no extra
        # leading whitespace; the <code> elements are now properly closed.
        email_content = """\
Here's a list of tasks that missed their SLAs:
<pre><code>{task_list}\n</code></pre>
Blocking tasks:
<pre><code>{blocking_task_list}\n{bug}</code></pre>
""".format(
            task_list=task_list,
            blocking_task_list=blocking_task_list,
            bug=asciiart.bug)

        # Collect the distinct recipients of all tasks, in first-seen order.
        emails = []
        for t in dag.tasks:
            if not t.email:
                continue
            if isinstance(t.email, basestring):
                recipients = [t.email]
            elif isinstance(t.email, (list, tuple)):
                recipients = t.email
            else:
                # Unsupported type: skip it instead of reusing the
                # recipients of a previous task (or hitting a NameError).
                continue
            for email in recipients:
                if email not in emails:
                    emails.append(email)

        if emails:
            send_email(
                emails,
                "[airflow] SLA miss on DAG=" + dag.dag_id,
                email_content)
            email_sent = True
            notification_sent = True

        # If we sent any notification, update the sla_miss table
        if notification_sent:
            for sla in slas:
                if email_sent:
                    sla.email_sent = True
                sla.notification_sent = True
                session.merge(sla)
        session.commit()
        session.close()
@staticmethod
@provide_session
def clear_nonexistent_import_errors(session, known_file_paths):
    """
    Delete the import errors recorded for files that are not in
    ``known_file_paths``. When ``known_file_paths`` is empty, every recorded
    import error is deleted.

    :param session: session for ORM operations
    :type session: sqlalchemy.orm.session.Session
    :param known_file_paths: The list of existing files that are parsed for DAGs
    :type known_file_paths: list[unicode]
    """
    import_error = models.ImportError
    stale_errors = session.query(import_error)
    if known_file_paths:
        is_unknown_file = ~import_error.filename.in_(known_file_paths)
        stale_errors = stale_errors.filter(is_unknown_file)
    stale_errors.delete(synchronize_session='fetch')
    session.commit()
@staticmethod
def update_import_errors(session, dagbag):
    """
    Replace the stored import errors of the files held by ``dagbag``: the
    previous errors of every processed file are removed, then the errors the
    DagBag currently reports are stored. These are usually displayed through
    the Airflow UI so that users know that there are issues parsing DAGs.

    :param session: session for ORM operations
    :type session: sqlalchemy.orm.session.Session
    :param dagbag: DagBag containing DAGs with import errors
    :type dagbag: models.Dagbag
    """
    import_error = models.ImportError

    # Forget whatever was recorded earlier for the files just processed.
    for processed_file in dagbag.file_last_changed:
        (session
         .query(import_error)
         .filter(import_error.filename == processed_file)
         .delete())

    # Record the errors found this time around.
    for failed_file, trace in six.iteritems(dagbag.import_errors):
        new_error = import_error(filename=failed_file, stacktrace=trace)
        session.add(new_error)
    session.commit()
@provide_session
def create_dag_run(self, dag, session=None):
    """
    This method checks whether a new DagRun needs to be created
    for a DAG based on scheduling interval
    Returns DagRun if one is scheduled. Otherwise returns None.

    Nothing is done for a DAG without a schedule_interval. Active runs that
    exceeded ``dag.dagrun_timeout`` are marked FAILED along the way, and
    ``dag.start_date`` may be moved forward when ``dag.catchup`` is false.

    :param dag: the DAG to create a run for
    :param session: session for ORM operations
    :return: the DagRun that was created, or None
    """
    if dag.schedule_interval:
        # Only scheduler-created (not externally triggered) running runs
        # count against max_active_runs here.
        active_runs = DagRun.find(
            dag_id=dag.dag_id,
            state=State.RUNNING,
            external_trigger=False,
            session=session
        )
        # return if already reached maximum active runs and no timeout setting
        if len(active_runs) >= dag.max_active_runs and not dag.dagrun_timeout:
            return
        # Fail the runs that have been running for longer than the timeout.
        timedout_runs = 0
        for dr in active_runs:
            if (
                    dr.start_date and dag.dagrun_timeout and
                    dr.start_date < datetime.utcnow() - dag.dagrun_timeout):
                dr.state = State.FAILED
                dr.end_date = datetime.utcnow()
                timedout_runs += 1
        session.commit()
        if len(active_runs) - timedout_runs >= dag.max_active_runs:
            return

        # Latest execution date among the runs of this DAG that were not
        # externally triggered or whose run_id carries the scheduler prefix.
        # this query should be replaced by find dagrun
        qry = (
            session.query(func.max(DagRun.execution_date))
            .filter_by(dag_id=dag.dag_id)
            .filter(or_(
                DagRun.external_trigger == False,
                # add % as a wildcard for the like query
                DagRun.run_id.like(DagRun.ID_PREFIX + '%')
            ))
        )
        last_scheduled_run = qry.scalar()

        # don't schedule @once again
        if dag.schedule_interval == '@once' and last_scheduled_run:
            return None

        # don't do scheduler catchup for dag's that don't have dag.catchup = True
        if not dag.catchup:
            # The logic is that we move start_date up until
            # one period before, so that datetime.utcnow() is AFTER
            # the period end, and the job can be created...
            now = datetime.utcnow()
            next_start = dag.following_schedule(now)
            last_start = dag.previous_schedule(now)
            if next_start <= now:
                new_start = last_start
            else:
                new_start = dag.previous_schedule(last_start)

            # start_date is only ever moved forward, never backward.
            if dag.start_date:
                if new_start >= dag.start_date:
                    dag.start_date = new_start
            else:
                dag.start_date = new_start

        next_run_date = None
        if not last_scheduled_run:
            # First run
            task_start_dates = [t.start_date for t in dag.tasks]
            if task_start_dates:
                next_run_date = dag.normalize_schedule(min(task_start_dates))
                self.log.debug(
                    "Next run date based on tasks %s",
                    next_run_date
                )
        else:
            next_run_date = dag.following_schedule(last_scheduled_run)

        # make sure backfills are also considered
        last_run = dag.get_last_dagrun(session=session)
        if last_run and next_run_date:
            while next_run_date <= last_run.execution_date:
                next_run_date = dag.following_schedule(next_run_date)

        # don't ever schedule prior to the dag's start_date
        if dag.start_date:
            next_run_date = (dag.start_date if not next_run_date
                             else max(next_run_date, dag.start_date))
            if next_run_date == dag.start_date:
                next_run_date = dag.normalize_schedule(dag.start_date)

            self.log.debug(
                "Dag start date: %s. Next run date: %s",
                dag.start_date, next_run_date
            )

        # don't ever schedule in the future
        # NOTE(review): next_run_date can still be None at this point (no
        # previous run, no task start dates and no dag.start_date); comparing
        # None with a datetime raises TypeError on Python 3 - confirm whether
        # that combination can occur.
        if next_run_date > datetime.utcnow():
            return

        # this structure is necessary to avoid a TypeError from concatenating
        # NoneType
        # (period_end stays unassigned when neither branch runs; the final
        # check below tests next_run_date first, so it is never read then.)
        if dag.schedule_interval == '@once':
            period_end = next_run_date
        elif next_run_date:
            period_end = dag.following_schedule(next_run_date)

        # Don't schedule a dag beyond its end_date (as specified by the dag param)
        if next_run_date and dag.end_date and next_run_date > dag.end_date:
            return

        # Don't schedule a dag beyond its end_date (as specified by the task params)
        # Get the min task end date, which may come from the dag.default_args
        # (the empty list is just a falsy placeholder for "no end date")
        min_task_end_date = []
        task_end_dates = [t.end_date for t in dag.tasks if t.end_date]
        if task_end_dates:
            min_task_end_date = min(task_end_dates)
        if next_run_date and min_task_end_date and next_run_date > min_task_end_date:
            return

        # Only create the run once its schedule period has fully elapsed.
        if next_run_date and period_end and period_end <= datetime.utcnow():
            next_run = dag.create_dagrun(
                run_id=DagRun.ID_PREFIX + next_run_date.isoformat(),
                execution_date=next_run_date,
                start_date=datetime.utcnow(),
                state=State.RUNNING,
                external_trigger=False
            )
            return next_run
def _process_task_instances(self, dag, queue):
    """
    This method schedules the tasks for a single DAG by looking at the
    active DAG runs and adding task instances that should run to the
    queue.

    :param dag: the DAG whose running DagRuns are examined
    :param queue: list to which the keys of the task instances whose
        dependencies are met get appended
    """
    session = settings.Session()
    try:
        # update the state of the previously active dag runs
        dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)
        active_dag_runs = []
        for run in dag_runs:
            self.log.info("Examining DAG run %s", run)
            # don't consider runs that are executed in the future
            if run.execution_date > datetime.utcnow():
                self.log.error(
                    "Execution date is in future: %s",
                    run.execution_date
                )
                continue

            if len(active_dag_runs) >= dag.max_active_runs:
                self.log.info("Active dag runs > max_active_run.")
                continue

            # skip backfill dagruns for now as long as they are not really scheduled
            if run.is_backfill:
                continue

            # todo: run.dag is transient but needs to be set
            run.dag = dag
            # todo: preferably the integrity check happens at dag collection time
            run.verify_integrity(session=session)
            run.update_state(session=session)
            # Only runs that are still RUNNING after the update are examined.
            if run.state == State.RUNNING:
                make_transient(run)
                active_dag_runs.append(run)

        for run in active_dag_runs:
            self.log.debug("Examining active DAG run: %s", run)
            # this needs a fresh session sometimes tis get detached
            tis = run.get_task_instances(state=(State.NONE,
                                                State.UP_FOR_RETRY))

            # this loop is quite slow as it uses are_dependencies_met for
            # every task (in ti.is_runnable). This is also called in
            # update_state above which has already checked these tasks
            for ti in tis:
                task = dag.get_task(ti.task_id)

                # fixme: ti.task is transient but needs to be set
                ti.task = task

                # future: remove adhoc
                if task.adhoc:
                    continue

                if ti.are_dependencies_met(
                        dep_context=DepContext(flag_upstream_failed=True),
                        session=session):
                    self.log.debug('Queuing task: %s', ti)
                    queue.append(ti.key)
    finally:
        # Release the session even when one of the calls above raises.
        session.close()
@provide_session
def _change_state_for_tis_without_dagrun(self,
                                         simple_dag_bag,
                                         old_states,
                                         new_state,
                                         session=None):
    """
    For all DAG IDs in the SimpleDagBag, look for task instances in the
    old_states and set them to new_state if the corresponding DagRun
    exists but is not in the running state. This normally should not
    happen, but it can if the state of DagRuns are changed manually.

    :param simple_dag_bag: TaskInstances associated with DAGs in the
        simple_dag_bag and with states in the old_state will be examined
    :type simple_dag_bag: SimpleDagBag
    :param old_states: examine TaskInstances in this state
    :type old_states: list[State]
    :param new_state: set TaskInstances to this state
    :type new_state: State
    :param session: session for ORM operations
    """
    TI = models.TaskInstance
    DR = models.DagRun

    # Task instances of the given DAGs, in one of old_states, for which a
    # DagRun with the same dag_id / execution_date is not RUNNING.
    stranded_tis = (
        session
        .query(TI)
        .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
        .filter(TI.state.in_(old_states))
        .filter(and_(
            DR.dag_id == TI.dag_id,
            DR.execution_date == TI.execution_date,
            DR.state != State.RUNNING))
    )

    if self.using_sqlite:
        # On sqlite the rows are loaded and changed one at a time.
        tis_changed = 0
        for ti in stranded_tis.with_for_update().all():
            ti.set_state(new_state, session=session)
            tis_changed += 1
    else:
        # Otherwise a single UPDATE does it and reports the row count.
        tis_changed = stranded_tis.update(
            {TI.state: new_state},
            synchronize_session=False)
    session.commit()

    if tis_changed > 0:
        self.log.warning(
            "Set %s task instances to state=%s as their associated DagRun was not in RUNNING state",
            tis_changed, new_state
        )
@provide_session
def __get_task_concurrency_map(self, states, session=None):
    """
    Count, per task, the task instances that are in one of ``states``.

    :param states: List of states to query for
    :type states: List[State]
    :param session: session for ORM operations
    :return: A map from (dag_id, task_id) to count of tasks in states;
        missing keys read as 0
    :rtype: Dict[[String, String], Int]
    """
    TI = models.TaskInstance
    counted_rows = (
        session
        .query(TI.task_id, TI.dag_id, func.count('*'))
        .filter(TI.state.in_(states))
        .group_by(TI.task_id, TI.dag_id)
        .all()
    )
    counts_by_task = defaultdict(int)
    # Rows come back as (task_id, dag_id, count); the key order is swapped.
    for task_id, dag_id, occurrences in counted_rows:
        counts_by_task[(dag_id, task_id)] = occurrences
    return counts_by_task
@provide_session
def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
    """
    Finds TIs that are ready for execution with respect to pool limits,
    dag concurrency, task concurrency, executor state, and priority.

    :param simple_dag_bag: TaskInstances associated with DAGs in the
        simple_dag_bag will be fetched from the DB and executed
    :type simple_dag_bag: SimpleDagBag
    :param states: Execute TaskInstances in these states
    :type states: Tuple[State]
    :param session: database session used for all queries
    :type session: Session
    :return: the task instances selected for execution, made transient so
        they are no longer attached to the session
    :rtype: List[TaskInstance]
    """
    # TODO(saguziel): Change this to include QUEUED, for concurrency
    # purposes we may want to count queued tasks
    states_to_count_as_running = [State.RUNNING]

    executable_tis = []

    # Get all the task instances in the given states that belong to DAGs of
    # the bag, are not part of a backfill DagRun, and whose DAG is not
    # paused. Outer joins keep TIs without a DagRun / DagModel row.
    TI = models.TaskInstance
    DR = models.DagRun
    DM = models.DagModel
    ti_query = (
        session
        .query(TI)
        .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
        .outerjoin(DR,
                   and_(DR.dag_id == TI.dag_id,
                        DR.execution_date == TI.execution_date))
        .filter(or_(DR.run_id == None,  # noqa: E711
                    not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'))))
        .outerjoin(DM, DM.dag_id == TI.dag_id)
        .filter(or_(DM.dag_id == None,  # noqa: E711
                    not_(DM.is_paused)))
    )
    # IN (...) never matches NULL, so a requested None state needs its own
    # explicit NULL comparison.
    if None in states:
        ti_query = ti_query.filter(
            or_(TI.state == None, TI.state.in_(states)))  # noqa: E711
    else:
        ti_query = ti_query.filter(TI.state.in_(states))

    task_instances_to_examine = ti_query.all()

    if not task_instances_to_examine:
        self.log.info("No tasks to consider for execution.")
        return executable_tis

    # Put one task instance on each line
    task_instance_str = "\n\t".join(
        ["{}".format(x) for x in task_instances_to_examine])
    self.log.info("Tasks up for execution:\n\t%s", task_instance_str)

    # Get the pool settings
    pools = {p.pool: p for p in session.query(models.Pool).all()}

    pool_to_task_instances = defaultdict(list)
    for task_instance in task_instances_to_examine:
        pool_to_task_instances[task_instance.pool].append(task_instance)

    task_concurrency_map = self.__get_task_concurrency_map(
        states=states_to_count_as_running, session=session)

    # Go through each pool, and queue up a task for execution if there are
    # any open slots in the pool.
    for pool, task_instances in pool_to_task_instances.items():
        if not pool:
            # Arbitrary:
            # If queued outside of a pool, trigger no more than
            # non_pooled_task_slot_count per run
            open_slots = conf.getint('core', 'non_pooled_task_slot_count')
        elif pool not in pools:
            # A task may reference a pool that has no row in the DB. Skip
            # its tasks instead of failing the whole pass with a KeyError.
            self.log.warning(
                "Tasks using non-existent pool '%s' will not be scheduled",
                pool
            )
            continue
        else:
            open_slots = pools[pool].open_slots(session=session)

        num_queued = len(task_instances)
        self.log.info(
            "Figuring out tasks to run in Pool(name=%s) with %s "
            "open slots and %s task instances in queue",
            pool, open_slots, num_queued
        )

        # Highest priority first; ties go to the oldest execution date.
        priority_sorted_task_instances = sorted(
            task_instances, key=lambda ti: (-ti.priority_weight, ti.execution_date))

        # Number of running tasks per DAG, fetched lazily and incremented
        # for every task selected below.
        dag_id_to_possibly_running_task_count = {}

        for task_instance in priority_sorted_task_instances:
            if open_slots <= 0:
                self.log.info(
                    "Not scheduling since there are %s open slots in pool %s",
                    open_slots, pool
                )
                # Can't schedule any more since there are no more open slots.
                break

            # Check to make sure that the task concurrency of the DAG hasn't been
            # reached.
            dag_id = task_instance.dag_id
            simple_dag = simple_dag_bag.get_dag(dag_id)

            if dag_id not in dag_id_to_possibly_running_task_count:
                # TODO(saguziel): also check against QUEUED state, see AIRFLOW-1104
                dag_id_to_possibly_running_task_count[dag_id] = \
                    DAG.get_num_task_instances(
                        dag_id,
                        simple_dag.task_ids,
                        states=states_to_count_as_running,
                        session=session)

            current_task_concurrency = dag_id_to_possibly_running_task_count[dag_id]
            task_concurrency_limit = simple_dag.concurrency
            self.log.info(
                "DAG %s has %s/%s running and queued tasks",
                dag_id, current_task_concurrency, task_concurrency_limit
            )
            if current_task_concurrency >= task_concurrency_limit:
                self.log.info(
                    "Not executing %s since the number of tasks running or queued from DAG %s"
                    " is >= to the DAG's task concurrency limit of %s",
                    task_instance, dag_id, task_concurrency_limit
                )
                continue

            # Per-task concurrency limit, if the task declares one.
            task_concurrency = simple_dag.get_task_special_arg(
                task_instance.task_id, 'task_concurrency')
            if task_concurrency is not None:
                task_key = (task_instance.dag_id, task_instance.task_id)
                num_running = task_concurrency_map[task_key]
                if num_running >= task_concurrency:
                    self.log.info("Not executing %s since the task concurrency for this task"
                                  " has been reached.", task_instance)
                    continue
                else:
                    task_concurrency_map[task_key] += 1

            if self.executor.has_task(task_instance):
                self.log.debug(
                    "Not handling task %s as the executor reports it is running",
                    task_instance.key
                )
                continue

            executable_tis.append(task_instance)
            open_slots -= 1
            dag_id_to_possibly_running_task_count[dag_id] += 1

    task_instance_str = "\n\t".join(
        ["{}".format(x) for x in executable_tis])
    self.log.info("Setting the follow tasks to queued state:\n\t%s", task_instance_str)

    # so these dont expire on commit
    for ti in executable_tis:
        copy_dag_id = ti.dag_id
        copy_execution_date = ti.execution_date
        copy_task_id = ti.task_id
        make_transient(ti)
        ti.dag_id = copy_dag_id
        ti.execution_date = copy_execution_date
        ti.task_id = copy_task_id
    return executable_tis
@provide_session
def _change_state_for_executable_task_instances(self, task_instances,
                                                acceptable_states, session=None):
    """
    Move the given task instances to QUEUED, provided they are still in one
    of the acceptable states, and return the task instances that were moved.
    The matching rows are selected with a row lock (``with_for_update``)
    before they are changed.

    :param task_instances: TaskInstances to change the state of
    :type task_instances: List[TaskInstance]
    :param acceptable_states: only TaskInstances currently in one of these
        states are updated
    :type acceptable_states: Iterable[State]
    :param session: database session used for the queries and commits
    :type session: Session
    :return: freshly queried TaskInstances that were set to QUEUED
    :rtype: List[TaskInstance]
    """
    if not task_instances:
        session.commit()
        return []

    TI = models.TaskInstance

    def _matches(ti):
        # Filter selecting exactly the row that backs ``ti``.
        return and_(
            TI.dag_id == ti.dag_id,
            TI.task_id == ti.task_id,
            TI.execution_date == ti.execution_date)

    candidates = session.query(TI).filter(
        or_(*[_matches(ti) for ti in task_instances]))

    # IN (...) never matches NULL, so None needs an explicit comparison.
    state_filter = TI.state.in_(acceptable_states)
    if None in acceptable_states:
        state_filter = or_(TI.state == None, state_filter)  # noqa: E711
    candidates = candidates.filter(state_filter)

    locked_tis = candidates.with_for_update().all()
    if not locked_tis:
        self.log.info("No tasks were able to have their state changed to queued.")
        session.commit()
        return []

    # set TIs to queued state, keeping an already recorded queue time
    for locked_ti in locked_tis:
        locked_ti.state = State.QUEUED
        locked_ti.queued_dttm = locked_ti.queued_dttm or datetime.utcnow()
        session.merge(locked_ti)

    # save which TIs we set before session expires them
    requery_filters = [_matches(ti) for ti in locked_tis]
    session.commit()

    # requery in batch since above was expired by commit
    queued_tis = session.query(TI).filter(or_(*requery_filters)).all()

    self.log.info(
        "Setting the follow tasks to queued state:\n\t%s",
        "\n\t".join(["{}".format(ti) for ti in queued_tis]))
    return queued_tis
def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instances):
    """
    Build the run command for every given task instance and hand it to the
    executor. The task instances are expected to be in the queued state
    already.

    :param simple_dag_bag: Should contains all of the task_instances' dags
    :type simple_dag_bag: SimpleDagBag
    :param task_instances: TaskInstances to enqueue
    :type task_instances: List[TaskInstance]
    """
    TI = models.TaskInstance
    for ti in task_instances:
        simple_dag = simple_dag_bag.get_dag(ti.dag_id)
        command_parts = TI.generate_command(
            ti.dag_id,
            ti.task_id,
            ti.execution_date,
            local=True,
            mark_success=False,
            ignore_all_deps=False,
            ignore_depends_on_past=False,
            ignore_task_deps=False,
            ignore_ti_state=False,
            pool=ti.pool,
            file_path=simple_dag.full_filepath,
            pickle_id=simple_dag.pickle_id)
        command = " ".join(command_parts)

        priority, queue = ti.priority_weight, ti.queue
        self.log.info(
            "Sending %s to executor with priority %s and queue %s",
            ti.key, priority, queue
        )

        # save attributes so sqlalchemy doesnt expire them
        dag_id, task_id, execution_date = ti.dag_id, ti.task_id, ti.execution_date
        make_transient(ti)
        ti.dag_id, ti.task_id, ti.execution_date = dag_id, task_id, execution_date

        self.executor.queue_command(ti, command, priority=priority, queue=queue)
@provide_session
def _execute_task_instances(self,
                            simple_dag_bag,
                            states,
                            session=None):
    """
    Attempts to execute TaskInstances that should be executed by the scheduler.

    There are three steps:
    1. Pick TIs by priority with the constraint that they are in the expected
       states and that we do not exceed concurrency or pool limits.
    2. Change the state for the TIs above atomically.
    3. Enqueue the TIs in the executor.

    Steps 2 and 3 run once over all picked TIs when ``max_tis_per_query`` is
    0, otherwise once per batch of at most ``max_tis_per_query`` TIs.

    :param simple_dag_bag: TaskInstances associated with DAGs in the
        simple_dag_bag will be fetched from the DB and executed
    :type simple_dag_bag: SimpleDagBag
    :param states: Execute TaskInstances in these states
    :type states: Tuple[State]
    :param session: database session shared by all steps
    :type session: Session
    :return: number of task instances whose state was changed and that were
        passed to the executor
    :rtype: int
    """
    executable_tis = self._find_executable_task_instances(simple_dag_bag, states,
                                                          session=session)

    def _queue_batch(batch):
        # Steps 2 and 3 for one batch; returns how many TIs were queued.
        changed = self._change_state_for_executable_task_instances(
            batch,
            states,
            session=session)
        self._enqueue_task_instances_with_queued_state(
            simple_dag_bag,
            changed)
        session.commit()
        return len(changed)

    batch_size = self.max_tis_per_query
    if batch_size == 0:
        # No batching requested: handle everything in one go.
        return _queue_batch(executable_tis)

    total_tis_queued = 0
    for start in range(0, len(executable_tis), batch_size):
        total_tis_queued += _queue_batch(executable_tis[start:start + batch_size])
    return total_tis_queued
def _process_dags(self, dagbag, dags, tis_out):
    """
    Iterates over the dags and processes them. Processing includes:

    1. Create appropriate DagRun(s) in the DB.
    2. Create appropriate TaskInstance(s) in the DB.
    3. Send emails for tasks that have missed SLAs.

    DAGs that cannot be found in the DagBag are logged and skipped, as are
    paused DAGs.

    :param dagbag: a collection of DAGs to process
    :type dagbag: models.DagBag
    :param dags: the DAGs from the DagBag to process
    :type dags: list[DAG]
    :param tis_out: collection that ``_process_task_instances`` receives to
        add generated task instances to
    :return: None
    """
    for requested_dag in dags:
        # Keep the id separately: the lookup below may return nothing, and
        # the id is still needed for the error message.
        dag_id = requested_dag.dag_id
        dag = dagbag.get_dag(dag_id)
        if dag is None:
            self.log.error("DAG ID %s was not found in the DagBag", dag_id)
            continue

        if dag.is_paused:
            self.log.info("Not processing DAG %s since it's paused", dag.dag_id)
            continue

        self.log.info("Processing %s", dag.dag_id)

        dag_run = self.create_dag_run(dag)
        if dag_run:
            self.log.info("Created %s", dag_run)
        self._process_task_instances(dag, tis_out)
        self.manage_slas(dag)

    models.DagStat.update([d.dag_id for d in dags])
@provide_session
def _process_executor_events(self, simple_dag_bag, session=None):
    """
    Respond to executor events.

    For every FAILED or SUCCESS event reported by the executor, the matching
    task instance is loaded from the DB. If the DB still shows it as QUEUED,
    the task is treated as failed: the DAG file is reloaded so
    ``handle_failure`` can run, and if that is not possible the task
    instance is set to FAILED directly.

    :param simple_dag_bag: DAGs whose executor events should be processed
    :type simple_dag_bag: SimpleDagBag
    :param session: database session used for lookups and updates
    :type session: Session
    """
    # TODO: this shares quite a lot of code with _manage_executor_state

    TI = models.TaskInstance
    events = self.executor.get_event_buffer(simple_dag_bag.dag_ids)
    for key, state in list(events.items()):
        dag_id, task_id, execution_date = key
        self.log.info(
            "Executor reports %s.%s execution_date=%s as %s",
            dag_id, task_id, execution_date, state
        )
        if state not in (State.FAILED, State.SUCCESS):
            continue

        ti = session.query(TI).filter(TI.dag_id == dag_id,
                                      TI.task_id == task_id,
                                      TI.execution_date == execution_date).first()
        if not ti:
            # ``ti`` is None here, so report the key that was looked up.
            self.log.warning("TaskInstance %s went missing from the database", key)
            continue

        # TODO: should we fail RUNNING as well, as we do in Backfills?
        if ti.state != State.QUEUED:
            continue

        msg = ("Executor reports task instance {} finished ({}) "
               "although the task says its {}. Was the task "
               "killed externally?".format(ti, state, ti.state))
        self.log.error(msg)
        try:
            # handle_failure needs the real task, so reload the DAG file.
            simple_dag = simple_dag_bag.get_dag(dag_id)
            dagbag = models.DagBag(simple_dag.full_filepath)
            dag = dagbag.get_dag(dag_id)
            ti.task = dag.get_task(task_id)
            ti.handle_failure(msg)
        except Exception:
            self.log.error("Cannot load the dag bag to handle failure for %s"
                           ". Setting task to FAILED without callbacks or "
                           "retries. Do you have enough resources?", ti)
            ti.state = State.FAILED
            session.merge(ti)
            session.commit()
def _log_file_processing_stats(self,
                               known_file_paths,
                               processor_manager):
    """
    Log a table with processing statistics for every known DAG file.

    Columns:
    File Path: path to the file containing the DAG definition
    PID: PID of the process currently processing the file, may be empty
    Runtime: seconds the current process has been running, if there is one
    Last Runtime: seconds the previous run took, if the file ran before
    Last Run: when the file finished processing in the previous run

    :param known_file_paths: a list of file paths that may contain Airflow
        DAG definitions
    :type known_file_paths: list[unicode]
    :param processor_manager: manager for the file processors
    :type processor_manager: DagFileProcessorManager
    :return: None
    """
    def _as_seconds(value):
        # Falsy values (None, 0) are shown as an empty cell.
        return "{:.2f}s".format(value) if value else None

    stats = []
    for path in known_file_paths:
        previous_runtime = processor_manager.get_last_runtime(path)
        pid = processor_manager.get_pid(path)
        started_at = processor_manager.get_start_time(path)
        current_runtime = None
        if started_at:
            current_runtime = (datetime.utcnow() - started_at).total_seconds()
        finished_at = processor_manager.get_last_finish_time(path)
        stats.append((path, pid, current_runtime, previous_runtime, finished_at))

    # Order by last runtime, ascending; a missing runtime counts as 0.0
    # because None values can't be compared in python3.
    stats.sort(key=lambda row: row[3] or 0.0)

    formatted_rows = [
        (path,
         pid,
         _as_seconds(current_runtime),
         _as_seconds(previous_runtime),
         finished_at.strftime("%Y-%m-%dT%H:%M:%S") if finished_at else None)
        for path, pid, current_runtime, previous_runtime, finished_at in stats
    ]

    separator = "=" * 80
    table = tabulate(formatted_rows,
                     headers=["File Path",
                              "PID",
                              "Runtime",
                              "Last Runtime",
                              "Last Run"])
    self.log.info("\n".join(
        ["", separator, "DAG File Processing Stats\n", table, separator]))
def _execute(self):
    """
    Set up DAG file processing, run the scheduling loop via
    ``_execute_helper`` and, whether or not the loop raised, terminate the
    file processor child processes that are still running.
    """
    self.log.info("Starting the scheduler")
    pessimistic_connection_handling()

    # DAGs can be pickled for easier remote execution by some executors;
    # the local and sequential executors are excluded.
    pickle_dags = bool(
        self.do_pickle and
        self.executor.__class__ not in
        (executors.LocalExecutor, executors.SequentialExecutor))

    # Use multiple processes to parse and generate tasks for the
    # DAGs in parallel. By processing them in separate processes,
    # we can get parallelism and isolation from potentially harmful
    # user code.
    self.log.info("Processing files using up to %s processes at a time", self.max_threads)
    self.log.info("Running execute loop for %s seconds", self.run_duration)
    self.log.info("Processing each file at most %s times", self.num_runs)
    self.log.info("Process each file at most once every %s seconds", self.file_process_interval)
    self.log.info("Checking for new files in %s every %s seconds", self.subdir, self.dag_dir_list_interval)

    # Build up a list of Python files that could contain DAGs
    self.log.info("Searching for files in %s", self.subdir)
    known_file_paths = list_py_file_paths(self.subdir)
    self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)

    def processor_factory(file_path):
        return DagFileProcessor(file_path, pickle_dags, self.dag_ids)

    processor_manager = DagFileProcessorManager(
        self.subdir,
        known_file_paths,
        self.max_threads,
        self.file_process_interval,
        self.num_runs,
        processor_factory)

    try:
        self._execute_helper(processor_manager)
    finally:
        self.log.info("Exited execute loop")

        # Kill all child processes on exit since we don't want to leave
        # them as orphaned.
        pids_to_kill = processor_manager.get_all_pids()
        if pids_to_kill:
            this_process = psutil.Process(os.getpid())

            def _running_tracked_children():
                # Only check child processes to ensure that we don't have a
                # case where we kill the wrong process because a child
                # process died but the PID got reused.
                return [proc for proc in this_process.children(recursive=True)
                        if proc.is_running() and proc.pid in pids_to_kill]

            # First try SIGTERM
            to_terminate = _running_tracked_children()
            for proc in to_terminate:
                self.log.info("Terminating child PID: %s", proc.pid)
                proc.terminate()

            # TODO: Remove magic number
            timeout = 5
            self.log.info("Waiting up to %s seconds for processes to exit...", timeout)
            try:
                psutil.wait_procs(to_terminate, timeout)
            except psutil.TimeoutExpired:
                self.log.debug("Ran out of time while waiting for processes to exit")

            # Then SIGKILL whatever is still around
            for proc in _running_tracked_children():
                self.log.info("Killing child PID: %s", proc.pid)
                proc.kill()
                proc.wait()
def _execute_helper(self, processor_manager):
    """
    Run the main scheduling loop.

    Starts the executor, resets orphaned tasks, then repeatedly refreshes the
    list of DAG files, heartbeats the processor manager, sends scheduled
    task instances to the executor, processes executor events and heartbeats
    the scheduler itself. The loop runs while less than ``run_duration``
    seconds have elapsed (or indefinitely when ``run_duration`` is negative)
    and stops early once the processor manager reports that the maximum
    number of runs has been reached. Afterwards the processors are
    terminated and the executor is ended.

    :param processor_manager: manager to use
    :type processor_manager: DagFileProcessorManager
    :return: None
    """
    self.executor.start()
    # Short-lived session used only for the orphaned task reset below
    session = settings.Session()
    self.log.info("Resetting orphaned tasks for active dag runs")
    self.reset_state_for_orphaned_tasks(session=session)
    session.close()
    execute_start_time = datetime.utcnow()
    # Last time stats were printed
    # (a date far in the past, so the first loop iteration prints stats)
    last_stat_print_time = datetime(2000, 1, 1)
    # Last time that self.heartbeat() was called.
    last_self_heartbeat_time = datetime.utcnow()
    # Last time that the DAG dir was traversed to look for files
    last_dag_dir_refresh_time = datetime.utcnow()
    # Use this value initially
    known_file_paths = processor_manager.file_paths
    # For the execute duration, parse and schedule DAGs
    # (a negative run_duration keeps the loop running without a time limit)
    while (datetime.utcnow() - execute_start_time).total_seconds() < \
            self.run_duration or self.run_duration < 0:
        self.log.debug("Starting Loop...")
        loop_start_time = time.time()
        # Traverse the DAG directory for Python files containing DAGs
        # periodically
        elapsed_time_since_refresh = (datetime.utcnow() -
                                      last_dag_dir_refresh_time).total_seconds()
        if elapsed_time_since_refresh > self.dag_dir_list_interval:
            # Build up a list of Python files that could contain DAGs
            self.log.info("Searching for files in %s", self.subdir)
            known_file_paths = list_py_file_paths(self.subdir)
            last_dag_dir_refresh_time = datetime.utcnow()
            self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
            processor_manager.set_file_paths(known_file_paths)
            self.log.debug("Removing old import errors")
            self.clear_nonexistent_import_errors(known_file_paths=known_file_paths)
        # Kick of new processes and collect results from finished ones
        self.log.info("Heartbeating the process manager")
        simple_dags = processor_manager.heartbeat()
        if self.using_sqlite:
            # For the sqlite case w/ 1 thread, wait until the processor
            # is finished to avoid concurrent access to the DB.
            self.log.debug("Waiting for processors to finish since we're using sqlite")
            processor_manager.wait_until_finished()
        # Send tasks for execution if available
        simple_dag_bag = SimpleDagBag(simple_dags)
        if len(simple_dags) > 0:
            # Handle cases where a DAG run state is set (perhaps manually) to
            # a non-running state. Handle task instances that belong to
            # DAG runs in those states
            # If a task instance is up for retry but the corresponding DAG run
            # isn't running, mark the task instance as FAILED so we don't try
            # to re-run it.
            self._change_state_for_tis_without_dagrun(simple_dag_bag,
                                                      [State.UP_FOR_RETRY],
                                                      State.FAILED)
            # If a task instance is scheduled or queued, but the corresponding
            # DAG run isn't running, set the state to NONE so we don't try to
            # re-run it.
            self._change_state_for_tis_without_dagrun(simple_dag_bag,
                                                      [State.QUEUED,
                                                       State.SCHEDULED],
                                                      State.NONE)
            # Queue and enqueue the task instances that are in SCHEDULED state
            self._execute_task_instances(simple_dag_bag,
                                         (State.SCHEDULED,))
        # Call heartbeats
        self.log.info("Heartbeating the executor")
        self.executor.heartbeat()
        # Process events from the executor
        self._process_executor_events(simple_dag_bag)
        # Heartbeat the scheduler periodically
        time_since_last_heartbeat = (datetime.utcnow() -
                                     last_self_heartbeat_time).total_seconds()
        if time_since_last_heartbeat > self.heartrate:
            self.log.info("Heartbeating the scheduler")
            self.heartbeat()
            last_self_heartbeat_time = datetime.utcnow()
        # Occasionally print out stats about how fast the files are getting processed
        if ((datetime.utcnow() - last_stat_print_time).total_seconds() >
                self.print_stats_interval):
            if len(known_file_paths) > 0:
                self._log_file_processing_stats(known_file_paths,
                                                processor_manager)
            last_stat_print_time = datetime.utcnow()
        loop_end_time = time.time()
        self.log.debug("Ran scheduling loop in %.2f seconds", loop_end_time - loop_start_time)
        self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
        time.sleep(self._processor_poll_interval)
        # Exit early for a test mode
        if processor_manager.max_runs_reached():
            self.log.info("Exiting loop as all files have been processed %s times", self.num_runs)
            break
    # Stop any processors
    processor_manager.terminate()
    # Verify that all files were processed, and if so, deactivate DAGs that
    # haven't been touched by the scheduler as they likely have been
    # deleted.
    all_files_processed = True
    for file_path in known_file_paths:
        # A file without a finish time has never completed processing
        if processor_manager.get_last_finish_time(file_path) is None:
            all_files_processed = False
            break
    if all_files_processed:
        self.log.info(
            "Deactivating DAGs that haven't been touched since %s",
            execute_start_time.isoformat()
        )
        models.DAG.deactivate_stale_dags(execute_start_time)
    self.executor.end()
    settings.Session.remove()
@provide_session
def process_file(self, file_path, pickle_dags=False, session=None):
    """
    Process a Python file containing Airflow DAGs.

    This includes:

    1. Execute the file and look for DAG objects in the namespace.
    2. Pickle the DAG and save it to the DB (if necessary).
    3. For each DAG, see what tasks should run and create appropriate task
       instances in the DB.
    4. Record any errors importing the file into ORM
    5. Kill (in ORM) any task instances belonging to the DAGs that haven't
       issued a heartbeat in a while.

    Returns a list of SimpleDag objects that represent the DAGs found in
    the file. An empty list is returned when the file cannot be loaded or
    contains no DAGs.

    :param file_path: the path to the Python file that should be executed
    :type file_path: unicode
    :param pickle_dags: whether serialize the DAGs found in the file and
        save them to the db
    :type pickle_dags: bool
    :param session: database session used for pickling, task instance
        updates and import error bookkeeping
    :type session: Session
    :return: a list of SimpleDags made from the Dags found in the file;
        paused DAGs are left out
    :rtype: list[SimpleDag]
    """
    self.log.info("Processing file %s for tasks to queue", file_path)
    # As DAGs are parsed from this file, they will be converted into SimpleDags
    simple_dags = []
    try:
        dagbag = models.DagBag(file_path)
    except Exception:
        # Loading failed: log with traceback, count the error and give up
        self.log.exception("Failed at reloading the DAG file %s", file_path)
        Stats.incr('dag_file_refresh_error', 1, 1)
        return []
    if len(dagbag.dags) > 0:
        self.log.info("DAG(s) %s retrieved from %s", dagbag.dags.keys(), file_path)
    else:
        # Nothing to schedule; still record import errors before returning
        self.log.warning("No viable dags retrieved from %s", file_path)
        self.update_import_errors(session, dagbag)
        return []
    # Save individual DAGs in the ORM and update DagModel.last_scheduled_time
    for dag in dagbag.dags.values():
        dag.sync_to_db()
    paused_dag_ids = [dag.dag_id for dag in dagbag.dags.values()
                      if dag.is_paused]
    # Pickle the DAGs (if necessary) and put them into a SimpleDag
    for dag_id in dagbag.dags:
        dag = dagbag.get_dag(dag_id)
        pickle_id = None
        if pickle_dags:
            pickle_id = dag.pickle(session).id
        # Only return DAGs that are not paused
        if dag_id not in paused_dag_ids:
            simple_dags.append(SimpleDag(dag, pickle_id=pickle_id))
    # Select the DAGs to process: the ones listed in self.dag_ids if any
    # were given, otherwise every DAG without a parent DAG. Paused DAGs
    # are excluded in both cases.
    if len(self.dag_ids) > 0:
        dags = [dag for dag in dagbag.dags.values()
                if dag.dag_id in self.dag_ids and
                dag.dag_id not in paused_dag_ids]
    else:
        dags = [dag for dag in dagbag.dags.values()
                if not dag.parent_dag and
                dag.dag_id not in paused_dag_ids]
    # Not using multiprocessing.Queue() since it's no longer a separate
    # process and due to some unusual behavior. (empty() incorrectly
    # returns true?)
    ti_keys_to_schedule = []
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
    # Each key is indexed as (dag_id, task_id, execution_date) below
    for ti_key in ti_keys_to_schedule:
        dag = dagbag.dags[ti_key[0]]
        task = dag.get_task(ti_key[1])
        ti = models.TaskInstance(task, ti_key[2])
        # Load the current DB state of the task instance, locking the row
        ti.refresh_from_db(session=session, lock_for_update=True)
        # We can defer checking the task dependency checks to the worker themselves
        # since they can be expensive to run in the scheduler.
        dep_context = DepContext(deps=QUEUE_DEPS, ignore_task_deps=True)
        # Only schedule tasks that have their dependencies met, e.g. to avoid
        # a task that recently got it's state changed to RUNNING from somewhere
        # other than the scheduler from getting it's state overwritten.
        # TODO(aoen): It's not great that we have to check all the task instance
        # dependencies twice; once to get the task scheduled, and again to actually
        # run the task. We should try to come up with a way to only check them once.
        if ti.are_dependencies_met(
                dep_context=dep_context,
                session=session,
                verbose=True):
            # Task starts out in the scheduled state. All tasks in the
            # scheduled state will be sent to the executor
            ti.state = State.SCHEDULED
        # Also save this task instance to the DB.
        self.log.info("Creating / updating %s in ORM", ti)
        session.merge(ti)
        session.commit()
    # Record import errors into the ORM
    try:
        self.update_import_errors(session, dagbag)
    except Exception:
        self.log.exception("Error logging import errors!")
    # Best effort: a failure here is logged and must not discard the result
    try:
        dagbag.kill_zombies()
    except Exception:
        self.log.exception("Error killing zombies!")
    return simple_dags
@provide_session
def heartbeat_callback(self, session=None):
    """
    Report the 'scheduler_heartbeat' gauge to Stats.

    :param session: database session supplied by ``provide_session``; it is
        not used by this method
    """
    Stats.gauge('scheduler_heartbeat', 1, 1)
class BackfillJob(BaseJob):
    """
    A backfill job consists of a dag or subdag for a specific time range. It
    triggers a set of task instance runs, in the right order and lasts for
    as long as it takes for the set of task instance to be completed.
    """
    # Prefix of the run_id of DagRuns created by a backfill; the scheduler
    # filters on this prefix to leave backfill runs alone.
    ID_PREFIX = 'backfill_'
    # Template for a complete backfill run_id; '{0}' is filled with the
    # ISO formatted run date.
    ID_FORMAT_PREFIX = ID_PREFIX + '{0}'
    # Mapper arguments: rows of this job type are identified by the value
    # 'BackfillJob' (presumably in BaseJob's polymorphic column; confirm
    # against BaseJob).
    __mapper_args__ = {
        'polymorphic_identity': 'BackfillJob'
    }
class _DagRunTaskStatus(object):
    """
    Internal status of the backfill job. This class is intended to be instantiated
    only within a BackfillJob instance and will track the execution of tasks,
    e.g. started, skipped, succeeded, failed, etc. Information about the dag runs
    related to the backfill job are also being tracked in this structure,
    e.g. finished runs, etc. Any other status related information related to the
    execution of dag runs / tasks can be included in this structure since it makes
    it easier to pass it around.
    """
    # TODO(edgarRd): AIRFLOW-1444: Add consistency check on counts
    def __init__(self,
                 to_run=None,
                 started=None,
                 skipped=None,
                 succeeded=None,
                 failed=None,
                 not_ready=None,
                 deadlocked=None,
                 active_runs=None,
                 executed_dag_run_dates=None,
                 finished_runs=0,
                 total_runs=0,
                 ):
        """
        Every container argument that is left as None gets a fresh, empty
        container. A container passed by the caller is stored as-is, even
        when it is empty, so the caller and this object share it.

        :param to_run: Tasks to run in the backfill
        :type to_run: dict[Tuple[String, String, DateTime], TaskInstance]
        :param started: Maps started task instance key to task instance object
        :type started: dict[Tuple[String, String, DateTime], TaskInstance]
        :param skipped: Tasks that have been skipped
        :type skipped: set[Tuple[String, String, DateTime]]
        :param succeeded: Tasks that have succeeded so far
        :type succeeded: set[Tuple[String, String, DateTime]]
        :param failed: Tasks that have failed
        :type failed: set[Tuple[String, String, DateTime]]
        :param not_ready: Tasks not ready for execution
        :type not_ready: set[Tuple[String, String, DateTime]]
        :param deadlocked: Deadlocked tasks
        :type deadlocked: set[Tuple[String, String, DateTime]]
        :param active_runs: Active dag runs at a certain point in time
        :type active_runs: list[DagRun]
        :param executed_dag_run_dates: Datetime objects for the executed dag runs
        :type executed_dag_run_dates: set[Datetime]
        :param finished_runs: Number of finished runs so far
        :type finished_runs: int
        :param total_runs: Number of total dag runs able to run
        :type total_runs: int
        """
        # ``is None`` rather than ``or``: an empty container handed in by the
        # caller must not be swapped for a new one.
        self.to_run = dict() if to_run is None else to_run
        self.started = dict() if started is None else started
        self.skipped = set() if skipped is None else skipped
        self.succeeded = set() if succeeded is None else succeeded
        self.failed = set() if failed is None else failed
        self.not_ready = set() if not_ready is None else not_ready
        self.deadlocked = set() if deadlocked is None else deadlocked
        self.active_runs = list() if active_runs is None else active_runs
        self.executed_dag_run_dates = (set() if executed_dag_run_dates is None
                                       else executed_dag_run_dates)
        self.finished_runs = finished_runs
        self.total_runs = total_runs
def __init__(
        self,
        dag,
        start_date=None,
        end_date=None,
        mark_success=False,
        include_adhoc=False,
        donot_pickle=False,
        ignore_first_depends_on_past=False,
        ignore_task_deps=False,
        pool=None,
        delay_on_limit_secs=1.0,
        *args, **kwargs):
    """
    Store the backfill settings on the job and initialise the base job.

    :param dag: the dag to backfill; its ``dag_id`` is stored as well
    :param start_date: stored as ``bf_start_date``
    :param end_date: stored as ``bf_end_date``
    :param mark_success: stored as ``mark_success``
    :param include_adhoc: stored as ``include_adhoc``
    :param donot_pickle: stored as ``donot_pickle``
    :param ignore_first_depends_on_past: stored as
        ``ignore_first_depends_on_past``
    :param ignore_task_deps: stored as ``ignore_task_deps``
    :param pool: stored as ``pool``
    :param delay_on_limit_secs: stored as ``delay_on_limit_secs``
    :param args: positional arguments passed on to the base class
    :param kwargs: keyword arguments passed on to the base class
    """
    # What to backfill and over which range
    self.dag, self.dag_id = dag, dag.dag_id
    self.bf_start_date, self.bf_end_date = start_date, end_date

    # Behaviour switches, kept under the same names as the arguments
    self.mark_success = mark_success
    self.include_adhoc = include_adhoc
    self.donot_pickle = donot_pickle
    self.ignore_first_depends_on_past = ignore_first_depends_on_past
    self.ignore_task_deps = ignore_task_deps

    self.pool, self.delay_on_limit_secs = pool, delay_on_limit_secs

    super(BackfillJob, self).__init__(*args, **kwargs)
def _update_counters(self, ti_status):
    """
    Updates the counters per state of the tasks that were running. Can re-add
    to tasks to run in case required.

    Every started task instance is refreshed from the DB and, depending on
    its state, moved from ``started`` into ``succeeded``, ``skipped``,
    ``failed`` or back into ``to_run``. Task instances in any other state
    stay in ``started``.

    :param ti_status: the internal status of the backfill job tasks
    :type ti_status: BackfillJob._DagRunTaskStatus
    """
    # Iterate over a copy since entries are popped from ``started`` below.
    for key, ti in list(ti_status.started.items()):
        ti.refresh_from_db()
        if ti.state == State.SUCCESS:
            ti_status.succeeded.add(key)
            self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
            ti_status.started.pop(key)
        elif ti.state == State.SKIPPED:
            ti_status.skipped.add(key)
            self.log.debug("Task instance %s skipped. Don't rerun.", ti)
            ti_status.started.pop(key)
        elif ti.state == State.FAILED:
            self.log.error("Task instance %s failed", ti)
            ti_status.failed.add(key)
            ti_status.started.pop(key)
        # special case: if the task needs to run again put it back
        elif ti.state == State.UP_FOR_RETRY:
            self.log.warning("Task instance %s is up for retry", ti)
            ti_status.started.pop(key)
            ti_status.to_run[key] = ti
        # special case: The state of the task can be set to NONE by the task itself
        # when it reaches concurrency limits. It could also happen when the state
        # is changed externally, e.g. by clearing tasks from the ui. We need to cover
        # for that as otherwise those tasks would fall outside of the scope of
        # the backfill suddenly.
        elif ti.state == State.NONE:
            self.log.warning(
                "FIXME: task instance %s state was set to none externally or "
                "reaching concurrency limits. Re-adding task to queue.",
                ti
            )
            session = settings.Session()
            try:
                ti.set_state(State.SCHEDULED, session=session)
            finally:
                # Release the session even when the state change fails.
                session.close()
            ti_status.started.pop(key)
            ti_status.to_run[key] = ti
def _manage_executor_state(self, started):
    """
    Compare the executor's event buffer with the DB state of the started
    task instances. When the executor reports a task as finished while the
    task instance is still RUNNING or QUEUED, the task instance is failed
    through ``handle_failure``.

    :param started: dict of key, task to verify
    """
    finished_states = (State.FAILED, State.SUCCESS)
    unfinished_ti_states = (State.RUNNING, State.QUEUED)

    for key, state in list(self.executor.get_event_buffer().items()):
        if key not in started:
            self.log.warning(
                "%s state %s not in started=%s",
                key, state, started.values()
            )
            continue

        ti = started[key]
        ti.refresh_from_db()
        self.log.debug("Executor state: %s task %s", state, ti)

        if state not in finished_states:
            continue
        if ti.state not in unfinished_ti_states:
            continue

        # The executor is done with the task but the DB disagrees.
        msg = ("Executor reports task instance {} finished ({}) "
               "although the task says its {}. Was the task "
               "killed externally?".format(ti, state, ti.state))
        self.log.error(msg)
        ti.handle_failure(msg)
@provide_session
def _get_dag_run(self, run_date, session=None):
    """
    Return the dag run to use for ``run_date``: an existing dag run for that
    execution date if there is one, a newly created one otherwise. Either
    way the run is marked as a running backfill run before it is returned.
    If the max_active_runs limit applies and is reached, None is returned.

    :param run_date: the execution date for the dag run
    :type run_date: datetime
    :param session: the database session object
    :type session: Session
    :return: a DagRun in state RUNNING or None
    """
    run_id = BackfillJob.ID_FORMAT_PREFIX.format(run_date.isoformat())

    # consider max_active_runs but ignore when running subdags
    respect_dag_max_active_limit = bool(
        self.dag.schedule_interval and not self.dag.is_subdag)

    current_active_dag_count = self.dag.get_num_active_runs(external_trigger=False)

    # check if we are scheduling on top of a already existing dag_run
    # we could find a "scheduled" run instead of a "backfill"
    existing_runs = DagRun.find(dag_id=self.dag.dag_id,
                                execution_date=run_date,
                                session=session)

    run = None
    if existing_runs is not None and len(existing_runs) > 0:
        run = existing_runs[0]
        # A run that is already RUNNING is not limited by max_active_runs.
        if run.state == State.RUNNING:
            respect_dag_max_active_limit = False

    # enforce max_active_runs limit for dag, special cases already
    # handled by respect_dag_max_active_limit
    limit_reached = current_active_dag_count >= self.dag.max_active_runs
    if respect_dag_max_active_limit and limit_reached:
        return None

    if not run:
        run = self.dag.create_dagrun(
            run_id=run_id,
            execution_date=run_date,
            start_date=datetime.utcnow(),
            state=State.RUNNING,
            external_trigger=False,
            session=session
        )

    # set required transient field
    run.dag = self.dag

    # explicitly mark as backfill and running
    run.state = State.RUNNING
    run.run_id = run_id
    run.verify_integrity(session=session)
    return run
@provide_session
def _task_instances_for_dag_run(self, dag_run, session=None):
    """
    Builds the mapping of task instance key to task instance for every
    task instance of the given dag run. Task instances without a state
    are moved to SCHEDULED on the way.

    :param dag_run: the dag run to get the tasks from
    :type dag_run: models.DagRun
    :param session: the database session object
    :type session: Session
    :return: dict of task instance key to task instance, empty when
        ``dag_run`` is None
    """
    if dag_run is None:
        return {}

    # check if we have orphaned tasks
    self.reset_state_for_orphaned_tasks(filter_by_dag_run=dag_run, session=session)

    # for some reason if we don't refresh the reference to run is lost
    dag_run.refresh_from_db()
    make_transient(dag_run)

    runnable = {}
    # TODO(edgarRd): AIRFLOW-1464 change to batch query to improve perf
    for task_instance in dag_run.get_task_instances():
        # all tasks part of the backfill are scheduled to run
        if task_instance.state == State.NONE:
            task_instance.set_state(State.SCHEDULED, session=session)
        runnable[task_instance.key] = task_instance
    return runnable
def _log_progress(self, ti_status):
msg = ' | '.join([
"[backfill progress]",
"finished run {0} of {1}",
"tasks waiting: {2}",
"succeeded: {3}",
"kicked_off: {4}",
"failed: {5}",
"skipped: {6}",
"deadlocked: {7}",
"not ready: {8}"
]).format(
ti_status.finished_runs,
ti_status.total_runs,
len(ti_status.to_run),
len(ti_status.succeeded),
len(ti_status.started),
len(ti_status.failed),
len(ti_status.skipped),
len(ti_status.deadlocked),
len(ti_status.not_ready))
self.log.info(msg)
self.log.debug(
"Finished dag run loop iteration. Remaining tasks %s",
ti_status.to_run.values()
)
@provide_session
def _process_backfill_task_instances(self,
                                     ti_status,
                                     executor,
                                     pickle_id,
                                     start_date=None, session=None):
    """
    Process a set of task instances from a set of dag runs. Special handling is done
    to account for different task instance states that could be present when running
    them in a backfill process.

    The loop runs while task instances are waiting in ``ti_status.to_run`` or
    are tracked in ``ti_status.started``, and stops once a deadlock has been
    recorded in ``ti_status.deadlocked``.

    :param ti_status: the internal status of the job
    :type ti_status: BackfillJob._DagRunTaskStatus
    :param executor: the executor to run the task instances
    :type executor: BaseExecutor
    :param pickle_id: the pickle_id if dag is pickled, None otherwise
    :type pickle_id: int
    :param start_date: the start date of the backfill job
    :type start_date: datetime
    :param session: the current session object
    :type session: Session
    :return: the list of execution_dates for the finished dag runs
    :rtype: list
    """
    executed_run_dates = []

    while ((len(ti_status.to_run) > 0 or len(ti_status.started) > 0) and
            len(ti_status.deadlocked) == 0):
        self.log.debug("*** Clearing out not_ready list ***")
        ti_status.not_ready.clear()

        # visit the task instances in the order given by the dag's
        # topological sort, as otherwise tasks might be determined
        # deadlocked while they are actually waiting for their upstream
        # to finish
        for task in self.dag.topological_sort():
            for key, ti in list(ti_status.to_run.items()):
                if task.task_id != ti.task_id:
                    continue

                ti.refresh_from_db()

                # attach the task definition without rebinding the outer
                # loop variable
                ti.task = self.dag.get_task(ti.task_id)

                ignore_depends_on_past = (
                    self.ignore_first_depends_on_past and
                    ti.execution_date == (start_date or ti.start_date))
                self.log.debug("Task instance to run %s state %s", ti, ti.state)

                # guard against externally modified tasks instances or
                # in case max concurrency has been reached at task runtime
                if ti.state == State.NONE:
                    self.log.warning(
                        "FIXME: task instance %s state was set to None "
                        "externally. This should not happen",
                        ti
                    )
                    ti.set_state(State.SCHEDULED, session=session)

                # The task was already marked successful or skipped by a
                # different Job. Don't rerun it.
                if ti.state == State.SUCCESS:
                    ti_status.succeeded.add(key)
                    self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
                    ti_status.to_run.pop(key)
                    ti_status.started.pop(key, None)
                    continue
                elif ti.state == State.SKIPPED:
                    ti_status.skipped.add(key)
                    self.log.debug("Task instance %s skipped. Don't rerun.", ti)
                    ti_status.to_run.pop(key)
                    ti_status.started.pop(key, None)
                    continue
                elif ti.state == State.FAILED:
                    self.log.error("Task instance %s failed", ti)
                    ti_status.failed.add(key)
                    ti_status.to_run.pop(key)
                    ti_status.started.pop(key, None)
                    continue
                elif ti.state == State.UPSTREAM_FAILED:
                    self.log.error("Task instance %s upstream failed", ti)
                    ti_status.failed.add(key)
                    ti_status.to_run.pop(key)
                    ti_status.started.pop(key, None)
                    continue

                backfill_context = DepContext(
                    deps=RUN_DEPS,
                    ignore_depends_on_past=ignore_depends_on_past,
                    ignore_task_deps=self.ignore_task_deps,
                    flag_upstream_failed=True)

                # Is the task runnable? -- then run it
                # the dependency checker can change states of tis
                if ti.are_dependencies_met(
                        dep_context=backfill_context,
                        session=session,
                        verbose=True):
                    ti.refresh_from_db(lock_for_update=True, session=session)
                    if ti.state == State.SCHEDULED or ti.state == State.UP_FOR_RETRY:
                        if executor.has_task(ti):
                            self.log.debug(
                                "Task Instance %s already in executor waiting for queue to clear",
                                ti
                            )
                        else:
                            self.log.debug('Sending %s to executor', ti)
                            # Skip scheduled state, we are executing immediately
                            ti.state = State.QUEUED
                            session.merge(ti)
                            executor.queue_task_instance(
                                ti,
                                mark_success=self.mark_success,
                                pickle_id=pickle_id,
                                ignore_task_deps=self.ignore_task_deps,
                                ignore_depends_on_past=ignore_depends_on_past,
                                pool=self.pool)
                            ti_status.started[key] = ti
                            ti_status.to_run.pop(key)
                    # commit after the refresh that was issued with
                    # lock_for_update=True, whether or not the task was queued
                    session.commit()
                    continue

                # the dependency check above may have flagged the task
                if ti.state == State.UPSTREAM_FAILED:
                    self.log.error("Task instance %s upstream failed", ti)
                    ti_status.failed.add(key)
                    ti_status.to_run.pop(key)
                    ti_status.started.pop(key, None)
                    continue

                # special case
                if ti.state == State.UP_FOR_RETRY:
                    self.log.debug("Task instance %s retry period not expired yet", ti)
                    ti_status.started.pop(key, None)
                    ti_status.to_run[key] = ti
                    continue

                # all remaining tasks
                self.log.debug('Adding %s to not_ready', ti)
                ti_status.not_ready.add(key)

        # execute the tasks in the queue
        self.heartbeat()
        executor.heartbeat()

        # If the set of tasks that aren't ready ever equals the set of
        # tasks to run and there are no running tasks then the backfill
        # is deadlocked
        if (ti_status.not_ready and
                ti_status.not_ready == set(ti_status.to_run) and
                len(ti_status.started) == 0):
            self.log.warning(
                "Deadlock discovered for ti_status.to_run=%s",
                ti_status.to_run.values()
            )
            ti_status.deadlocked.update(ti_status.to_run.values())
            ti_status.to_run.clear()

        # check executor state
        self._manage_executor_state(ti_status.started)

        # update the task counters
        self._update_counters(ti_status=ti_status)

        # update dag run state; iterate over a copy because finished runs
        # are removed from active_runs inside the loop
        for run in ti_status.active_runs[:]:
            run.update_state(session=session)
            if run.state in State.finished():
                ti_status.finished_runs += 1
                ti_status.active_runs.remove(run)
                executed_run_dates.append(run.execution_date)

            if run.dag.is_paused:
                models.DagStat.update([run.dag_id], session=session)

        self._log_progress(ti_status)

    # return updated status
    return executed_run_dates
@provide_session
def _collect_errors(self, ti_status, session=None):
    """
    Builds a human-readable report about failed and deadlocked task
    instances.

    For deadlocked task instances the dependency check is evaluated twice,
    with and without ``ignore_depends_on_past``; if the two results differ
    for any of them, a hint about "depends_on_past" is added to the report.

    :param ti_status: the internal status of the job
    :type ti_status: BackfillJob._DagRunTaskStatus
    :param session: the current session object
    :type session: Session
    :return: the report, or an empty string when nothing failed or
        deadlocked
    :rtype: str
    """
    err = ''
    if ti_status.failed:
        # the placeholder has to be "{}" because the text is rendered with
        # str.format; a "%s" would be emitted literally
        err += (
            "---------------------------------------------------\n"
            "Some task instances failed:\n{}\n".format(ti_status.failed))
    if ti_status.deadlocked:
        err += (
            '---------------------------------------------------\n'
            'BackfillJob is deadlocked.')
        # a task whose dependency check outcome changes once
        # depends_on_past is ignored is blocked by that relationship
        deadlocked_depends_on_past = any(
            t.are_dependencies_met(
                dep_context=DepContext(ignore_depends_on_past=False),
                session=session,
                verbose=True) !=
            t.are_dependencies_met(
                dep_context=DepContext(ignore_depends_on_past=True),
                session=session,
                verbose=True)
            for t in ti_status.deadlocked)
        if deadlocked_depends_on_past:
            err += (
                'Some of the deadlocked tasks were unable to run because '
                'of "depends_on_past" relationships. Try running the '
                'backfill with the option '
                '"ignore_first_depends_on_past=True" or passing "-I" at '
                'the command line.')
        err += ' These tasks have succeeded:\n{}\n'.format(ti_status.succeeded)
        err += ' These tasks have started:\n{}\n'.format(ti_status.started)
        err += ' These tasks have failed:\n{}\n'.format(ti_status.failed)
        err += ' These tasks are skipped:\n{}\n'.format(ti_status.skipped)
        err += ' These tasks are deadlocked:\n{}\n'.format(ti_status.deadlocked)

    return err
@provide_session
def _execute_for_run_dates(self, run_dates, ti_status, executor, pickle_id,
                           start_date, session=None):
    """
    Computes the dag runs and their respective task instances for
    the given run dates and executes the task instances.

    Nothing is returned: the execution dates of the dag runs that were
    processed are added to ``ti_status.executed_dag_run_dates``.

    :param run_dates: Execution dates for dag runs
    :type run_dates: list
    :param ti_status: internal BackfillJob status structure to track tis progress
    :type ti_status: BackfillJob._DagRunTaskStatus
    :param executor: the executor to use, it must be previously started
    :type executor: BaseExecutor
    :param pickle_id: numeric id of the pickled dag, None if not pickled
    :type pickle_id: int
    :param start_date: backfill start date
    :type start_date: datetime
    :param session: the current session object
    :type session: Session
    """
    for next_run_date in run_dates:
        dag_run = self._get_dag_run(next_run_date, session=session)
        if dag_run is None:
            # no dag run is available for this date right now; skip it
            # before looking up any task instances
            continue

        tis_map = self._task_instances_for_dag_run(dag_run,
                                                   session=session)
        ti_status.active_runs.append(dag_run)
        ti_status.to_run.update(tis_map or {})

    processed_dag_run_dates = self._process_backfill_task_instances(
        ti_status=ti_status,
        executor=executor,
        pickle_id=pickle_id,
        start_date=start_date,
        session=session)

    ti_status.executed_dag_run_dates.update(processed_dag_run_dates)
def _execute(self):
    """
    Initializes all components required to run a dag for a specified date range and
    calls helper method to execute the tasks.

    The session opened here is closed on every exit path, including the
    early return taken when there are no run dates.

    :raises AirflowException: when task instances failed or the backfill
        is deadlocked
    """
    session = settings.Session()
    try:
        ti_status = BackfillJob._DagRunTaskStatus()

        start_date = self.bf_start_date

        # Get intervals between the start/end dates, which will turn into dag runs
        run_dates = self.dag.get_run_dates(start_date=start_date,
                                           end_date=self.bf_end_date)
        if len(run_dates) == 0:
            self.log.info("No run dates were found for the given dates and dag interval.")
            return

        # picklin'
        pickle_id = None
        if not self.donot_pickle and self.executor.__class__ not in (
                executors.LocalExecutor, executors.SequentialExecutor):
            pickle = models.DagPickle(self.dag)
            session.add(pickle)
            session.commit()
            pickle_id = pickle.id

        executor = self.executor
        executor.start()

        ti_status.total_runs = len(run_dates)  # total dag runs in backfill

        try:
            remaining_dates = ti_status.total_runs
            while remaining_dates > 0:
                # only dates whose dag run has not been executed yet
                dates_to_process = [run_date for run_date in run_dates
                                    if run_date not in ti_status.executed_dag_run_dates]

                self._execute_for_run_dates(run_dates=dates_to_process,
                                            ti_status=ti_status,
                                            executor=executor,
                                            pickle_id=pickle_id,
                                            start_date=start_date,
                                            session=session)

                remaining_dates = (
                    ti_status.total_runs - len(ti_status.executed_dag_run_dates)
                )
                err = self._collect_errors(ti_status=ti_status, session=session)
                if err:
                    raise AirflowException(err)

                if remaining_dates > 0:
                    self.log.info(
                        "max_active_runs limit for dag %s has been reached "
                        " - waiting for other dag runs to finish",
                        self.dag_id
                    )
                    time.sleep(self.delay_on_limit_secs)
        finally:
            executor.end()
            session.commit()
    finally:
        # runs for the early return and for any failure above as well
        session.close()

    self.log.info("Backfill done. Exiting.")
class LocalTaskJob(BaseJob):
    """
    Job that runs a single task instance through a task runner and
    heartbeats while the runner is working.
    """

    __mapper_args__ = {
        'polymorphic_identity': 'LocalTaskJob'
    }

    def __init__(
            self,
            task_instance,
            ignore_all_deps=False,
            ignore_depends_on_past=False,
            ignore_task_deps=False,
            ignore_ti_state=False,
            mark_success=False,
            pickle_id=None,
            pool=None,
            *args, **kwargs):
        # what to run
        self.task_instance = task_instance
        self.pickle_id = pickle_id
        self.pool = pool
        self.mark_success = mark_success

        # which checks to bypass when starting the task instance
        self.ignore_all_deps = ignore_all_deps
        self.ignore_depends_on_past = ignore_depends_on_past
        self.ignore_task_deps = ignore_task_deps
        self.ignore_ti_state = ignore_ti_state

        # terminating state is used so that a job don't try to
        # terminate multiple times
        self.terminating = False

        super(LocalTaskJob, self).__init__(*args, **kwargs)

    def _execute(self):
        """
        Starts the task runner and heartbeats until the runner reports a
        return code. Raises AirflowException when no heartbeat succeeded
        for longer than the configured zombie threshold. The runner is
        always cleaned up through ``on_kill`` once it has been started.
        """
        self.task_runner = get_task_runner(self)

        def _on_sigterm(signum, frame):
            """Setting kill signal handler"""
            self.log.error("Killing subprocess")
            self.on_kill()
            raise AirflowException("LocalTaskJob received SIGTERM signal")

        signal.signal(signal.SIGTERM, _on_sigterm)

        may_run = self.task_instance._check_and_change_state_before_execution(
            mark_success=self.mark_success,
            ignore_all_deps=self.ignore_all_deps,
            ignore_depends_on_past=self.ignore_depends_on_past,
            ignore_task_deps=self.ignore_task_deps,
            ignore_ti_state=self.ignore_ti_state,
            job_id=self.id,
            pool=self.pool)
        if not may_run:
            self.log.info("Task is not able to be run")
            return

        try:
            self.task_runner.start()

            last_beat = time.time()
            zombie_threshold = conf.getint('scheduler',
                                           'scheduler_zombie_task_threshold')

            # Monitor the task to see if it's done
            exit_code = self.task_runner.return_code()
            while exit_code is None:
                # Periodically heartbeat so that the scheduler doesn't think this
                # is a zombie
                try:
                    self.heartbeat()
                    last_beat = time.time()
                except OperationalError:
                    Stats.incr('local_task_job_heartbeat_failure', 1, 1)
                    self.log.exception(
                        "Exception while trying to heartbeat! Sleeping for %s seconds",
                        self.heartrate
                    )
                    time.sleep(self.heartrate)

                # If it's been too long since we've heartbeat, then it's possible that
                # the scheduler rescheduled this task, so kill launched processes.
                elapsed = time.time() - last_beat
                if elapsed > zombie_threshold:
                    Stats.incr('local_task_job_prolonged_heartbeat_failure', 1, 1)
                    self.log.error("Heartbeat time limited exceeded!")
                    raise AirflowException("Time since last heartbeat({:.2f}s) "
                                           "exceeded limit ({}s)."
                                           .format(elapsed, zombie_threshold))

                exit_code = self.task_runner.return_code()

            self.log.info("Task exited with return code %s", exit_code)
        finally:
            self.on_kill()

    def on_kill(self):
        """Terminates the task runner and lets it run its finish hook."""
        runner = self.task_runner
        runner.terminate()
        runner.on_finish()

    @provide_session
    def heartbeat_callback(self, session=None):
        """Self destruct task if state has been moved away from running externally"""
        if self.terminating:
            # ensure termination if processes are created later
            self.task_runner.terminate()
            return

        self.task_instance.refresh_from_db()
        ti = self.task_instance

        fqdn = socket.getfqdn()
        hostname_matches = (fqdn == ti.hostname)
        pid_matches = (ti.pid == os.getpid())

        if ti.state != State.RUNNING:
            # the task instance is no longer marked as running; stop the
            # runner if it has a process that has not returned yet
            if (self.task_runner.return_code() is None
                    and hasattr(self.task_runner, 'process')):
                self.log.warning(
                    "State of this instance has been externally set to %s. Taking the poison pill.",
                    ti.state
                )
                self.task_runner.terminate()
                self.terminating = True
            return

        if not hostname_matches:
            self.log.warning("The recorded hostname {ti.hostname} "
                             "does not match this instance's hostname "
                             "{fqdn}".format(ti=ti, fqdn=fqdn))
            raise AirflowException("Hostname of job runner does not match")

        if not pid_matches:
            current_pid = os.getpid()
            self.log.warning("Recorded pid {ti.pid} does not match the current pid "
                             "{current_pid}".format(ti=ti, current_pid=current_pid))
            raise AirflowException("PID of job runner does not match")