blob: 0dfbb15d2caaaa8de501fd30078a7e1418a26e3c [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import logging
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool
from airflow import configuration as conf
from airflow.logging_config import configure_logging
log = logging.getLogger(__name__)
class DummyStatsLogger(object):
@classmethod
def incr(cls, stat, count=1, rate=1):
pass
@classmethod
def decr(cls, stat, count=1, rate=1):
pass
@classmethod
def gauge(cls, stat, value, rate=1, delta=False):
pass
@classmethod
def timing(cls, stat, dt):
pass
Stats = DummyStatsLogger
if conf.getboolean('scheduler', 'statsd_on'):
from statsd import StatsClient
statsd = StatsClient(
host=conf.get('scheduler', 'statsd_host'),
port=conf.getint('scheduler', 'statsd_port'),
prefix=conf.get('scheduler', 'statsd_prefix'))
Stats = statsd
else:
Stats = DummyStatsLogger
HEADER = """\
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
"""
BASE_LOG_URL = '/admin/airflow/log'
LOGGING_LEVEL = logging.INFO
# the prefix to append to gunicorn worker processes after init
GUNICORN_WORKER_READY_PREFIX = "[ready] "
LOG_FORMAT = conf.get('core', 'log_format')
SIMPLE_LOG_FORMAT = conf.get('core', 'simple_log_format')
AIRFLOW_HOME = None
SQL_ALCHEMY_CONN = None
DAGS_FOLDER = None
engine = None
Session = None
def policy(task_instance):
"""
This policy setting allows altering task instances right before they
are executed. It allows administrator to rewire some task parameters.
Note that the ``TaskInstance`` object has an attribute ``task`` pointing
to its related task object, that in turns has a reference to the DAG
object. So you can use the attributes of all of these to define your
policy.
To define policy, add a ``airflow_local_settings`` module
to your PYTHONPATH that defines this ``policy`` function. It receives
a ``TaskInstance`` object and can alter it where needed.
Here are a few examples of how this can be useful:
* You could enforce a specific queue (say the ``spark`` queue)
for tasks using the ``SparkOperator`` to make sure that these
task instances get wired to the right workers
* You could force all task instances running on an
``execution_date`` older than a week old to run in a ``backfill``
pool.
* ...
"""
pass
def configure_vars():
global AIRFLOW_HOME
global SQL_ALCHEMY_CONN
global DAGS_FOLDER
AIRFLOW_HOME = os.path.expanduser(conf.get('core', 'AIRFLOW_HOME'))
SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN')
DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
def configure_orm(disable_connection_pool=False):
global engine
global Session
engine_args = {}
if disable_connection_pool:
engine_args['poolclass'] = NullPool
elif 'sqlite' not in SQL_ALCHEMY_CONN:
# Engine args not supported by sqlite
engine_args['pool_size'] = conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE')
engine_args['pool_recycle'] = conf.getint('core',
'SQL_ALCHEMY_POOL_RECYCLE')
engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
Session = scoped_session(
sessionmaker(autocommit=False, autoflush=False, bind=engine))
try:
from airflow_local_settings import *
log.info("Loaded airflow_local_settings.")
except:
pass
configure_logging()
configure_vars()
configure_orm()
# Const stuff
KILOBYTE = 1024
MEGABYTE = KILOBYTE * KILOBYTE
WEB_COLORS = {'LIGHTBLUE': '#4d9de0',
'LIGHTORANGE': '#FF9933'}