blob: 0186af52f0d7d7e3981ec59bf6a01aafee2bcfb1 [file] [log] [blame]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
'''Library of utilities called by the aurora client binary
'''
from __future__ import print_function
import functools
import math
import re
import sys
from pystachio import Empty
from twitter.common import log
from apache.aurora.client import binding_helper
from apache.aurora.client.base import die
from apache.aurora.config import AuroraConfig
ANNOUNCE_WARNING = """
Announcer specified primary port as '%(primary_port)s' but no processes have bound that port.
If you would like to utilize this port, you should listen on {{thermos.ports[%(primary_port)s]}}
from some Process bound to your task.
"""
def _validate_announce_configuration(config):
if not config.raw().has_announce():
return
primary_port = config.raw().announce().primary_port().get()
if primary_port not in config.ports():
print(ANNOUNCE_WARNING % {'primary_port': primary_port}, file=sys.stderr)
if config.raw().has_announce() and not config.raw().has_constraints() or (
'dedicated' not in config.raw().constraints()):
for port in config.raw().announce().portmap().get().values():
try:
port = int(port)
except ValueError:
continue
raise ValueError('Job must be dedicated in order to specify static ports!')
STAGING_RE = re.compile(r'^staging\d*$')
def __validate_env(name, config_name):
if STAGING_RE.match(name):
return
if name not in ('prod', 'devel', 'test'):
raise ValueError('%s should be one of "prod", "devel", "test" or '
'staging<number>! Got %s' % (config_name, name))
def _validate_environment_name(config):
env_name = str(config.raw().environment())
__validate_env(env_name, 'Environment')
UPDATE_CONFIG_MAX_FAILURES_ERROR = '''
max_total_failures in update_config must be lesser than the job size.
Based on your job size (%s) you should use max_total_failures <= %s.
'''
UPDATE_CONFIG_DEDICATED_THRESHOLD_ERROR = '''
Since this is a dedicated job, you must set your max_total_failures in
your update configuration to no less than 2%% of your job size.
Based on your job size (%s) you should use max_total_failures >= %s.
'''
WATCH_SECS_INSUFFICIENT_ERROR_FORMAT = '''
You have specified an insufficiently short watch period (%d seconds) in your update configuration.
Your update will always succeed. In order for the updater to detect health check failures,
UpdateConfig.watch_secs must be greater than %d seconds to account for an initial
health check interval (%d seconds) plus %d consecutive failures at a check interval of %d seconds.
'''
def _validate_update_config(config):
job_size = config.instances()
update_config = config.update_config()
health_check_config = config.health_check_config()
max_failures = update_config.max_total_failures().get()
watch_secs = update_config.watch_secs().get()
initial_interval_secs = health_check_config.initial_interval_secs().get()
max_consecutive_failures = health_check_config.max_consecutive_failures().get()
interval_secs = health_check_config.interval_secs().get()
if max_failures >= job_size:
die(UPDATE_CONFIG_MAX_FAILURES_ERROR % (job_size, job_size - 1))
if config.is_dedicated():
min_failure_threshold = int(math.floor(job_size * 0.02))
if max_failures < min_failure_threshold:
die(UPDATE_CONFIG_DEDICATED_THRESHOLD_ERROR % (job_size, min_failure_threshold))
target_watch = initial_interval_secs + (max_consecutive_failures * interval_secs)
if watch_secs <= target_watch:
die(WATCH_SECS_INSUFFICIENT_ERROR_FORMAT %
(watch_secs, target_watch, initial_interval_secs, max_consecutive_failures, interval_secs))
PRODUCTION_DEPRECATED_WARNING = (
'Job configuration attribute \'production\' is deprecated.\n'
'Use \'tier\' attribute instead. For more information please refer to \n'
'http://aurora.apache.org/documentation/latest/reference/configuration/#job-objects')
def deprecation_warning(text):
log.warning('')
log.warning('*' * 80)
log.warning('* The command you ran is deprecated and will soon break!')
for line in text.split('\n'):
log.warning('* %s' % line)
log.warning('*' * 80)
log.warning('')
def _validate_deprecated_config(config):
if config.raw().production().get() and config.raw().tier() is Empty:
deprecation_warning(PRODUCTION_DEPRECATED_WARNING)
def validate_config(config, env=None):
_validate_update_config(config)
_validate_announce_configuration(config)
_validate_environment_name(config)
_validate_deprecated_config(config)
class GlobalHookRegistry(object):
"""To allow enforcable policy, we need a set of mandatory hooks that are
registered as part of the client executable. Global hooks are registered
by calling GlobalHookRegistry.register_global_hook.
"""
HOOKS = []
DISABLED = False
@classmethod
def register_global_hook(cls, hook):
cls.HOOKS.append(hook)
@classmethod
def get_hooks(cls):
if cls.DISABLED:
return []
else:
return cls.HOOKS[:]
@classmethod
def reset(cls):
cls.HOOKS = []
cls.DISABLED = False
@classmethod
def disable_hooks(cls):
cls.DISABLED = True
@classmethod
def enable_hooks(cls):
cls.DISABLED = False
def inject_hooks(config, env=None):
config.hooks = (env or {}).get('hooks', [])
class AnnotatedAuroraConfig(AuroraConfig):
@classmethod
def plugins(cls):
return (inject_hooks,
functools.partial(binding_helper.apply_all),
validate_config)
def get_config(jobname,
config_file,
json=False,
bindings=(),
select_cluster=None,
select_role=None,
select_env=None):
"""Creates and returns a config object contained in the provided file."""
loader = AnnotatedAuroraConfig.load_json if json else AnnotatedAuroraConfig.load
return loader(config_file,
jobname,
bindings,
select_cluster=select_cluster,
select_role=select_role,
select_env=select_env)