blob: 854db1a24e4e54e5625e25eabda438403d450846 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.updater;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.BackoffStrategy;
import org.apache.aurora.common.util.Clock;
import org.apache.aurora.gen.JobInstanceUpdateEvent;
import org.apache.aurora.gen.JobUpdateAction;
import org.apache.aurora.gen.JobUpdateStatus;
import org.apache.aurora.scheduler.BatchWorker;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.sla.SlaManager;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ISlaPolicy;
import org.apache.aurora.scheduler.updater.UpdaterModule.UpdateActionBatchWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
import static org.apache.aurora.scheduler.base.Tasks.SLAVE_ASSIGNED_STATES;
import static org.apache.aurora.scheduler.base.Tasks.isKillable;
/**
* This class helps push SLA-aware kills to completion.
*
* SLA-aware kills will be added to a queue and retried until SLA passes or the update changes
* state. If the update becomes PAUSED or AWAITING_PULSE, then the retries will stop until the
* update becomes active again.
*/
class SlaKillController {
private static final Logger LOG = LoggerFactory.getLogger(SlaKillController.class);
@VisibleForTesting
static final String SLA_CHECKING_MESSAGE = "Checking SLA before continuing.";
@VisibleForTesting
static final String SLA_PASSED_MESSAGE = "SLA check passed, continuing.";
@VisibleForTesting
static final String SLA_KILL_ATTEMPT = "updates_sla_kill_attempt_";
@VisibleForTesting
static final String SLA_KILL_SUCCESS = "updates_sla_kill_success_";
private final ScheduledExecutorService executor;
private final UpdateActionBatchWorker batchWorker;
private final SlaManager slaManager;
private final BackoffStrategy backoffStrategy;
private final Clock clock;
private final LoadingCache<String, AtomicLong> killAttemptsByJob;
private final LoadingCache<String, AtomicLong> killSuccessesByJob;
@Inject
SlaKillController(
ScheduledExecutorService executor,
UpdateActionBatchWorker batchWorker,
SlaManager slaManager,
Clock clock,
BackoffStrategy backoffStrategy,
StatsProvider statsProvider) {
this.executor = requireNonNull(executor);
this.batchWorker = requireNonNull(batchWorker);
this.slaManager = requireNonNull(slaManager);
this.backoffStrategy = requireNonNull(backoffStrategy);
this.clock = requireNonNull(clock);
killAttemptsByJob = CacheBuilder.newBuilder().build(
new CacheLoader<String, AtomicLong>() {
@Override
public AtomicLong load(String key) {
return statsProvider.makeCounter(key);
}
}
);
killSuccessesByJob = CacheBuilder.newBuilder().build(
new CacheLoader<String, AtomicLong>() {
@Override
public AtomicLong load(String key) {
return statsProvider.makeCounter(key);
}
}
);
}
/**
* Perform an SLA-aware kill on a given {@link IInstanceKey instance} and
* {@link IScheduledTask task} combination. Adds {@link JobInstanceUpdateEvent}s to show progress
* of the kill (e.g. whether it is waiting for the SLA check to pass or whether it has already
* succeeded and is in the process of killing). SLA-aware kills are performed asynchronously
* to calling this method.
*
* Kills will retry until:
*
* - The kill is successful.
* - The update becomes inactive.
* - The task supplied is killed for some other reason.
*
* @param storeProvider A storage provider to access/modify update information.
* @param instance The {@link IInstanceKey} of the task being killed.
* @param task The task being killed.
* @param key The update key.
* @param slaPolicy The {@link ISlaPolicy} to use.
* @param status The status of the update as of issuing the kill.
* @param killCommand The consumer provided to execute on a successful SLA check. This consumer
* should perform the "kill" of the task (i.e. do the transition to KILLING).
*/
void slaKill(
MutableStoreProvider storeProvider,
IInstanceKey instance,
IScheduledTask task,
IJobUpdateKey key,
ISlaPolicy slaPolicy,
JobUpdateStatus status,
Consumer<MutableStoreProvider> killCommand) {
if (!updateEventExists(
storeProvider,
instance,
key,
determineActionFromStatus(status),
SLA_CHECKING_MESSAGE)) {
// The check above is to ensure we do not add duplicate instance update events since slaKill
// can be called multiple times for one instance (e.g. slaKill is called, the update is
// paused, and then slaKill is called again after an unpause).
addUpdatingInstanceEvent(
storeProvider,
instance,
key,
clock.nowMillis(),
determineActionFromStatus(status),
SLA_CHECKING_MESSAGE);
}
retryingSlaKill(
instance,
task.getAssignedTask().getTaskId(),
key,
slaPolicy,
status,
killCommand,
0L);
}
private void retryingSlaKill(
IInstanceKey instance,
String taskId,
IJobUpdateKey key,
ISlaPolicy slaPolicy,
JobUpdateStatus status,
Consumer<MutableStoreProvider> killCommand,
long retryInMs) {
batchWorker.execute(storeProvider -> {
if (updateInStatus(storeProvider, key, status)) {
// Ensure that the update is still ongoing.
storeProvider
.getTaskStore()
.fetchTask(taskId)
.filter(task -> isKillable(task.getStatus()))
.ifPresent(task -> {
incrementJobStatCounter(killAttemptsByJob, SLA_KILL_ATTEMPT, instance.getJobKey());
slaManager.checkSlaThenAct(
task,
slaPolicy,
slaStoreProvider -> performKill(
slaStoreProvider,
instance,
key,
status,
killCommand),
ImmutableMap.of(),
// If the task is not assigned, force the update since it does not affect the
// SLA. For example, if a task is THROTTLED or PENDING, we probably don't care
// if the update replaces it with a new instance.
!SLAVE_ASSIGNED_STATES.contains(task.getStatus()));
// We retry all SLA kills since it is possible that the SLA check fails and thus we
// will want to reevaluate later. If the kill succeeds or the update is cancelled by
// the next time the kill is retried, it will be a NOOP.
long backoff = backoffStrategy.calculateBackoffMs(retryInMs);
executor.schedule(
() -> retryingSlaKill(
instance,
taskId,
key,
slaPolicy,
status,
killCommand,
backoff),
backoff,
TimeUnit.MILLISECONDS);
});
}
return BatchWorker.NO_RESULT;
});
}
/**
* Passed into {@link SlaManager#checkSlaThenAct}. Actually performs the kill on the instance.
*
* Ensures that the update is still valid (the update's status has not changed since the kill
* was initiated) before persisting new state.
*/
private BatchWorker.NoResult performKill(
MutableStoreProvider slaStoreProvider,
IInstanceKey instance,
IJobUpdateKey key,
JobUpdateStatus status,
Consumer<MutableStoreProvider> killCommand) {
LOG.info("Performing SLA-aware kill of " + instance);
if (updateInStatus(slaStoreProvider, key, status)) {
// Check again that the status is the same as when the command was issued. We do this because
// the SLA kill is executed asynchronously and the status may have changed (due to a pause or
// waiting for a pulse).
addUpdatingInstanceEvent(
slaStoreProvider,
instance,
key,
clock.nowMillis(),
determineActionFromStatus(status),
SLA_PASSED_MESSAGE);
killCommand.accept(slaStoreProvider);
incrementJobStatCounter(killSuccessesByJob, SLA_KILL_SUCCESS, instance.getJobKey());
} else {
// Between issuing the SLA kill and the kill being executed, the update has
// changed state and is no longer ROLLING_FORWARD. Skip the kill until the
// update becomes active again.
LOG.info("Update " + key + " is not active, skipping SLA kill of " + instance);
}
return BatchWorker.NO_RESULT;
}
private static void incrementJobStatCounter(
LoadingCache<String, AtomicLong> counter,
String prefix,
IJobKey jobKey) {
counter.getUnchecked(prefix + JobKeys.canonicalString(jobKey)).incrementAndGet();
}
/**
* Checks that an update contains an instance update event with a given instance ID,
* {@link JobUpdateAction}, and message.
*/
private static boolean updateEventExists(
MutableStoreProvider storeProvider,
IInstanceKey instance,
IJobUpdateKey key,
JobUpdateAction action,
String message) {
return storeProvider
.getJobUpdateStore()
.fetchJobUpdate(key)
.get()
.getInstanceEvents()
.stream()
.anyMatch(e -> e.getInstanceId() == instance.getInstanceId()
&& e.getAction() == action
&& e.isSetMessage()
&& e.getMessage().equals(message));
}
private static void addUpdatingInstanceEvent(
MutableStoreProvider storeProvider,
IInstanceKey instance,
IJobUpdateKey key,
long timestampMs,
JobUpdateAction action,
String message) {
IJobInstanceUpdateEvent event = IJobInstanceUpdateEvent.build(
new JobInstanceUpdateEvent()
.setInstanceId(instance.getInstanceId())
.setTimestampMs(timestampMs)
.setAction(action)
.setMessage(message));
storeProvider.getJobUpdateStore().saveJobInstanceUpdateEvent(key, event);
}
private static boolean updateInStatus(
MutableStoreProvider storeProvider,
IJobUpdateKey key,
JobUpdateStatus status) {
return storeProvider
.getJobUpdateStore()
.fetchJobUpdate(key)
.filter(jobUpdateDetails -> Updates.getJobUpdateStatus(jobUpdateDetails) == status)
.isPresent();
}
/**
* Determine the {@link JobUpdateAction} that corresponds with a give {@link JobUpdateStatus}.
* For example, ROLLING_FORWARD corresponds with INSTANCE_UPDATING actions while ROLLING_BACK
* corresponds with INSTANCE_ROLLING_BACK actions.
*/
private static JobUpdateAction determineActionFromStatus(JobUpdateStatus status) {
switch (status) {
case ROLLING_BACK:
return JobUpdateAction.INSTANCE_ROLLING_BACK;
case ROLLING_FORWARD:
return JobUpdateAction.INSTANCE_UPDATING;
default:
throw new RuntimeException("Unexpected status " + status + " encountered when"
+ " performing an SLA-aware update.");
}
}
}