# Licensed under the Apache License, Version 2.0 (the "License");
import math
import time
from collections import defaultdict, namedtuple
from twitter.common import log
from apache.aurora.client.base import DEFAULT_GROUPING, format_response, group_hosts
from apache.aurora.common.aurora_job_key import AuroraJobKey
from gen.apache.aurora.api.constants import LIVE_STATES
from gen.apache.aurora.api.ttypes import ResponseCode, ScheduleStatus, TaskQuery
def job_key_from_scheduled(task, cluster):
    """Creates AuroraJobKey from the ScheduledTask.

    Arguments:
    task -- ScheduledTask to get job key from.
    cluster -- Cluster the task belongs to.
    """
    config = task.assignedTask.task
    # NOTE(review): the argument list of this call was missing; it is reconstructed from the
    # TaskConfig fields (structured `job` key, falling back to the legacy owner/environment/
    # jobName fields when `job` is absent) — confirm against the thrift schema in use.
    job = getattr(config, 'job', None)
    return AuroraJobKey(
        cluster=cluster.name,
        role=job.role if job else config.owner.role,
        env=job.environment if job else config.environment,
        name=job.name if job else config.jobName)


def task_query(hosts=None, job_keys=None):
    """Creates TaskQuery optionally scoped by a job(s) or hosts.

    The query is always restricted to the statuses in LIVE_STATES, so tasks that
    are no longer live do not contribute uptime to SLA calculations.

    Arguments:
    hosts -- list of hostnames to scope the query by.
    job_keys -- list of AuroraJobKeys to scope the query by.
    """
    return TaskQuery(
        slaveHosts=set(hosts) if hosts else None,
        jobKeys=[k.to_thrift() for k in job_keys] if job_keys else None,
        statuses=LIVE_STATES)


class JobUpTimeSlaVector(object):
    """A grouping of job active tasks by:
        - instance: Map of instance ID -> instance uptime in seconds.
    Exposes an API for converting raw instance uptime data into job SLA metrics.
    """

    def __init__(self, tasks, now=None):
        """Builds the instance uptime map for the given tasks.

        Arguments:
        tasks -- ScheduledTasks of the job. Uptimes are keyed by instance ID, so the
                 tasks presumably all belong to one job.
        now -- reference time in seconds since the epoch; defaults to time.time().
        """
        self._tasks = tasks
        # Compare against None so an explicit now=0 is honoured rather than replaced.
        self._now = time.time() if now is None else now
        self._uptime_map = self._instance_uptime()

    def total_tasks(self):
        """Returns the total count of active tasks."""
        return len(self._uptime_map)

    def get_wait_time_to_sla(self, percentile, duration, total_tasks=None):
        """Returns an approximate wait time until the job reaches the specified SLA
        defined by percentile and duration.

        Arguments:
        percentile -- up count percentile to calculate wait time against.
        duration -- uptime duration to calculate wait time against.
        total_tasks -- optional total task count to calculate against.

        Returns 0 if the SLA is already met, None if it cannot be met with the
        instances currently in the uptime map, otherwise the wait time in seconds.
        """
        upcount = self.get_task_up_count(duration, total_tasks)
        if upcount >= percentile:
            return 0

        # To get wait time to SLA:
        # - Calculate the desired number of up instances in order to satisfy the percentile.
        # - Find the desired index (x) in the instance list sorted in non-decreasing order of uptimes.
        #   If desired index outside of current element count -> return None for "infeasible".
        # - Calculate wait time as: duration - duration(x)
        elements = len(self._uptime_map)
        total = total_tasks or elements
        target_count = math.ceil(total * percentile / 100.0)
        index = elements - int(target_count)
        if index < 0 or index >= elements:
            return None
        return duration - sorted(self._uptime_map.values())[index]

    def get_task_up_count(self, duration, total_tasks=None):
        """Returns the percentage of job tasks that stayed up longer than duration.

        Arguments:
        duration -- uptime duration in seconds.
        total_tasks -- optional total task count to calculate against.

        Returns 0 when there are no tasks to calculate against.
        """
        total = total_tasks or len(self._uptime_map)
        above = len([uptime for uptime in self._uptime_map.values() if uptime >= duration])
        return 100.0 * above / total if total else 0

    def get_job_uptime(self, percentile):
        """Returns the uptime (in seconds) of the job at the specified percentile.

        Arguments:
        percentile -- percentile to report uptime for; must lie strictly within (0, 100).

        Raises ValueError for an out-of-range percentile. Returns 0 when no instance
        falls at the requested position (e.g. an empty job).
        """
        if percentile <= 0 or percentile >= 100:
            raise ValueError('Percentile must be within (0, 100), got %r instead.' % percentile)

        total = len(self._uptime_map)
        value = math.floor(percentile / 100.0 * total)
        index = total - int(value) - 1
        return sorted(self._uptime_map.values())[index] if 0 <= index < total else 0

    def _instance_uptime(self):
        """Maps instance ID -> whole seconds elapsed since the task's RUNNING event.

        If a task carries several RUNNING events, the last one iterated wins.
        """
        instance_map = {}
        for task in self._tasks:
            for event in task.taskEvents:
                if event.status == ScheduleStatus.RUNNING:
                    # Event timestamps are divided by 1000 (i.e. treated as milliseconds)
                    # to match self._now, which is in seconds.
                    instance_map[task.assignedTask.instanceId] = math.floor(
                        self._now - event.timestamp / 1000)
        return instance_map


# A job key paired with a task up count percentage and an uptime duration in seconds.
# Used both as a per-job SLA override (get_safe_hosts job_limits) and as the per-host
# result entry returned by get_safe_hosts.
JobUpTimeLimit = namedtuple('JobUpTimeLimit', 'job percentage duration_secs')

# Predicted SLA details for a job should its host(s) go down, as produced by probe_hosts:
# predicted up count percentage, whether it still meets the requested percentage, and the
# estimated wait in seconds until it would (None when that is infeasible).
JobUpTimeDetails = namedtuple(
    'JobUpTimeDetails', 'job predicted_percentage safe safe_in_secs')


class DomainUpTimeSlaVector(object):
    """A grouping of all active tasks in the cluster by:
        - job: Map of job_key -> task. Provides logical mapping between jobs and their active tasks.
        - host: Map of hostname -> job_key. Provides logical mapping between hosts and their jobs.
    Exposes an API for querying safe domain details.
    """

    # Jobs with fewer production tasks than this are left out of domain analysis.
    # NOTE(review): this name is used as a default below but was not defined anywhere in
    # the visible file; the value 2 is assumed — confirm against callers that read it.
    DEFAULT_MIN_INSTANCE_COUNT = 2

    def __init__(self, cluster, tasks, min_instance_count=DEFAULT_MIN_INSTANCE_COUNT, hosts=None):
        """Indexes the given tasks by job and by host.

        Arguments:
        cluster -- Cluster the tasks belong to; used to build job keys.
        tasks -- ScheduledTasks to analyze. Only production tasks are kept.
        min_instance_count -- jobs with fewer (production) tasks than this are ignored.
        hosts -- optional list of hostnames restricting which hosts are analyzed/probed.
        """
        self._cluster = cluster
        self._tasks = tasks
        self._now = time.time()
        self._tasks_by_job, self._jobs_by_host, self._hosts_by_job = self._init_mappings(
            min_instance_count)
        self._host_filter = hosts

    def get_safe_hosts(self,
                       percentage,
                       duration,
                       job_limits=None,
                       grouping_function=DEFAULT_GROUPING):
        """Returns hosts safe to restart with respect to their job SLA.
        Every host is analyzed separately without considering other job hosts.

        Arguments:
        percentage -- default task up count percentage. Used if job_limits mapping is not found.
        duration -- default task uptime duration in seconds. Used if job_limits mapping is not found.
        job_limits -- optional SLA override map. Key: job key. Value JobUpTimeLimit. If specified,
                      replaces default percentage/duration within the job context.
        grouping_function -- grouping function to use to group hosts.

        Returns a list with one entry per safe host group: a mapping of
        hostname -> list of JobUpTimeLimit(job_key, predicted up count percentage, duration).
        A group is omitted entirely if taking it down would breach any of its jobs' SLA.
        """
        safe_groups = []
        for hosts, job_keys in self._iter_groups(
                self._jobs_by_host.keys(), grouping_function, self._host_filter):
            safe_hosts = defaultdict(list)
            group_is_safe = True
            for job_key in job_keys:
                job_hosts = hosts.intersection(self._hosts_by_job[job_key])
                job_duration = duration
                job_percentage = percentage
                if job_limits and job_key in job_limits:
                    job_duration = job_limits[job_key].duration_secs
                    job_percentage = job_limits[job_key].percentage

                filtered_percentage, _, _ = self._simulate_hosts_down(
                    job_key, job_hosts, job_duration)
                if filtered_percentage < job_percentage:
                    # Taking this group down would violate this job's SLA, so no host in
                    # the group can be reported as safe.
                    group_is_safe = False
                    break

                for host in job_hosts:
                    safe_hosts[host].append(
                        JobUpTimeLimit(job_key, filtered_percentage, job_duration))

            # Skip empty groups (e.g. every host filtered out), matching probe_hosts.
            if group_is_safe and safe_hosts:
                safe_groups.append(safe_hosts)

        return safe_groups

    def probe_hosts(self, percentage, duration, grouping_function=DEFAULT_GROUPING):
        """Returns predicted job SLAs following the removal of provided hosts.

        For every given host creates a list of JobUpTimeDetails with predicted job SLA details
        in case the host is restarted, including: host, job_key, predicted up count, whether
        the predicted job SLA >= percentage and the expected wait time in seconds for the job
        to reach its SLA.

        Arguments:
        percentage -- task up count percentage.
        duration -- task uptime duration in seconds.
        grouping_function -- grouping function to use to group hosts.

        Only the hosts passed to the constructor are probed; returns [] when none were given.
        """
        probed_groups = []
        for hosts, job_keys in self._iter_groups(self._host_filter or [], grouping_function):
            probed_hosts = defaultdict(list)
            for job_key in job_keys:
                job_hosts = hosts.intersection(self._hosts_by_job[job_key])
                filtered_percentage, total_count, filtered_vector = self._simulate_hosts_down(
                    job_key, job_hosts, duration)

                # Calculate wait time to SLA in case down host violates job's SLA.
                if filtered_percentage < percentage:
                    safe = False
                    wait_to_sla = filtered_vector.get_wait_time_to_sla(
                        percentage, duration, total_count)
                else:
                    safe = True
                    wait_to_sla = 0

                for host in job_hosts:
                    probed_hosts[host].append(
                        JobUpTimeDetails(job_key, filtered_percentage, safe, wait_to_sla))

            if probed_hosts:
                probed_groups.append(probed_hosts)

        return probed_groups

    def _iter_groups(self, hosts_to_group, grouping_function, host_filter=None):
        """Yields (hosts, job_keys) for each host group, ordered by group key.

        hosts -- the group's hosts as returned by group_hosts (callers use .intersection()
                 on it, so presumably a set).
        job_keys -- union of the jobs running on the group's hosts; when host_filter is
                    non-empty, only hosts in host_filter contribute jobs.
        """
        # Build the filter set once instead of scanning a list for every host.
        allowed_hosts = set(host_filter) if host_filter else None
        groups = group_hosts(hosts_to_group, grouping_function)
        for _, hosts in sorted(groups.items(), key=lambda item: item[0]):
            job_keys = set()
            for host in hosts:
                if allowed_hosts is not None and host not in allowed_hosts:
                    continue
                job_keys.update(self._jobs_by_host.get(host, set()))
            yield hosts, job_keys

    def _create_group_results(self, group, uptime_details):
        """Returns a mapping of every host in group -> [uptime_details].

        Not referenced elsewhere in this module.
        """
        result = defaultdict(list)
        for host in group.keys():
            result[host].append(uptime_details)
        return result

    def _simulate_hosts_down(self, job_key, hosts, duration):
        """Predicts a job's up count percentage if the given hosts went down.

        Returns (filtered_percentage, total_count, filtered_vector), where total_count is
        the job's task count before filtering and filtered_vector covers the survivors.
        """
        unfiltered_tasks = self._tasks_by_job[job_key]

        # Get total job task count to use in SLA calculation.
        total_count = len(unfiltered_tasks)

        # Get a list of job tasks that would remain after the affected hosts go down
        # and create an SLA vector with these tasks.
        filtered_tasks = [task for task in unfiltered_tasks
                          if task.assignedTask.slaveHost not in hosts]
        filtered_vector = JobUpTimeSlaVector(filtered_tasks, self._now)

        # Calculate the SLA that would be in effect should the host go down.
        filtered_percentage = filtered_vector.get_task_up_count(duration, total_count)

        return filtered_percentage, total_count, filtered_vector

    def _init_mappings(self, count):
        """Builds (tasks_by_job, jobs_by_host, hosts_by_job) from production tasks,
        keeping only jobs with at least `count` such tasks."""
        tasks_by_job = defaultdict(list)
        for task in self._tasks:
            if task.assignedTask.task.production:
                tasks_by_job[job_key_from_scheduled(task, self._cluster)].append(task)

        # Filter jobs by the min instance count.
        tasks_by_job = defaultdict(list, {
            job: tasks for job, tasks in tasks_by_job.items() if len(tasks) >= count})

        jobs_by_host = defaultdict(set)
        hosts_by_job = defaultdict(set)
        for job_key, tasks in tasks_by_job.items():
            for task in tasks:
                host = task.assignedTask.slaveHost
                jobs_by_host[host].add(job_key)
                hosts_by_job[job_key].add(host)

        return tasks_by_job, jobs_by_host, hosts_by_job


class Sla(object):
    """Defines methods for generating job uptime metrics required for monitoring job SLA."""

    def __init__(self, scheduler):
        """
        Arguments:
        scheduler -- scheduler client; must provide getTasksWithoutConfigs(TaskQuery).
        """
        self._scheduler = scheduler

    def get_job_uptime_vector(self, job_key):
        """Returns a JobUpTimeSlaVector object for the given job key.

        Arguments:
        job_key -- job to create a task uptime vector for.
        """
        return JobUpTimeSlaVector(self._get_tasks(task_query(job_keys=[job_key])))

    def get_domain_uptime_vector(self, cluster, min_instance_count, hosts=None):
        """Returns a DomainUpTimeSlaVector object with all available job uptimes.

        Arguments:
        cluster -- Cluster to get vector for.
        min_instance_count -- Minimum job instance count to consider for domain uptime calculations.
        hosts -- optional list of hostnames to query by.

        When hosts are given, all tasks of every job running on those hosts are fetched
        (not just the tasks on the hosts themselves); otherwise the whole cluster is pulled.
        """
        tasks = self._get_tasks(task_query(hosts=hosts)) if hosts else None
        job_keys = set(job_key_from_scheduled(t, cluster) for t in tasks) if tasks else None

        # Avoid full cluster pull if job_keys are missing for any reason but the hosts are specified.
        job_tasks = [] if hosts and not job_keys else self._get_tasks(task_query(job_keys=job_keys))
        return DomainUpTimeSlaVector(
            cluster,
            job_tasks,
            min_instance_count=min_instance_count,
            hosts=hosts)

    def _get_tasks(self, query):
        """Runs the TaskQuery and returns the matching ScheduledTasks.

        Best-effort: a non-OK response is logged and treated as "no tasks".
        """
        resp = self._scheduler.getTasksWithoutConfigs(query)
        log.info(format_response(resp))
        if resp.responseCode != ResponseCode.OK:
            return []
        return resp.result.scheduleStatusResult.tasks