blob: 35ca7714b7dc8ee8ea0de4a8e5505546c5b60d85 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.sla;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import javax.inject.Qualifier;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Striped;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.inject.Inject;
import org.apache.aurora.common.inject.TimedInterceptor.Timed;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.TierManager;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.config.types.TimeAmount;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.entities.ICoordinatorSlaPolicy;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.IServerInfo;
import org.apache.aurora.scheduler.storage.entities.ISlaPolicy;
import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TSimpleJSONProtocol;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Param;
import org.asynchttpclient.Response;
import org.asynchttpclient.util.HttpConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.Objects.requireNonNull;
/**
* Provides methods for performing SLA-safe work. It is used for maintenance and job update
* operations to guarantee that a job's SLA requirements are always satisfied.
*/
public class SlaManager extends AbstractIdleService {
private static final Logger LOG = LoggerFactory.getLogger(SlaManager.class);
@VisibleForTesting
@Qualifier
@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
@interface SlaManagerExecutor { }
@VisibleForTesting
@Qualifier
@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
@interface MaxParallelCoordinators { }
@VisibleForTesting
@Qualifier
@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
@interface HttpClient { }
@VisibleForTesting
@Qualifier
@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
@interface MinRequiredInstances { }
@VisibleForTesting
static final String TASK_PARAM = "task";
private static final String ATTEMPTS_STAT_NAME = "sla_coordinator_attempts";
private static final String SUCCESS_STAT_NAME = "sla_coordinator_success";
private static final String ERRORS_STAT_NAME = "sla_coordinator_errors";
private static final String USER_ERRORS_STAT_NAME = "sla_coordinator_user_errors";
private static final String LOCK_STARVATION_STAT_NAME = "sla_coordinator_lock_starvation";
private final ScheduledExecutorService executor;
private final Storage storage;
private final IServerInfo serverInfo;
private final AsyncHttpClient httpClient;
private final Striped<Lock> lock;
private final int minRequiredInstances;
private final TierManager tierManager;
private final AtomicLong attemptsCounter;
private final AtomicLong successCounter;
private final AtomicLong errorsCounter;
private final AtomicLong userErrorsCounter;
private final AtomicLong lockStarvationCounter;
private final LoadingCache<String, AtomicLong> errorsByTaskCounter;
private final LoadingCache<String, AtomicLong> userErrorsByTaskCounter;
private final LoadingCache<String, AtomicLong> lockStarvationByTaskCounter;
@Inject
SlaManager(@SlaManagerExecutor ScheduledExecutorService executor,
@MaxParallelCoordinators Integer maxCoordinatorLocks,
@MinRequiredInstances Integer minRequiredInstances,
Storage storage,
IServerInfo serverInfo,
@HttpClient AsyncHttpClient httpClient,
TierManager tierManager,
StatsProvider statsProvider) {
this.executor = requireNonNull(executor);
this.storage = requireNonNull(storage);
this.serverInfo = requireNonNull(serverInfo);
this.httpClient = requireNonNull(httpClient);
this.tierManager = requireNonNull(tierManager);
this.minRequiredInstances = requireNonNull(minRequiredInstances);
this.attemptsCounter = statsProvider.makeCounter(ATTEMPTS_STAT_NAME);
this.successCounter = statsProvider.makeCounter(SUCCESS_STAT_NAME);
this.errorsCounter = statsProvider.makeCounter(ERRORS_STAT_NAME);
this.userErrorsCounter = statsProvider.makeCounter(USER_ERRORS_STAT_NAME);
this.lockStarvationCounter = statsProvider.makeCounter(LOCK_STARVATION_STAT_NAME);
this.lock = Striped.lazyWeakLock(requireNonNull(maxCoordinatorLocks));
this.errorsByTaskCounter = CacheBuilder.newBuilder().build(
new CacheLoader<String, AtomicLong>() {
@Override
public AtomicLong load(String key) {
return statsProvider.makeCounter(key);
}
}
);
this.userErrorsByTaskCounter = CacheBuilder.newBuilder().build(
new CacheLoader<String, AtomicLong>() {
@Override
public AtomicLong load(String key) {
return statsProvider.makeCounter(key);
}
}
);
this.lockStarvationByTaskCounter = CacheBuilder.newBuilder().build(
new CacheLoader<String, AtomicLong>() {
@Override
public AtomicLong load(String key) {
return statsProvider.makeCounter(key);
}
}
);
}
private long getSlaDuration(ISlaPolicy slaPolicy) {
if (slaPolicy.isSetPercentageSlaPolicy()) {
return slaPolicy.getPercentageSlaPolicy().getDurationSecs();
} else if (slaPolicy.isSetCountSlaPolicy()) {
return slaPolicy.getCountSlaPolicy().getDurationSecs();
}
throw new IllegalArgumentException("Expected a percentage/count sla policy.");
}
private boolean meetsSLAInstances(ISlaPolicy slaPolicy, long running, long total) {
if (slaPolicy.isSetPercentageSlaPolicy()) {
double percentageRunning = ((double) running / total) * 100.0;
double percentageRequired = slaPolicy.getPercentageSlaPolicy().getPercentage();
long numMaintenance = (long) ((1 - percentageRequired / 100.0) * total);
if (numMaintenance < 1) {
double updated = ((total - 1) / (double) total) * 100;
LOG.warn("Invalid PercentageSlaPolicy(percentage={}) for task with {} instances "
+ "that allows {} instances to be in maintenance."
+ "Using percentage={} which allows 1 instance to be under maintenance.",
percentageRequired, total, numMaintenance, updated);
percentageRequired = updated;
}
return percentageRunning >= percentageRequired;
} else if (slaPolicy.isSetCountSlaPolicy()) {
long numRequired = slaPolicy.getCountSlaPolicy().getCount();
long numMaintenance = total - numRequired;
if (numMaintenance < 1) {
long updated = total - 1;
LOG.warn("Invalid CountSlaPolicy(count={}) for task with {} instances "
+ "that allows {} instances to be in maintenance."
+ "Using count={} which allows 1 instance to be under maintenance.",
numRequired, total, numMaintenance, updated);
numRequired = updated;
}
return running >= numRequired;
}
throw new IllegalArgumentException("Expected a percentage/count sla policy.");
}
private boolean meetsSLADuration(IScheduledTask task, Amount<Long, Time> slaDuration) {
return task.getTaskEvents()
.stream()
.filter(te -> te.getStatus() == ScheduleStatus.RUNNING)
.map(ITaskEvent::getTimestamp)
.max(Long::compare)
.map(t -> System.currentTimeMillis() - t > slaDuration.as(Time.MILLISECONDS))
.orElse(false);
}
private boolean checkSla(IScheduledTask task, ISlaPolicy slaPolicy, StoreProvider store) {
// Find the number of active tasks for the job, it will be used as the set of tasks
// against which the percentage/count based SLA will be calculated against.
final long numActive = store.getTaskStore().fetchTasks(
Query.jobScoped(task.getAssignedTask().getTask().getJob()).active()
).size();
if (skipSla(task, numActive)) {
LOG.info("Skip SLA for {} because it is not production or does not have enough instances.",
Tasks.id(task));
return true;
}
// Find tasks which have been RUNNING for the required SLA duration.
Amount<Long, Time> slaDuration = new TimeAmount(getSlaDuration(slaPolicy), Time.SECONDS);
final Set<IScheduledTask> running = store.getTaskStore().fetchTasks(
Query.jobScoped(task.getAssignedTask().getTask().getJob())
.byStatus(ScheduleStatus.RUNNING))
.stream()
.filter(t -> !Tasks.id(t).equals(Tasks.id(task))) // exclude the task to be removed
.filter(t -> meetsSLADuration(t, slaDuration)) // task is running for sla duration
.collect(Collectors.toSet());
// Check it we satisfy the number of RUNNING tasks per duration time.
boolean meetsSla = meetsSLAInstances(slaPolicy, running.size(), numActive);
LOG.info("SlaCheck: {}, {} tasks unaffected after updating state for {}.",
meetsSla,
running.size(),
Tasks.id(task));
return meetsSla;
}
/**
* Performs the supplied {@link Storage.MutateWork} after checking with the configured
* coordinator endpoint to make sure it is safe to perform the work.
*
* NOTE: Both the SLA check and the {@link Storage.MutateWork} will be performed within a
* {@link Lock} that is indexed by the coordinator url. We do this to make sure that mutations
* to the SLA are performed atomically, so the Coordinator does not have to track
* concurrent requests and simulate SLA changes.
*
* @param task Task whose SLA is to checked.
* @param slaPolicy {@link ICoordinatorSlaPolicy} to use for checking SLA.
* @param work {@link Storage.MutateWork} to perform, if SLA is satisfied.
* @param params dictionary of key-value pairs to send in json body to Coordinator
* @param <T> The type of result the {@link Storage.MutateWork} produces.
* @param <E> The type of exception the {@link Storage.MutateWork} throw.
*/
private <T, E extends Exception> void askCoordinatorThenAct(
IScheduledTask task,
ICoordinatorSlaPolicy slaPolicy,
Storage.MutateWork<T, E> work,
Map<String, String> params) {
String taskKey = getTaskKey(task);
LOG.debug("Awaiting lock on coordinator: {} for task: {}",
slaPolicy.getCoordinatorUrl(),
taskKey);
Lock l = lock.get(slaPolicy.getCoordinatorUrl());
if (l.tryLock()) {
try {
LOG.info("Acquired lock on coordinator: {} for task: {}",
slaPolicy.getCoordinatorUrl(),
taskKey);
attemptsCounter.incrementAndGet();
if (coordinatorAllows(task, taskKey, slaPolicy, params)) {
LOG.info("Performing work after coordinator: {} approval for task: {}",
slaPolicy.getCoordinatorUrl(),
taskKey);
storage.write(work);
}
} catch (RuntimeException e) {
LOG.error("Unexpected failure during coordinator sla check against: {} for task: {}",
slaPolicy.getCoordinatorUrl(),
taskKey,
e);
throw e;
} catch (Exception e) {
incrementErrorCount(ERRORS_STAT_NAME, taskKey);
LOG.error("Failed to talk to coordinator: {} for task: {}",
slaPolicy.getCoordinatorUrl(),
taskKey,
e);
} finally {
LOG.info("Releasing lock for coordinator: {} and task: {}",
slaPolicy.getCoordinatorUrl(),
taskKey);
l.unlock();
}
} else {
incrementErrorCount(LOCK_STARVATION_STAT_NAME, taskKey);
LOG.info("Failed to acquire lock on coordinator: {} for task: {}",
slaPolicy.getCoordinatorUrl(),
taskKey);
}
}
private boolean coordinatorAllows(
IScheduledTask task,
String taskKey,
ICoordinatorSlaPolicy slaPolicy,
Map<String, String> params)
throws InterruptedException, ExecutionException, TException {
LOG.info("Checking coordinator: {} for task: {}", slaPolicy.getCoordinatorUrl(), taskKey);
String taskConfig = new TSerializer(new TSimpleJSONProtocol.Factory())
.toString(task.newBuilder());
JsonObject jsonBody = new JsonObject();
jsonBody.add("taskConfig", new JsonParser().parse(taskConfig));
jsonBody.addProperty(TASK_PARAM, taskKey);
params.forEach(jsonBody::addProperty);
Response response = httpClient.preparePost(slaPolicy.getCoordinatorUrl())
.setQueryParams(ImmutableList.of(new Param(TASK_PARAM, taskKey)))
.setBody(new Gson().toJson(jsonBody))
.execute()
.get();
if (response.getStatusCode() != HttpConstants.ResponseStatusCodes.OK_200) {
LOG.error("Request failed to coordinator: {} for task: {}. Response: {}",
slaPolicy.getCoordinatorUrl(),
taskKey,
response.getStatusCode());
incrementErrorCount(USER_ERRORS_STAT_NAME, taskKey);
return false;
}
successCounter.incrementAndGet();
String json = response.getResponseBody();
LOG.info("Got response: {} from {} for task: {}",
json,
slaPolicy.getCoordinatorUrl(),
taskKey);
Map<String, Boolean> result = new Gson().fromJson(
json,
new TypeReference<Map<String, Boolean>>() { }.getType());
return result.get(slaPolicy.isSetStatusKey() ? slaPolicy.getStatusKey() : "drain");
}
@VisibleForTesting
String getTaskKey(IScheduledTask task) {
IJobKey jobKey = Tasks.getJob(task);
int instanceId = Tasks.getInstanceId(task);
return String.format("%s/%s/%s/%s/%s", serverInfo.getClusterName(),
jobKey.getRole(), jobKey.getEnvironment(), jobKey.getName(), instanceId);
}
private void incrementErrorCount(String prefix, String taskKey) {
try {
if (prefix.equals(ERRORS_STAT_NAME)) {
errorsCounter.incrementAndGet();
errorsByTaskCounter.get(prefix + "_" + taskKey).incrementAndGet();
}
if (prefix.equals(USER_ERRORS_STAT_NAME)) {
userErrorsCounter.incrementAndGet();
userErrorsByTaskCounter.get(prefix + "_" + taskKey).incrementAndGet();
}
if (prefix.equals(LOCK_STARVATION_STAT_NAME)) {
lockStarvationCounter.incrementAndGet();
lockStarvationByTaskCounter.get(prefix + "_" + taskKey).incrementAndGet();
}
} catch (ExecutionException e) {
LOG.error("Failed increment failure metrics for task: {}", taskKey, e);
}
}
/**
* Checks the SLA for the given task using the {@link ISlaPolicy} supplied.
*
* @param task Task whose SLA is to be checked.
* @param slaPolicy {@link ISlaPolicy} to use.
* @param work {@link Storage.MutateWork} to perform, if SLA is satisfied.
* @param params dictionary of key-value pairs to send in json body to Coordinator
* @param force boolean to indicate if work should be performed without checking SLA.
* @param <T> The type of result the {@link Storage.MutateWork} produces.
* @param <E> The type of exception the {@link Storage.MutateWork} throw.
* @throws E raises exception of type E if the {@link Storage.MutateWork} throws.
*/
@Timed
public <T, E extends Exception> void checkSlaThenAct(
IScheduledTask task,
ISlaPolicy slaPolicy,
Storage.MutateWork<T, E> work,
Map<String, String> params,
boolean force) throws E {
if (force) {
LOG.info("Forcing work without applying SlaPolicy: {} for {}", slaPolicy, Tasks.id(task));
storage.write(work);
return;
}
LOG.info("Using SlaPolicy: {} for {}", slaPolicy, Tasks.id(task));
// has custom coordinator sla policy
if (slaPolicy.isSetCoordinatorSlaPolicy()) {
// schedule work to perform coordinated transition
executor.execute(() -> askCoordinatorThenAct(
task,
slaPolicy.getCoordinatorSlaPolicy(),
work,
params));
} else {
// verify sla and perform work if satisfied
storage.write(store -> {
if (checkSla(task, slaPolicy, store)) {
work.apply(store);
}
return null; // TODO(sshanmugham) we need to satisfy the interface, refactor later
});
}
}
private boolean skipSla(IScheduledTask task, long numActive) {
if (!tierManager.getTier(task.getAssignedTask().getTask()).isPreemptible()
&& !tierManager.getTier(task.getAssignedTask().getTask()).isRevocable()) {
return numActive < minRequiredInstances;
}
return true;
}
@Override
protected void startUp() {
//no-op
}
@Override
protected void shutDown() throws Exception {
LOG.info("Shutting down SlaManager async http client.");
httpClient.close();
}
}